リアクティブな感じで RDB に接続できるやつ。内容的にはバファさんのブログの下位互換未満。
準備1
使ったデータベースは postgres で、docker で用意した。とりあえず、こんな感じのテーブルを作っておく。
create table users( id bigint not null primary key , name varchar(31) not null unique , created timestamp not null );
ついでにデータを入れておく
insert into users (id, name, created) values (1, 'test-user', '2019-01-01 10:00:00'), (2, 'test-admin', '2019-01-15 10:00:00'), (3, 'test-owner', '2019-02-01 10:00:00'), (4, 'test-guest', '2019-02-15 10:00:00');
準備2
gradle は次のような感じ(Kotlin)
import java.net.URI plugins { id("org.jetbrains.kotlin.jvm").version("1.3.20") } repositories { jcenter() mavenCentral() maven { url = URI.create("https://repo.spring.io/libs-milestone/") } } dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.1.1") implementation("io.projectreactor:reactor-core:3.2.6.RELEASE") implementation(group = "io.r2dbc", name = "r2dbc-postgresql", version = "1.0.0.M7") testImplementation("org.junit.jupiter:junit-jupiter:5.4.0") testImplementation("org.jetbrains.kotlin:kotlin-test-junit5:1.3.21") testImplementation("io.projectreactor:reactor-test:3.2.6.RELEASE") }
ちょっと動かす
テストデータは入っているので、クエリーを投げるところです
@Test fun noKoroutine() { val options = ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, "postgresql") .option(ConnectionFactoryOptions.HOST, "localhost") .option(ConnectionFactoryOptions.PORT, 5432) .option(ConnectionFactoryOptions.USER, "postgres-user") .option(ConnectionFactoryOptions.PASSWORD, "postgres-pass") .option(ConnectionFactoryOptions.DATABASE, "postgres") .build() val connectionFactory: ConnectionFactory = ConnectionFactories.get(options) val conn = Mono.from(connectionFactory.create()) val users = conn.flatMapMany { connection -> connection.createStatement( //language=SQL "SELECT u.id, u.name, u.created FROM users AS u WHERE u.name = $1") .bind("$1", "test-user") .execute() } .flatMap { result -> result.map { row, meta -> val name = row.get("name", String::class.javaObjectType) val id = row.get("id", Long::class.javaObjectType) val created = row.get("created", Instant::class.java) return@map if (created != null && name != null && id != null) User(id, name, created) else null } } .filter { it != null } .cast(User::class.java) runBlocking { users.buffer().consumeEach { println(it) } } }
実行結果
番外編
Kotlin-Coroutine を使うといい感じのコードになるのかと思ったけど、
// GlobalScope.flux 内 val result = connection.createStatement( //language=SQL "SELECT u.id, u.name, u.created FROM users AS u WHERE u.name = $1") .bind("$1", "test-user") .execute() .awaitSingle() result.map { row, _ -> val name = row.get("name", String::class.javaObjectType) val id = row.get("id", Long::class.javaObjectType) val created = row.get("created", Instant::class.java) return@map if (created != null && name != null && id != null) User(id, name, created) else null }.consumeEach { if (it != null) channel.offer(it) }
AbstractByteBuff#getCharaSequence
のあたりで、 ByteBuff
の readableBytes
が 0
になってしまう現象に遭遇したので、諦めました
Kotlin Coroutine はもう少し業務寄りの部分で多様なデータが踊っているような箇所で使って、こういう複雑にデータをさわらない箇所ではあまり効果がなさそうに思いました(要検討)。