mike-neckのブログ

Java or Groovy or Swift or Golang

Reactor Test の StepVerifier の withVirtualTime の使い方

f:id:mike_neck:20180221005726p:plain

Spring WebFlux で project Reactor が使われていて、まだ手に馴染んでいないので、 チュートリアル をやっていたところ、 StepVerifier#withVirtualTime の使い方がわからず、検索しても要領を得ず、いろいろ試して使い方がわかったので、そのメモ

tech.io


前提知識

Mono<T>Flux<T> のテストには StepVerifier を用いてテストを行う

@Test
void sampleUsageOfStepVerifier() {
  StepVerifier.create(Flux.just("foo", "bar", "baz"))
      .expectNext("foo")
      .expectNext("bar")
      .expectNext("baz")
      .verifyComplete();
}

Flux<T> などの内部のスケジューラーを操作するために StepVerifier#withVirtualTime(Supplier<Publisher<T>>) というメソッドがある

これで、100時間かかるようなスケジューリングをしている Flux でも数秒でテストできる。


問題

生成まで 1 秒、イベントの間隔が 1 秒で 0 から 1 ずつ単調増加する 3600 個の順列 Flux<Long> がある。この Flux<Long> に対して、次のテストをおこなえるコードを記述すること

  • 3600 個の要素があること
  • 任意の3箇所の値が正しい値であるかを検証すること
  • 2 秒以内に終わること

テスト対象の Flux<Long>

テストのデータになる Flux<Long> は次のように生成する。

Supplier<Flux<Long>> flux() {
  final Duration oneSecond = Duration.ofSecond(1L);
  return () -> Flux.interval(oneSecond, oneSecond)
      .take(3_600L)
      .log();
}

最初に書いたダメなテスト

1秒を3598回待つというコードをどのように書くのかわからず、とりあえず書いたコードです。

@Test
void tooLongFlux() {
  StepVerifier.withVirtual(flux())
      .expectSubscription() // 購読開始
      .expectNoEvent(Duration.ofSeconds(1L)) // 発生まで1秒
      .expectNext(0L) // 最初の要素
      .thenAwait(Duration.ofSeconds(1L)) // 1秒待つ
      .expectNext(1L) // 2回目の要素
      .thenAwait() // ん?1秒 x 3598 回 をどうやって待つ?
      .expectNextCount(3597L)
      .expectNext(3599L) // 最後の要素
      .verifyComplete(); // 終わったことを確認
}

これを実行すると次のようにログが表示されたところで動かなくなります。

[ INFO] (pool-1-thread-1) onSubscribe(FluxTake.TakeSubscriber)
[ INFO] (pool-1-thread-1) request(unbounded)
[ INFO] (pool-1-thread-1) onNext(0)
[ INFO] (pool-1-thread-1) onNext(1)

次に書いたイケてないコード

とりあえず、強引に1秒 x 3598回待つようにコードを書きます。

@Test
void tooLongFlux() {
  final StepVerifier.Step<Long> verifier = StepVerifier.withVirtual(flux())
      .expectSubscription()
      .expectNoEvent(Duration.ofSeconds(1L));
  final StepVerifier.Step<Long> intermediate = Interval.fromTo(0, 3599)
      .injectInto(verifier,
          (step, index) -> step.expectNext((long) index)
              .thenAwait(Duration.ofSeconds(1L)));
  intermediate.verifyComplete();
}

これはすべての要素を検査するので、正しいコードと言えば正しいコードです。とはいえ、一部を省略していいという緩めのテストなので、若干過剰でもあります。

やっとたどり着いたコード

ここで先ほどのコードでは 3600 秒待てばよいところを 3601 秒待っていることに気づいたので、 thenAwait はトータルで 3600 秒進ませればあとは間隔が空いていない Flux<T> と同じテストと変わらないのではないかと予想しました。

@Test
void tooLongFlux() {
  StepVerifier.withVirtual(flux())
      .expectSubscription()
      .expectNoEvent(Duration.ofSeconds(1L))
      .expectNext(0L)
      .thenAwait(Duration.ofSeconds(1L))
      .expectNext(1L)
      .thenAwait(Duration.ofSeconds(3600L - 2L)) // 3600 - 2 要素分スケジューラーを進ませる
      .expectNextCount(3600L - 3) // 値をテストする3個を除いた個数を飛ばす
      .expectNext(3599L)
      .verifyComplete();
}

これを実行したところ、先ほどのテストと同様にパスできました。

thenAwait(Duration) はその都度待つ時間をあらわすでのはなく、一気に時間を飛ばすというメソッドでした。 thenAwait(Duration) の挙動がいまいち分かりづらかったので、このテストについて長時間ハマってしまいました。


まとめ

非常に長い Flux<T> をテストする場合は、 StepVerifier#withVirtualTime でスケジュールを調整できる StepVerifier.Step<T> を作った上で、 thenAwait(Duration) で進ませたい時間を進ませることによって、すべての要素のテストが実施できる。