ここ最近 Reactor をいじって一定間隔の無限ストリームを生成するなどしていたこともあって、まとめることにした。タイトルに数字が入っていると読まれやすいっぽいので、タイトルには「3つのパターン」と書いたが、実際に flatMap
、 flatMapSequential
、 concatMap
の3つなので、あながち間違っているわけではない。
Flux
導入
Flux
は 0 以上のデータを持つデータストリーム。例えば 0.25
秒おきに 0
から 1
ずつ増えるデータを記述すると次のようになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') import reactor.core.publisher.* def flux = Flux.interval(Duration.ofMillis(250)).take(4)
では、この Flux<Long>
の数値を文字列として表示したいので、 String
になるように map
する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" }
次に、時刻を表示したい。そこで、この String
と現在の時刻を文字列に埋め込んで表示する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" } .map { s -> "[${LocalDate.now()}] $s" } .subscribe { logger.info(it) }
Flux
の中の処理はメインスレッドで subscribe
を呼び出してから開始されるので、メインスレッドよりも遅れて実行される。そのため先に終わったメインスレッドが Flux
の中の処理を待ち合わせるために、 CountDownLatch
などの同期化するための機構を埋め込まないとならない。 CountDownLatch
を Flux
の処理が終わったあとに countDown
されるように onTerminate
に処理を加えるなどすると、次のようなプログラムになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') @Grab('org.slf4j:slf4j-simple:1.7.25') import java.util.concurrent.* import java.time.* import java.time.format.* import reactor.core.publisher.* import reactor.core.scheduler.* import org.slf4j.* def logger = LoggerFactory.getLogger('groovy') Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def latch = new CountDownLatch(1) def disp = flux .map { i -> "test $i" } .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
これを実行すると、次のように表示される。
[parallel-6] INFO groovy - [02:00:41.573] test 0 [parallel-6] INFO groovy - [02:00:41.822] test 1 [parallel-6] INFO groovy - [02:00:42.075] test 2 [parallel-6] INFO groovy - [02:00:42.322] test 3
これでまずは Flux
の導入ができた。
flatten する
以下では上で作った Flux
を起点に一定間隔でデータを投げる Flux
を作っていく。各 Flux
の起点から 400 ms
感覚でエミットされるデータを3つずつ取るものとする。次の表はデータがエミットされる時刻の一覧(単位は ms
)
1st | 2nd-0 | 2nd-1 | 2nd2 |
---|---|---|---|
0 | 0 |
400 |
800 |
1 | 250 |
650 |
1050 |
2 | 500 |
900 |
1300 |
3 | 750 |
1150 |
1550 |
次のようなクラスを導入して、どのデータがいつエミットされたのかを記録する
class Sample { final long first final long second final LocalTime created Sample(first, second) { this.first = first this.second = second this.created = LocalTime.now() } @Override String toString() { "($created) $first - $second" } }
また、次の関数によって long
から Sample
を作る
def longToSample = {long fst -> Flux.interval(Duration.ofMillis(400)) .take(3) .map { long snd -> new Sample(fst, snd) } }
発生した時刻をキーにマージする flatMap
flatMap
は発生した時刻をキーにしてマージしていく。したがって、想定されるデータ列は先程の表から、 0-0(0 ms) -> 1-0(250 ms) -> 0-1(400 ms) -> 2-0(500 ms) -> 1-1(650 ms) -> 3-0(750 ms) -> 0-2(800 ms)...
となることが予想される。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .flatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合、出力は次のようになる
[parallel-5] INFO groovy - [10:32:07.413] (10:32:07.413) 0 - 0 [parallel-6] INFO groovy - [10:32:07.647] (10:32:07.647) 1 - 0 [parallel-5] INFO groovy - [10:32:07.798] (10:32:07.798) 0 - 1 [parallel-7] INFO groovy - [10:32:07.900] (10:32:07.900) 2 - 0 [parallel-6] INFO groovy - [10:32:08.047] (10:32:08.046) 1 - 1 [parallel-8] INFO groovy - [10:32:08.151] (10:32:08.151) 3 - 0 [parallel-5] INFO groovy - [10:32:08.199] (10:32:08.199) 0 - 2 [parallel-7] INFO groovy - [10:32:08.297] (10:32:08.297) 2 - 1 [parallel-6] INFO groovy - [10:32:08.446] (10:32:08.446) 1 - 2 [parallel-8] INFO groovy - [10:32:08.551] (10:32:08.551) 3 - 1 [parallel-7] INFO groovy - [10:32:08.696] (10:32:08.696) 2 - 2 [parallel-8] INFO groovy - [10:32:08.948] (10:32:08.948) 3 - 2
複数の系列で並行になんらかの処理を行いたい場合に用いるとよさそうである
元の順序をキープして新たな間隔でエミットする concatMap
concatMap
でまとめられた Flux
は元の順序(1st
)を保持しつつ、新しく指定された間隔でエミットされる。想定される順番は 0-0 -> 0-1 -> 0-2 -> 1-0 -> 1-1 -> ...
となり、それらの間隔が 400 ms
となる(最初の 250 ms
は無視される)。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .concatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合の出力は次のようになる
[parallel-3] INFO groovy - [10:36:56.553] (10:36:56.553) 0 - 0 [parallel-3] INFO groovy - [10:36:56.939] (10:36:56.939) 0 - 1 [parallel-3] INFO groovy - [10:36:57.342] (10:36:57.342) 0 - 2 [parallel-4] INFO groovy - [10:36:57.743] (10:36:57.743) 1 - 0 [parallel-4] INFO groovy - [10:36:58.145] (10:36:58.145) 1 - 1 [parallel-4] INFO groovy - [10:36:58.543] (10:36:58.543) 1 - 2 [parallel-5] INFO groovy - [10:36:58.948] (10:36:58.948) 2 - 0 [parallel-5] INFO groovy - [10:36:59.346] (10:36:59.346) 2 - 1 [parallel-5] INFO groovy - [10:36:59.748] (10:36:59.748) 2 - 2 [parallel-6] INFO groovy - [10:37:00.153] (10:37:00.153) 3 - 0 [parallel-6] INFO groovy - [10:37:00.552] (10:37:00.552) 3 - 1 [parallel-6] INFO groovy - [10:37:00.949] (10:37:00.949) 3 - 2
元の順序をキープ(concatMap
)、各要素は指定時刻に生成(flatMap
)される flatMapSequential
flatMapSequential
は concatMap
と flatMap
を組み合わせたようなもので、 要素の順序は元の順序(1st
-> 2nd
の順序)をキープしつつ、データのエミットタイミングもキープされる動作をとる。なお元の順序を飛び越してしまったデータはバッファリングされる。
def disp = flux .flatMapSequential(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
出力は次のようになる
[parallel-8] INFO groovy - [12:06:06.070] (12:06:06.069) 0 - 0 [parallel-8] INFO groovy - [12:06:06.455] (12:06:06.455) 0 - 1 [parallel-8] INFO groovy - [12:06:06.855] (12:06:06.855) 0 - 2 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.304) 1 - 0 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.702) 1 - 1 [parallel-9] INFO groovy - [12:06:07.104] (12:06:07.104) 1 - 2 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.551) 2 - 0 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.953) 2 - 1 [parallel-10] INFO groovy - [12:06:07.350] (12:06:07.350) 2 - 2 [parallel-10] INFO groovy - [12:06:07.351] (12:06:06.805) 3 - 0 [parallel-10] INFO groovy - [12:06:07.351] (12:06:07.205) 3 - 1 [parallel-11] INFO groovy - [12:06:07.605] (12:06:07.605) 3 - 2
データ 1-0
は データ 0-1
よりも先にエミットされるが、バッファリングされて 0-2
の後に出てきているのがわかる
おわり