mike-neckのブログ

Java or Groovy or Swift or Golang

datomicチュートリアル7日目 #datomic

今日はdatomicと時間との関わりについてです。

Datomicのコンセプトは新しい事実が古い事実を更新しないということです。その代わりにすべての事実をトラッキングできるようにしてあります。

datomicでは次のようなデータを取得する手段を提供しています。

  • ある時点でのデータ
  • ある時点から別の時点までの変更データ
  • ある時点から予想される未来の状態のデータ

トランザクション発生のタイミングを取得する

次のクエリーでトランザクション発生のタイミングを取得できます。

[:find ?when :where[_ :db/txInstant ?when]]

次のコードでサンプルデータにより発生した二つのトランザクションのタイミングを取得できます。

@Test
void トランザクション自体もエンティティ() {
    //トランザクションが発生した時刻を取得するクエリー
    HashSet<PersistentVector> results = Peer.query($/[
:find ?when
:where
[_ :db/txInstant ?when]]/$ as String, db)
    assert results.size() == 3
    //トランザクション発生時刻を発生順の逆順にソート
    results.sort {PersistentVector left, PersistentVector right ->
        Date l = left[0] as Date
        Date r = right[0] as Date
        -l.compareTo(r)
    }.eachWithIndex {v, i ->
        //最新のトランザクションから表示される
        LOG.debug "[${test.methodName}] [${i}] value[class: [${v.getClass()}], value: [${v[0].format('yyyy/MM/dd HH:mm:ss.SSS')}]]"
    }
}

Groovyらしからぬ、型々したコードです(キャストしまくってるけど)。

これを実行すると次のような結果が得られます。

[0] value[class: [class clojure.lang.PersistentVector], value: [2015/04/02 17:50:14.192]]
[1] value[class: [class clojure.lang.PersistentVector], value: [2015/04/02 17:50:14.061]]
[2] value[class: [class clojure.lang.PersistentVector], value: [1970/01/01 09:00:00.000]]

逆順にソートして最近の変更から表示されています。上の二件は、@Beforeで実行したトランザクションを表します。

['seattle-schema.edn', 'seattle-data0.edn'].each {file ->
    List tx = DatomicUtil.from("${DIR}/${file}")[0]
    connection.transact(tx).get()
}

2015/04/02 17:50:14.192seattle-data0.ednファイルによるトランザクション2015/04/02 17:50:14.061seattle-schema.ednファイルによるトランザクションです。

さて、以下ではseattle-data0.ednファイルによるトランザクションの時刻をDate型のdataTxDateに、seattle-schema.ednファイルによるトランザクションの時刻をschemaTxDateにぶっこんでコードを動かします。

また、特に断りのない場合は次のクエリーを実行します。このクエリはCOUNT_QUERYによって参照されます。

[:find ?c :where[?c :community/name]]

過去の指定時点のデータベースを取得するメソッドasOf

Database#asOf(Date)はそのデータベースがトラックしている指定された時点のデータベースを返すメソッドです。

@Test
void ある時点でのデータを取得する() {
    [
            //スキーマ定義時点でデータは0件
            [when: schemaTxDate, expected: 0],
            //データ登録後でデータは150件
            [when: dataTxDate, expected: 150]
    ].each {
        def dbAtInstant = db.asOf(it.when)
        def results = Peer.query(COUNT_QUERY, dbAtInstant)
        assert results.size() == it.expected
    }
}

スキーマ定義時点では:communityエンティティにあるデータは0件であり、データ登録後は150件であり、このテストは通ります。

差分データベースを取得するメソッドsince

Database#since(Date)Databaseインスタンスの時点と指定時点のデータベースの差分を取得するメソッドです。

@Test
void 差分データを取得する() {
    [
            //スキーマ定義後に追加されたデータは150件
            [when: schemaTxDate, expected: 150],
            //データ登録後に追加されたデータは0件
            [when: dataTxDate, expected: 0]
    ].each {
        def dbSince = db.since(it.when)
        def results = Peer.query(COUNT_QUERY, dbSince)
        assert results.size() == it.expected
    }
}

こんどは逆に現在の時点の差分データベースを取得しますので、スキーマ定義後との差分件数は150件、データ登録後との差分件数は0件であることをテストしています。

未来のデータベースを取得するメソッドwith

未来のデータベースを取得するというのは奇妙な響きがあります。なぜならばデータベースは事実の集合だからです。

ここで未来と言っているのは、通常のRDBMSでのコミット前の状態を指します。

public final class DatomicUtil {
    public static List<?> from(String file) throws IOException {
        try (FileReader r = new FileReader(file)) {
            return Util.readAll(r);
        } catch (IOException e) {
            throw e;
        }
    }
}

このコードはこれまでも使ってきたトランザクションファイルを読み込むメソッドですが、これでサンプルで提供されているseattle-data1.ednを読み込みます。

@Test
void 未来のデータを想像する() {
    // トランザクションデータを読み込み
    List newTx = DatomicUtil.from("${DIR}/seattle-data1.edn")[0] as List
    // トランザクションが発生していないので単なる予想された未来のデータベースを取得
    Map dbMap = db.with(newTx)
    def futureDb = dbMap[Connection.DB_AFTER]
    // 追加されたデータ件数108が追加されていること
    assert Peer.query(COUNT_QUERY, futureDb).size() == 258
}

Database#with(List)によって返されるMapKeywordであるConnection.DB_AFTERを与えると、未来のデータベースが取得できます。

これをもとにクエリーを発行すると差分108件が追加(トータル258件)された状態になっていることをテストするコードです。

注意点とまとめ

ここまでに書いてきたコードから想像できるかと思いますが、Databaseインスタンスはある時点のデータベースを表すインスタンスです。

したがって、もし最新の状態をほしい場合は、Connection#db()メソッドを使って最新のデータベースを取得することが望ましいです。

以上の注意点を考慮しつつ、本日のまとめサンプルコードを下記に示します。

@Test
void まとめ() {
    //現時点でのデータは150件
    assert Peer.query(COUNT_QUERY, db).size() == 150
    def newTx = DatomicUtil.from("${DIR}/seattle-data1.edn")[0] as List
    Map txResultMap = connection.transact(newTx).get()
    //@Beforeで作成されているDatabaseのインスタンスは@Before時点でのDatabase
    assert Peer.query(COUNT_QUERY, db).size() != 258
    //最新のDatabaseを取得したい場合はconnection.db()を用いる
    assert Peer.query(COUNT_QUERY, connection.db()).size() == 258
    def sinceDb = connection.db().since(dataTxDate)
    assert Peer.query(COUNT_QUERY, sinceDb).size() == 108
}

Databaseインスタンスdbseattle-data0.ednトランザクション発生後のデータベースのインスタンスです。

したがって、上記コードの「げ時点でのデータは150件」はデータ登録後の状態のレコード登録件数を取得します。

次に新たなトランザクションを発生させてから、データ件数を取得するクエリーをdbに対して発行していますが、これは追加された分のデータはトラッキング外ですし、db取得時のデータ件数は150件のままですので、追加されたデータ件数は含まれていません。したがって、258件のデータを持っていません。

最新のDatabaseインスタンスを取得したい場合はConnection#db()で最新のDatabaseインスタンスを取得して、データ件数を取得します。そうすれば追加された分のデータも返さて258件になります。


以上

次回はデータ操作についてやっていきたいと思います。