mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

Reactor の Flux を flatten する3つのパターン

ここ最近 Reactor をいじって一定間隔の無限ストリームを生成するなどしていたこともあって、まとめることにした。タイトルに数字が入っていると読まれやすいっぽいので、タイトルには「3つのパターン」と書いたが、実際に flatMapflatMapSequentialconcatMap の3つなので、あながち間違っているわけではない。

f:id:mike_neck:20180221005726p:plain


Flux 導入

Flux は 0 以上のデータを持つデータストリーム。例えば 0.25 秒おきに 0 から 1 ずつ増えるデータを記述すると次のようになる。

@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE')

import reactor.core.publisher.*

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

では、この Flux<Long> の数値を文字列として表示したいので、 String になるように map する。

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

flux.map { i -> "test $i" }

次に、時刻を表示したい。そこで、この String と現在の時刻を文字列に埋め込んで表示する。

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

flux.map { i -> "test $i" }
    .map { s -> "[${LocalDate.now()}] $s" }
    .subscribe { logger.info(it) }

Flux の中の処理はメインスレッドで subscribe を呼び出してから開始されるので、メインスレッドよりも遅れて実行される。そのため先に終わったメインスレッドが Flux の中の処理を待ち合わせるために、 CountDownLatch などの同期化するための機構を埋め込まないとならない。 CountDownLatchFlux の処理が終わったあとに countDown されるように onTerminate に処理を加えるなどすると、次のようなプログラムになる。

@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE')
@Grab('org.slf4j:slf4j-simple:1.7.25')

import java.util.concurrent.*
import java.time.*
import java.time.format.*
import reactor.core.publisher.*
import reactor.core.scheduler.*
import org.slf4j.*

def logger = LoggerFactory.getLogger('groovy')

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def latch = new CountDownLatch(1)

def disp = flux
    .map { i -> "test $i" }
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

これを実行すると、次のように表示される。

[parallel-6] INFO groovy - [02:00:41.573] test 0
[parallel-6] INFO groovy - [02:00:41.822] test 1
[parallel-6] INFO groovy - [02:00:42.075] test 2
[parallel-6] INFO groovy - [02:00:42.322] test 3

これでまずは Flux の導入ができた。


flatten する

以下では上で作った Flux を起点に一定間隔でデータを投げる Flux を作っていく。各 Flux の起点から 400 ms 感覚でエミットされるデータを3つずつ取るものとする。次の表はデータがエミットされる時刻の一覧(単位は ms)

1st 2nd-0 2nd-1 2nd2
0 0 400 800
1 250 650 1050
2 500 900 1300
3 750 1150 1550

次のようなクラスを導入して、どのデータがいつエミットされたのかを記録する

class Sample {
  final long first
  final long second
  final LocalTime created
  Sample(first, second) {
    this.first = first
    this.second = second
    this.created = LocalTime.now()
  }
  @Override String toString() { "($created) $first - $second" }
}

また、次の関数によって long から Sample を作る

def longToSample = {long fst ->
    Flux.interval(Duration.ofMillis(400))
        .take(3)
        .map { long snd -> new Sample(fst, snd) }
}

発生した時刻をキーにマージする flatMap

