mike-neckのブログ

Java or Groovy or Swift or Golang

Reactor の Flux を flatten する3つのパターン

ここ最近 Reactor をいじって一定間隔の無限ストリームを生成するなどしていたこともあって、まとめることにした。タイトルに数字が入っていると読まれやすいっぽいので、タイトルには「3つのパターン」と書いたが、実際に flatMapflatMapSequentialconcatMap の3つなので、あながち間違っているわけではない。

f:id:mike_neck:20180221005726p:plain


Flux 導入

Flux は 0 以上のデータを持つデータストリーム。例えば 0.25 秒おきに 0 から 1 ずつ増えるデータを記述すると次のようになる。

@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE')

import reactor.core.publisher.*

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

では、この Flux<Long> の数値を文字列として表示したいので、 String になるように map する。

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

flux.map { i -> "test $i" }

次に、時刻を表示したい。そこで、この String と現在の時刻を文字列に埋め込んで表示する。

def flux = Flux.interval(Duration.ofMillis(250)).take(4)

flux.map { i -> "test $i" }
    .map { s -> "[${LocalDate.now()}] $s" }
    .subscribe { logger.info(it) }

Flux の中の処理はメインスレッドで subscribe を呼び出してから開始されるので、メインスレッドよりも遅れて実行される。そのため先に終わったメインスレッドが Flux の中の処理を待ち合わせるために、 CountDownLatch などの同期化するための機構を埋め込まないとならない。 CountDownLatchFlux の処理が終わったあとに countDown されるように onTerminate に処理を加えるなどすると、次のようなプログラムになる。

@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE')
@Grab('org.slf4j:slf4j-simple:1.7.25')

import java.util.concurrent.*
import java.time.*
import java.time.format.*
import reactor.core.publisher.*
import reactor.core.scheduler.*
import org.slf4j.*

def logger = LoggerFactory.getLogger('groovy')

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def latch = new CountDownLatch(1)

def disp = flux
    .map { i -> "test $i" }
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

これを実行すると、次のように表示される。

[parallel-6] INFO groovy - [02:00:41.573] test 0
[parallel-6] INFO groovy - [02:00:41.822] test 1
[parallel-6] INFO groovy - [02:00:42.075] test 2
[parallel-6] INFO groovy - [02:00:42.322] test 3

これでまずは Flux の導入ができた。


flatten する

以下では上で作った Flux を起点に一定間隔でデータを投げる Flux を作っていく。各 Flux の起点から 400 ms 感覚でエミットされるデータを3つずつ取るものとする。次の表はデータがエミットされる時刻の一覧(単位は ms)

1st 2nd-0 2nd-1 2nd2
0 0 400 800
1 250 650 1050
2 500 900 1300
3 750 1150 1550

次のようなクラスを導入して、どのデータがいつエミットされたのかを記録する

class Sample {
  final long first
  final long second
  final LocalTime created
  Sample(first, second) {
    this.first = first
    this.second = second
    this.created = LocalTime.now()
  }
  @Override String toString() { "($created) $first - $second" }
}

また、次の関数によって long から Sample を作る

def longToSample = {long fst ->
    Flux.interval(Duration.ofMillis(400))
        .take(3)
        .map { long snd -> new Sample(fst, snd) }
}

発生した時刻をキーにマージする flatMap

