mike-neckのブログ

Java or Groovy or Swift or Golang

Spring WebFlux + r2dbc + Kotlin でデータベーストランザクション

f:id:mike_neck:20171113232305p:plain

前回 ReactiveCrudRepository を使ったアプリケーションの話をしたが、残念なことに今の所トランザクションを指定できないので、アプリケーションを終了したら、データはなくなるし、複数のテーブルへの操作をアトミックな操作として指定できないことをメモしておいた。

mike-neck.hatenadiary.com

では、トランザクションをどうおこなうかだが、 TransactionalDatabaseClient を使うようである。

Configuration

TransactionalDatabaseClient を作るには ConnectionFactoryインスタンスTransactionalDatabaseClient#create にわたすだけでよい。そうして作ったインスタンスビーン登録する

@Configuration
@EnableR2dbcRepositories
class DatabaseConfiguration : AbstractR2dbcConfiguration() {

  @Bean
  override fun connectionFactory(): ConnectionFactory =
      ConnectionFactoryOptions.builder()
          .option(ConnectionFactoryOptions.DRIVER, "postgresql")
          .option(ConnectionFactoryOptions.HOST, "localhost")
          .option(ConnectionFactoryOptions.PORT, 5432)
          .option(ConnectionFactoryOptions.USER, "postgres-user")
          .option(ConnectionFactoryOptions.PASSWORD, "postgres-pass")
          .option(ConnectionFactoryOptions.DATABASE, "postgres")
          .build()
          .let { ConnectionFactories.get(it) }

  @Bean
  fun transactionalDatabaseClient(connectionFactory: ConnectionFactory): TransactionalDatabaseClient =
      TransactionalDatabaseClient.create(connectionFactory)
}

作った TransactionalDatabaseClient@Repository 相当のクラスで使うのが良いと思われる。

@Repository
class UserTransactionalRepository(private val dbClient: TransactionalDatabaseClient) {

  fun saveUser(user: User, password: UserPassword): Mono<Unit> =
      dbClient.inTransaction { db ->
        db.execute()
            //language=sql
            .sql("insert into users(id, name, created) values($1, $2, $3)")
            .bind("$1", user.id)
            .bind("$1", user.name)
            .bind("$1", user.created)
            .fetch()
            .rowsUpdated()
            .thenReturn(Unit)
            .flatMap {
              db.execute()
              //language=sql
                  .sql("insert into user_password(id, password_hash, created) values($1, $2, $3, $4)")
                  .bind("$1", user.id)
                  .bind("$2", password.hash)
                  .bind("$3", user.created)
                  .fetch()
                  .rowsUpdated()
            } // flatMap
            .thenReturn(Unit)
      } // inTransaction
      .last()
}
  • TransactionalDatabaseClient#inTransaction にわたす関数の中で、データベースへの操作を行っていく
  • 複数のデータベースの操作は前の操作の結果に依存するので(外部参照などの都合による)、失敗した/成功したの状態を引き継ぐために、 Monothen などで引き継いで行く(db.execute() 〜 はあくまで操作の予約なので、実際に実行されるのは subscribe されたあとになる)

実行するクエリによっては、この saveUser 関数の中はいるに耐えないものになるので、 Repository の更に下のレイヤーを作る必要があるように思われる。


以下、ゴミ

なお、前回の続きのようなアプリケーションの続きではこのあたりを抽象的に扱って、次のようなコードになるようにしている。

// サービスクラスの一部
unitOfWorkFactory.unitOfWork(
        userTransactionalRepository.countUserByName(user.name).condition { it == 0L },
        userTransactionalRepository.create(user),
        tokenTransactionalRepository.create(token))
        .thenReturn(user to userToken)

// UserTransactionalRepository の一部
RepositoryAction.create { db ->
  db.insert()
      .into(UserDb::class.java)
      .using(UserDb.from(user))
      .fetch()
      .rowsUpdated()
}

サンプルコードをそのうちに github にあげるつもりだけど、この休みの後半は時間がほとんど取れなかった…