mike-neckのブログ

Java or Groovy or Swift or Golang

Reactor-Netty の http クライアントを Kotlin Coroutine とともにいじってみる

Reactor-Netty と Kotlin coroutine を同時にいじっているブログ記事を見かけなかったので、勉強がてらブログを書いてみることにしました。

f:id:mike_neck:20171113232305p:plain

gradle

まずは、 build.gradle.kts はこんな感じ

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.jetbrains.kotlin.jvm").version("1.3.20")
}

repositories {
    jcenter()
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.1.1")

    implementation("io.projectreactor:reactor-core:3.2.6.RELEASE")
    implementation("io.projectreactor.netty:reactor-netty:0.8.5.RELEASE")

    testImplementation("org.junit.jupiter:junit-jupiter:5.4.0")
    testImplementation("org.jetbrains.kotlin:kotlin-test-junit5:1.3.21")
    testImplementation("io.projectreactor:reactor-test:3.2.6.RELEASE")
}

tasks.withType<Test> {
    useJUnitPlatform()
}

tasks.withType<KotlinCompile> {
    kotlinOptions.jvmTarget = "1.8"
}

get リクエス

実質のハローワールドです。なお、リクエスト先は https://httpbin.org を利用しました。

class Http {

  val httpClient = HttpClient.create()

  @Test fun get() {
    val path = GlobalScope.mono { // (1)
      "/get?query=" + URLEncoder.encode("テスト", StandardCharsets.UTF_8)
    }
    val body = httpClient.baseUrl("https://httpbin.org")
        .get()  // (2)
        .uri(path) // (3)
        .responseContent() // (4)
        .aggregate() // (5)
        .asString(StandardCharsets.UTF_8) // (6)

    runBlocking { // (7)
      val res = body.awaitSingle() // (8)
      println(res)
    }
  }
}

実行結果

{
  "args": {
    "query": "\u30c6\u30b9\u30c8"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Host": "httpbin.org", 
    "User-Agent": "ReactorNetty/0.8.5.RELEASE"
  }, 
  "url": "https://httpbin.org/get?query=\u30c6\u30b9\u30c8"
}
  1. Global.mono ブロックの中では、 Mono を透過的に扱えます。最後の式の戻り値が Mono に包まれて返ってきます(Kotlin coroutine)
  2. GET メソッドでリクエストを投げる場合に get メソッドを呼び出します
  3. uri メソッドでパスを指定します。 String または Mono<String> を受け付けます
  4. レスポンスのハンドリングを行いますが、 responseContent ではヘッダーやステータスを無視して、ボディだけを ByteBufFlux で取れるようにします。なお戻り値の型は ByteBufFlux という Flux<ByteBuf> のサブクラスです
  5. aggregate で複数の ByteBuf をまとめて一つの ByteBuf にまとめます。内部的には CompositeByteBuf を使っています
  6. asStringByteBuf を文字列として扱います。この結果の戻り値は Mono<String> です
  7. runBlocking ブロックは coroutine と同じように透過的に Mono を扱いますが、現在のスレッドをブロックします
  8. Mono<String> の結果が返ってくるまで、処理を中断します(ここは runBlocking ブロックなので現在のスレッドもブロックしますが、 GlobalScope.mono ブロック内などの場合は、処理を一旦止めて他の処理をおこない、結果が返ってきたら処理を再開します)

というわけで、Reactor-Netty と Kotlin coroutine を同時に完全にマスターしました。

Kotlin coroutine の観点で言えば、 map/flatMap などが若干うるさくなくなったので、きれいに書けるといえばきれいに書けると言えそうですが、まあ、気持ち悪い感じではあります。

val fooMono: Mono<Foo> = operationFoo()
val bazMono = fooMono.flatMap { foo -> operationBar(foo).flatMap { bar -> operationBaz(foo, bar) } }

val bazMono = GlobalScope.mono {
  val foo = operationFoo().awaitSingle()
  val bar = operationBar(foo).awaitSingle()
  operationBaz(foo, bar).awaitSingle()
}

Http Client の観点で言えば、フルーエントでモダンなAPIという感じです


post リクエス

続いて post 投げてみます。

@Test fun post() {
  val objectMapper = ObjectMapper()
  val jsonMono = GlobalScope.mono { // (1)
    val example = Example(name = "test", id = 200)
    objectMapper.writeValueAsString(example)
  }
  val response =
      httpClient.baseUrl("https://httpbin.org")
          .headers { // (2)
            it["content-type"] = "application/json"
            it["X-USER-APP"] = "reactor-netty"
            it["user-agent"] = "reactor-netty-http-client"
          }
          .post()
          .uri("/post")
          .send(jsonMono.map { Unpooled.buffer().writeString(it) }) // (3)
          .responseSingle { response, body -> body.asString(StandardCharsets.UTF_8) // (4)
              .map { string -> response.status() to string } }
          .toMono()

  runBlocking {
    val res = response.awaitSingle()
    println(res.first)
    println(res.second)
  }
}

実行結果

200 OK
{
  "args": {}, 
  "data": "{\"name\":\"test\",\"id\":200}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Content-Length": "32", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "reactor-netty-http-client", 
    "X-User-App": "reactor-netty"
  }, 
  "json": {
    "id": 200, 
    "name": "test"
  }, 
  "url": "https://httpbin.org/post"
}
  1. GlobalScope#mono ブロックにてオブジェクトを jsonエンコードします(結果は Mono<String>)
  2. ヘッダーを指定します。ヘッダーの値は (Map<String,Object>) -> Unit にて指定します(正確には (HttpHeaders) -> Unit)
  3. send にて body を指定します。 渡せる型は Publisher<out ByteBuf>/(HttpClientRequest,NettyOutBound) -> Publisher<Void> などです
  4. responseSingle メソッドは ByteBufFlux をすべて集約した ByteBufMonoHttpResponse を引数にとって何かしらのオブジェクトを返す関数でレスポンスを処理します

結論

  • reactor-netty の HTTP クライアント
    • わりと使いやすかった
  • coroutine
    • 完全にマスターした