Reactor の Flux を flatten する3つのパターン
ここ最近 Reactor をいじって一定間隔の無限ストリームを生成するなどしていたこともあって、まとめることにした。タイトルに数字が入っていると読まれやすいっぽいので、タイトルには「3つのパターン」と書いたが、実際に flatMap 、 flatMapSequential 、 concatMap の3つなので、あながち間違っているわけではない。

Flux 導入
Flux は 0 以上のデータを持つデータストリーム。例えば 0.25 秒おきに 0 から 1 ずつ増えるデータを記述すると次のようになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') import reactor.core.publisher.* def flux = Flux.interval(Duration.ofMillis(250)).take(4)
では、この Flux<Long> の数値を文字列として表示したいので、 String になるように map する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" }
次に、時刻を表示したい。そこで、この String と現在の時刻を文字列に埋め込んで表示する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" } .map { s -> "[${LocalDate.now()}] $s" } .subscribe { logger.info(it) }
Flux の中の処理はメインスレッドで subscribe を呼び出してから開始されるので、メインスレッドよりも遅れて実行される。そのため先に終わったメインスレッドが Flux の中の処理を待ち合わせるために、 CountDownLatch などの同期化するための機構を埋め込まないとならない。 CountDownLatch を Flux の処理が終わったあとに countDown されるように onTerminate に処理を加えるなどすると、次のようなプログラムになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') @Grab('org.slf4j:slf4j-simple:1.7.25') import java.util.concurrent.* import java.time.* import java.time.format.* import reactor.core.publisher.* import reactor.core.scheduler.* import org.slf4j.* def logger = LoggerFactory.getLogger('groovy') Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def latch = new CountDownLatch(1) def disp = flux .map { i -> "test $i" } .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
これを実行すると、次のように表示される。
[parallel-6] INFO groovy - [02:00:41.573] test 0 [parallel-6] INFO groovy - [02:00:41.822] test 1 [parallel-6] INFO groovy - [02:00:42.075] test 2 [parallel-6] INFO groovy - [02:00:42.322] test 3
これでまずは Flux の導入ができた。
flatten する
以下では上で作った Flux を起点に一定間隔でデータを投げる Flux を作っていく。各 Flux の起点から 400 ms 感覚でエミットされるデータを3つずつ取るものとする。次の表はデータがエミットされる時刻の一覧(単位は ms)
| 1st | 2nd-0 | 2nd-1 | 2nd2 |
|---|---|---|---|
| 0 | 0 |
400 |
800 |
| 1 | 250 |
650 |
1050 |
| 2 | 500 |
900 |
1300 |
| 3 | 750 |
1150 |
1550 |
次のようなクラスを導入して、どのデータがいつエミットされたのかを記録する
class Sample { final long first final long second final LocalTime created Sample(first, second) { this.first = first this.second = second this.created = LocalTime.now() } @Override String toString() { "($created) $first - $second" } }
また、次の関数によって long から Sample を作る
def longToSample = {long fst -> Flux.interval(Duration.ofMillis(400)) .take(3) .map { long snd -> new Sample(fst, snd) } }
発生した時刻をキーにマージする flatMap
flatMap は発生した時刻をキーにしてマージしていく。したがって、想定されるデータ列は先程の表から、 0-0(0 ms) -> 1-0(250 ms) -> 0-1(400 ms) -> 2-0(500 ms) -> 1-1(650 ms) -> 3-0(750 ms) -> 0-2(800 ms)... となることが予想される。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .flatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合、出力は次のようになる
[parallel-5] INFO groovy - [10:32:07.413] (10:32:07.413) 0 - 0 [parallel-6] INFO groovy - [10:32:07.647] (10:32:07.647) 1 - 0 [parallel-5] INFO groovy - [10:32:07.798] (10:32:07.798) 0 - 1 [parallel-7] INFO groovy - [10:32:07.900] (10:32:07.900) 2 - 0 [parallel-6] INFO groovy - [10:32:08.047] (10:32:08.046) 1 - 1 [parallel-8] INFO groovy - [10:32:08.151] (10:32:08.151) 3 - 0 [parallel-5] INFO groovy - [10:32:08.199] (10:32:08.199) 0 - 2 [parallel-7] INFO groovy - [10:32:08.297] (10:32:08.297) 2 - 1 [parallel-6] INFO groovy - [10:32:08.446] (10:32:08.446) 1 - 2 [parallel-8] INFO groovy - [10:32:08.551] (10:32:08.551) 3 - 1 [parallel-7] INFO groovy - [10:32:08.696] (10:32:08.696) 2 - 2 [parallel-8] INFO groovy - [10:32:08.948] (10:32:08.948) 3 - 2
複数の系列で並行になんらかの処理を行いたい場合に用いるとよさそうである
元の順序をキープして新たな間隔でエミットする concatMap
concatMap でまとめられた Flux は元の順序(1st)を保持しつつ、新しく指定された間隔でエミットされる。想定される順番は 0-0 -> 0-1 -> 0-2 -> 1-0 -> 1-1 -> ... となり、それらの間隔が 400 ms となる(最初の 250 ms は無視される)。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .concatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合の出力は次のようになる
[parallel-3] INFO groovy - [10:36:56.553] (10:36:56.553) 0 - 0 [parallel-3] INFO groovy - [10:36:56.939] (10:36:56.939) 0 - 1 [parallel-3] INFO groovy - [10:36:57.342] (10:36:57.342) 0 - 2 [parallel-4] INFO groovy - [10:36:57.743] (10:36:57.743) 1 - 0 [parallel-4] INFO groovy - [10:36:58.145] (10:36:58.145) 1 - 1 [parallel-4] INFO groovy - [10:36:58.543] (10:36:58.543) 1 - 2 [parallel-5] INFO groovy - [10:36:58.948] (10:36:58.948) 2 - 0 [parallel-5] INFO groovy - [10:36:59.346] (10:36:59.346) 2 - 1 [parallel-5] INFO groovy - [10:36:59.748] (10:36:59.748) 2 - 2 [parallel-6] INFO groovy - [10:37:00.153] (10:37:00.153) 3 - 0 [parallel-6] INFO groovy - [10:37:00.552] (10:37:00.552) 3 - 1 [parallel-6] INFO groovy - [10:37:00.949] (10:37:00.949) 3 - 2
元の順序をキープ(concatMap)、各要素は指定時刻に生成(flatMap)される flatMapSequential
flatMapSequential は concatMap と flatMap を組み合わせたようなもので、 要素の順序は元の順序(1st -> 2nd の順序)をキープしつつ、データのエミットタイミングもキープされる動作をとる。なお元の順序を飛び越してしまったデータはバッファリングされる。
def disp = flux .flatMapSequential(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
出力は次のようになる
[parallel-8] INFO groovy - [12:06:06.070] (12:06:06.069) 0 - 0 [parallel-8] INFO groovy - [12:06:06.455] (12:06:06.455) 0 - 1 [parallel-8] INFO groovy - [12:06:06.855] (12:06:06.855) 0 - 2 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.304) 1 - 0 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.702) 1 - 1 [parallel-9] INFO groovy - [12:06:07.104] (12:06:07.104) 1 - 2 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.551) 2 - 0 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.953) 2 - 1 [parallel-10] INFO groovy - [12:06:07.350] (12:06:07.350) 2 - 2 [parallel-10] INFO groovy - [12:06:07.351] (12:06:06.805) 3 - 0 [parallel-10] INFO groovy - [12:06:07.351] (12:06:07.205) 3 - 1 [parallel-11] INFO groovy - [12:06:07.605] (12:06:07.605) 3 - 2
データ 1-0 は データ 0-1 よりも先にエミットされるが、バッファリングされて 0-2 の後に出てきているのがわかる
おわり