flatMap は発生した時刻をキーにしてマージしていく。したがって、想定されるデータ列は先程の表から、 0-0(0 ms) -> 1-0(250 ms) -> 0-1(400 ms) -> 2-0(500 ms) -> 1-1(650 ms) -> 3-0(750 ms) -> 0-2(800 ms)... となることが予想される。

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def disp = flux
    .flatMap(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

この場合、出力は次のようになる

[parallel-5] INFO groovy - [10:32:07.413] (10:32:07.413) 0 - 0
[parallel-6] INFO groovy - [10:32:07.647] (10:32:07.647) 1 - 0
[parallel-5] INFO groovy - [10:32:07.798] (10:32:07.798) 0 - 1
[parallel-7] INFO groovy - [10:32:07.900] (10:32:07.900) 2 - 0
[parallel-6] INFO groovy - [10:32:08.047] (10:32:08.046) 1 - 1
[parallel-8] INFO groovy - [10:32:08.151] (10:32:08.151) 3 - 0
[parallel-5] INFO groovy - [10:32:08.199] (10:32:08.199) 0 - 2
[parallel-7] INFO groovy - [10:32:08.297] (10:32:08.297) 2 - 1
[parallel-6] INFO groovy - [10:32:08.446] (10:32:08.446) 1 - 2
[parallel-8] INFO groovy - [10:32:08.551] (10:32:08.551) 3 - 1
[parallel-7] INFO groovy - [10:32:08.696] (10:32:08.696) 2 - 2
[parallel-8] INFO groovy - [10:32:08.948] (10:32:08.948) 3 - 2

複数の系列で並行になんらかの処理を行いたい場合に用いるとよさそうである


元の順序をキープして新たな間隔でエミットする concatMap

concatMap でまとめられた Flux は元の順序(1st)を保持しつつ、新しく指定された間隔でエミットされる。想定される順番は 0-0 -> 0-1 -> 0-2 -> 1-0 -> 1-1 -> ... となり、それらの間隔が 400 ms となる(最初の 250 ms は無視される)。

Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4)

def disp = flux
    .concatMap(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

この場合の出力は次のようになる

[parallel-3] INFO groovy - [10:36:56.553] (10:36:56.553) 0 - 0
[parallel-3] INFO groovy - [10:36:56.939] (10:36:56.939) 0 - 1
[parallel-3] INFO groovy - [10:36:57.342] (10:36:57.342) 0 - 2
[parallel-4] INFO groovy - [10:36:57.743] (10:36:57.743) 1 - 0
[parallel-4] INFO groovy - [10:36:58.145] (10:36:58.145) 1 - 1
[parallel-4] INFO groovy - [10:36:58.543] (10:36:58.543) 1 - 2
[parallel-5] INFO groovy - [10:36:58.948] (10:36:58.948) 2 - 0
[parallel-5] INFO groovy - [10:36:59.346] (10:36:59.346) 2 - 1
[parallel-5] INFO groovy - [10:36:59.748] (10:36:59.748) 2 - 2
[parallel-6] INFO groovy - [10:37:00.153] (10:37:00.153) 3 - 0
[parallel-6] INFO groovy - [10:37:00.552] (10:37:00.552) 3 - 1
[parallel-6] INFO groovy - [10:37:00.949] (10:37:00.949) 3 - 2

元の順序をキープ(concatMap)、各要素は指定時刻に生成(flatMap)される flatMapSequential

flatMapSequentialconcatMapflatMap を組み合わせたようなもので、 要素の順序は元の順序(1st -> 2nd の順序)をキープしつつ、データのエミットタイミングもキープされる動作をとる。なお元の順序を飛び越してしまったデータはバッファリングされる。

def disp = flux
    .flatMapSequential(longToSample)
    .map { s -> "[${LocalTime.now()}] $s" }
    .doOnTerminate { latch.countDown() }
    .subscribe { logger.info(it) }

latch.await()
disp.dispose()

出力は次のようになる

[parallel-8] INFO groovy - [12:06:06.070] (12:06:06.069) 0 - 0
[parallel-8] INFO groovy - [12:06:06.455] (12:06:06.455) 0 - 1
[parallel-8] INFO groovy - [12:06:06.855] (12:06:06.855) 0 - 2
[parallel-8] INFO groovy - [12:06:06.856] (12:06:06.304) 1 - 0
[parallel-8] INFO groovy - [12:06:06.856] (12:06:06.702) 1 - 1
[parallel-9] INFO groovy - [12:06:07.104] (12:06:07.104) 1 - 2
[parallel-9] INFO groovy - [12:06:07.105] (12:06:06.551) 2 - 0
[parallel-9] INFO groovy - [12:06:07.105] (12:06:06.953) 2 - 1
[parallel-10] INFO groovy - [12:06:07.350] (12:06:07.350) 2 - 2
[parallel-10] INFO groovy - [12:06:07.351] (12:06:06.805) 3 - 0
[parallel-10] INFO groovy - [12:06:07.351] (12:06:07.205) 3 - 1
[parallel-11] INFO groovy - [12:06:07.605] (12:06:07.605) 3 - 2

データ 1-0 は データ 0-1 よりも先にエミットされるが、バッファリングされて 0-2 の後に出てきているのがわかる


おわり