flatMap は発生した時刻をキーにしてマージしていく。したがって、想定されるデータ列は先程の表から、 0-0(0 ms) -> 1-0(250 ms) -> 0-1(400 ms) -> 2-0(500 ms) -> 1-1(650 ms) -> 3-0(750 ms) -> 0-2(800 ms)... となることが予想される。

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def disp = flux
    .flatMap(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

この場合、出力は次のようになる

[parallel-5] INFO groovy - [10:32:07.413] (10:32:07.413) 0 - 0
[parallel-6] INFO groovy - [10:32:07.647] (10:32:07.647) 1 - 0
[parallel-5] INFO groovy - [10:32:07.798] (10:32:07.798) 0 - 1
[parallel-7] INFO groovy - [10:32:07.900] (10:32:07.900) 2 - 0
[parallel-6] INFO groovy - [10:32:08.047] (10:32:08.046) 1 - 1
[parallel-8] INFO groovy - [10:32:08.151] (10:32:08.151) 3 - 0
[parallel-5] INFO groovy - [10:32:08.199] (10:32:08.199) 0 - 2
[parallel-7] INFO groovy - [10:32:08.297] (10:32:08.297) 2 - 1
[parallel-6] INFO groovy - [10:32:08.446] (10:32:08.446) 1 - 2
[parallel-8] INFO groovy - [10:32:08.551] (10:32:08.551) 3 - 1
[parallel-7] INFO groovy - [10:32:08.696] (10:32:08.696) 2 - 2
[parallel-8] INFO groovy - [10:32:08.948] (10:32:08.948) 3 - 2

複数の系列で並行になんらかの処理を行いたい場合に用いるとよさそうである


元の順序をキープして新たな間隔でエミットする concatMap

concatMap でまとめられた Flux は元の順序(1st)を保持しつつ、新しく指定された間隔でエミットされる。想定される順番は 0-0 -> 0-1 -> 0-2 -> 1-0 -> 1-1 -> ... となり、それらの間隔が 400 ms となる(最初の 250 ms は無視される)。

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def disp = flux
    .concatMap(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

この場合の出力は次のようになる

[parallel-3] INFO groovy - [10:36:56.553] (10:36:56.553) 0 - 0
[parallel-3] INFO groovy - [10:36:56.939] (10:36:56.939) 0 - 1
[parallel-3] INFO groovy - [10:36:57.342] (10:36:57.342) 0 - 2
[parallel-4] INFO groovy - [10:36:57.743] (10:36:57.743) 1 - 0
[parallel-4] INFO groovy - [10:36:58.145] (10:36:58.145) 1 - 1
[parallel-4] INFO groovy - [10:36:58.543] (10:36:58.543) 1 - 2
[parallel-5] INFO groovy - [10:36:58.948] (10:36:58.948) 2 - 0
[parallel-5] INFO groovy - [10:36:59.346] (10:36:59.346) 2 - 1
[parallel-5] INFO groovy - [10:36:59.748] (10:36:59.748) 2 - 2
[parallel-6] INFO groovy - [10:37:00.153] (10:37:00.153) 3 - 0
[parallel-6] INFO groovy - [10:37:00.552] (10:37:00.552) 3 - 1
[parallel-6] INFO groovy - [10:37:00.949] (10:37:00.949) 3 - 2

元の順序をキープ(concatMap)、各要素は指定時刻に生成(flatMap)される flatMapSequential

flatMapSequentialconcatMapflatMap を組み合わせたようなもので、 要素の順序は元の順序(1st -> 2nd の順序)をキープしつつ、データのエミットタイミングもキープされる動作をとる。なお元の順序を飛び越してしまったデータはバッファリングされる。

def disp = flux
    .flatMapSequential(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

出力は次のようになる

[parallel-8] INFO groovy - [12:06:06.070] (12:06:06.069) 0 - 0
[parallel-8] INFO groovy - [12:06:06.455] (12:06:06.455) 0 - 1
[parallel-8] INFO groovy - [12:06:06.855] (12:06:06.855) 0 - 2
[parallel-8] INFO groovy - [12:06:06.856] (12:06:06.304) 1 - 0
[parallel-8] INFO groovy - [12:06:06.856] (12:06:06.702) 1 - 1
[parallel-9] INFO groovy - [12:06:07.104] (12:06:07.104) 1 - 2
[parallel-9] INFO groovy - [12:06:07.105] (12:06:06.551) 2 - 0
[parallel-9] INFO groovy - [12:06:07.105] (12:06:06.953) 2 - 1
[parallel-10] INFO groovy - [12:06:07.350] (12:06:07.350) 2 - 2
[parallel-10] INFO groovy - [12:06:07.351] (12:06:06.805) 3 - 0
[parallel-10] INFO groovy - [12:06:07.351] (12:06:07.205) 3 - 1
[parallel-11] INFO groovy - [12:06:07.605] (12:06:07.605) 3 - 2

データ 1-0 は データ 0-1 よりも先にエミットされるが、バッファリングされて 0-2 の後に出てきているのがわかる


おわり

Swift で Foundation を import するとヘッダーまわりでエラーが発生する場合の対処

f:id:mike_neck:20180609045321p:plain

以前書いたエントリー

mike-neck.hatenadiary.com

の対処法が間違えていて、ビルドもなにもできなくなった上に、 REPL で import Foundation をやっても同じエラーが出てしまう割と深刻な状況の対処法。

(巷にある ~/Library/Developer/Xcode を消すやつが何の役にも立たなくて、絶望しかなかった)

もしかしたら、 swift を書けるエンジニアは知ってて当たり前の方法なのかもしれんけど…

参考にしたのはこちら。

stackoverflow.com

原因は brew で以前にインストールしたヘッダーファイルが何らかの原因によってインクルードされた上、brew で入れたアプリケーションからはリンクされていないような状態にあるときに、このエラーは発生する模様。

$ brew doctor
Please note that these warnings are just used to help the Homebrew maintainers
with debugging if you file an issue. If everything you use Homebrew for is
working fine: please don't worry or file an issue; just ignore this. Thanks!

Warning: Unbrewed header files were found in /usr/local/include.
If you didn't put them there on purpose they could cause problems when
building Homebrew formulae, and may need to be deleted.

Unexpected header files:
  /usr/local/include/6.1.0/backward/auto_ptr.h
  /usr/local/include/6.1.0/backward/backward_warning.h
 〜こんな感じで2400個くらいヘッダーファイルがリストされている〜
  /usr/local/include/zconf.h
  /usr/local/include/zlib.h

で、対処はこれらのヘッダーファイルを消す。

$ brew doctor 2>&1 | grep "\.h$" | xargs sudo rm

おわり

Spring WebFlux の Router Function のテスト

久々に Spring WebFlux を書いてたのだが、いまいちルーティングがうまくできず、 Routing Function に対してテストを書いてみることにした。

f:id:mike_neck:20180802004547p:plain

で、テストの書き方を調べてみたのだが、とりあえず何も見つからなかった(サーバーを起動するパターンは見つかったが、ルーターのテストごときでサーバーを起動したくない)ので、Reactor のテストの書き方をもとに強引に書いてみた。これが正しいのかどうかはわからない。


プロダクションコード

プロダクションコードはこんな感じ。

@Configuration
class Routing {

  private final MyHandler handler;

  Routing(final MyHandler handler) {
    this.handler = handler;
  }

  @Bean
  RouterFunction<ServerResponse> routerFunction() {
    return routes(GET("/foo"), handler::handle);
  }
}

テストコード

で、書いたテストコード。これはあくまでルーティングが意図したものになっているかを確認するテスト。というのも、ルーティングがうまくできていなかったからテストを書いたため。

@ExtendWith(SpringExtension.class)
@Import({Routing.class, MyHandler.class})
class RoutingTest {

  final Routing routing;

  @Autowired
  RoutingTest(final Routing routing) {
    this.routing = routing;
  }

  @Test
  void test() {
    final ServerRequest request =
      MockServerRequest.builder()
        .method(HttpMethod.GET)
        .uri(URI.create("/foo"))
        .build();

    final Mono<HandlerFunction<ServerResponse>>
      handler =
        routing
          .routerFunction()
          .route(request);

    StepVerifier.create(handler)
        .expectNextCount(1)
        .verifyComplete();
  }
}

なお、 URI については、かなりゆるい URI でもマッチする。具体的には http://bar-baz/foo でもマッチする。当然、 /foo/baz はマッチしない。ちなみに、マッチしない場合のテストの書き方はこんな感じ。

StepVerifier.create(handler)
    .verifyComplete();

余談、というか…

もともとは、パスに対して正規表現でマッチングする方法を調べてた。最初うまくできなかったが、次のようにやればよいらしい。

return route(GET("/{foo:[a-zA-Z0-9]+}"), handler::handle);