今日はdatomicと時間との関わりについてです。
Datomicのコンセプトは新しい事実が古い事実を更新しないということです。その代わりにすべての事実をトラッキングできるようにしてあります。
datomicでは次のようなデータを取得する手段を提供しています。
- ある時点でのデータ
- ある時点から別の時点までの変更データ
- ある時点から予想される未来の状態のデータ
トランザクション発生のタイミングを取得する
次のクエリーでトランザクション発生のタイミングを取得できます。
[:find ?when :where[_ :db/txInstant ?when]]
次のコードでサンプルデータにより発生した二つのトランザクションのタイミングを取得できます。
@Test void トランザクション自体もエンティティ() { //トランザクションが発生した時刻を取得するクエリー HashSet<PersistentVector> results = Peer.query($/[ :find ?when :where [_ :db/txInstant ?when]]/$ as String, db) assert results.size() == 3 //トランザクション発生時刻を発生順の逆順にソート results.sort {PersistentVector left, PersistentVector right -> Date l = left[0] as Date Date r = right[0] as Date -l.compareTo(r) }.eachWithIndex {v, i -> //最新のトランザクションから表示される LOG.debug "[${test.methodName}] [${i}] value[class: [${v.getClass()}], value: [${v[0].format('yyyy/MM/dd HH:mm:ss.SSS')}]]" } }
Groovyらしからぬ、型々したコードです(キャストしまくってるけど)。
これを実行すると次のような結果が得られます。
[0] value[class: [class clojure.lang.PersistentVector], value: [2015/04/02 17:50:14.192]] [1] value[class: [class clojure.lang.PersistentVector], value: [2015/04/02 17:50:14.061]] [2] value[class: [class clojure.lang.PersistentVector], value: [1970/01/01 09:00:00.000]]
逆順にソートして最近の変更から表示されています。上の二件は、@Before
で実行したトランザクションを表します。
['seattle-schema.edn', 'seattle-data0.edn'].each {file -> List tx = DatomicUtil.from("${DIR}/${file}")[0] connection.transact(tx).get() }
2015/04/02 17:50:14.192
はseattle-data0.edn
ファイルによるトランザクション、2015/04/02 17:50:14.061
はseattle-schema.edn
ファイルによるトランザクションです。
さて、以下ではseattle-data0.edn
ファイルによるトランザクションの時刻をDate
型のdataTxDate
に、seattle-schema.edn
ファイルによるトランザクションの時刻をschemaTxDate
にぶっこんでコードを動かします。
また、特に断りのない場合は次のクエリーを実行します。このクエリはCOUNT_QUERY
によって参照されます。
[:find ?c :where[?c :community/name]]
過去の指定時点のデータベースを取得するメソッドasOf
Database#asOf(Date)
はそのデータベースがトラックしている指定された時点のデータベースを返すメソッドです。
@Test void ある時点でのデータを取得する() { [ //スキーマ定義時点でデータは0件 [when: schemaTxDate, expected: 0], //データ登録後でデータは150件 [when: dataTxDate, expected: 150] ].each { def dbAtInstant = db.asOf(it.when) def results = Peer.query(COUNT_QUERY, dbAtInstant) assert results.size() == it.expected } }
スキーマ定義時点では:community
エンティティにあるデータは0件であり、データ登録後は150件であり、このテストは通ります。
差分データベースを取得するメソッドsince
Database#since(Date)
はDatabase
のインスタンスの時点と指定時点のデータベースの差分を取得するメソッドです。
@Test void 差分データを取得する() { [ //スキーマ定義後に追加されたデータは150件 [when: schemaTxDate, expected: 150], //データ登録後に追加されたデータは0件 [when: dataTxDate, expected: 0] ].each { def dbSince = db.since(it.when) def results = Peer.query(COUNT_QUERY, dbSince) assert results.size() == it.expected } }
こんどは逆に現在の時点の差分データベースを取得しますので、スキーマ定義後との差分件数は150件、データ登録後との差分件数は0件であることをテストしています。
未来のデータベースを取得するメソッドwith
未来のデータベースを取得するというのは奇妙な響きがあります。なぜならばデータベースは事実の集合だからです。
ここで未来と言っているのは、通常のRDBMSでのコミット前の状態を指します。
public final class DatomicUtil { public static List<?> from(String file) throws IOException { try (FileReader r = new FileReader(file)) { return Util.readAll(r); } catch (IOException e) { throw e; } } }
このコードはこれまでも使ってきたトランザクションファイルを読み込むメソッドですが、これでサンプルで提供されているseattle-data1.edn
を読み込みます。
@Test void 未来のデータを想像する() { // トランザクションデータを読み込み List newTx = DatomicUtil.from("${DIR}/seattle-data1.edn")[0] as List // トランザクションが発生していないので単なる予想された未来のデータベースを取得 Map dbMap = db.with(newTx) def futureDb = dbMap[Connection.DB_AFTER] // 追加されたデータ件数108が追加されていること assert Peer.query(COUNT_QUERY, futureDb).size() == 258 }
Database#with(List)
によって返されるMap
にKeyword
であるConnection.DB_AFTER
を与えると、未来のデータベースが取得できます。
これをもとにクエリーを発行すると差分108件が追加(トータル258件)された状態になっていることをテストするコードです。
注意点とまとめ
ここまでに書いてきたコードから想像できるかと思いますが、Database
のインスタンスはある時点のデータベースを表すインスタンスです。
したがって、もし最新の状態をほしい場合は、Connection#db()
メソッドを使って最新のデータベースを取得することが望ましいです。
以上の注意点を考慮しつつ、本日のまとめサンプルコードを下記に示します。
@Test void まとめ() { //現時点でのデータは150件 assert Peer.query(COUNT_QUERY, db).size() == 150 def newTx = DatomicUtil.from("${DIR}/seattle-data1.edn")[0] as List Map txResultMap = connection.transact(newTx).get() //@Beforeで作成されているDatabaseのインスタンスは@Before時点でのDatabase assert Peer.query(COUNT_QUERY, db).size() != 258 //最新のDatabaseを取得したい場合はconnection.db()を用いる assert Peer.query(COUNT_QUERY, connection.db()).size() == 258 def sinceDb = connection.db().since(dataTxDate) assert Peer.query(COUNT_QUERY, sinceDb).size() == 108 }
Database
のインスタンスdb
はseattle-data0.edn
のトランザクション発生後のデータベースのインスタンスです。
したがって、上記コードの「げ時点でのデータは150件」はデータ登録後の状態のレコード登録件数を取得します。
次に新たなトランザクションを発生させてから、データ件数を取得するクエリーをdb
に対して発行していますが、これは追加された分のデータはトラッキング外ですし、db
取得時のデータ件数は150件のままですので、追加されたデータ件数は含まれていません。したがって、258件のデータを持っていません。
最新のDatabase
インスタンスを取得したい場合はConnection#db()
で最新のDatabase
インスタンスを取得して、データ件数を取得します。そうすれば追加された分のデータも返さて258件になります。
以上
次回はデータ操作についてやっていきたいと思います。