Swift の URLSession
の中で呼び出されている curl_multi_socket_action
の動作を調べていたので、そのメモ。
続きを読む
ここ最近 Reactor をいじって一定間隔の無限ストリームを生成するなどしていたこともあって、まとめることにした。タイトルに数字が入っていると読まれやすいっぽいので、タイトルには「3つのパターン」と書いたが、実際に flatMap
、 flatMapSequential
、 concatMap
の3つなので、あながち間違っているわけではない。
Flux
導入Flux
は 0 以上のデータを持つデータストリーム。例えば 0.25
秒おきに 0
から 1
ずつ増えるデータを記述すると次のようになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') import reactor.core.publisher.* def flux = Flux.interval(Duration.ofMillis(250)).take(4)
では、この Flux<Long>
の数値を文字列として表示したいので、 String
になるように map
する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" }
次に、時刻を表示したい。そこで、この String
と現在の時刻を文字列に埋め込んで表示する。
def flux = Flux.interval(Duration.ofMillis(250)).take(4) flux.map { i -> "test $i" } .map { s -> "[${LocalDate.now()}] $s" } .subscribe { logger.info(it) }
Flux
の中の処理はメインスレッドで subscribe
を呼び出してから開始されるので、メインスレッドよりも遅れて実行される。そのため先に終わったメインスレッドが Flux
の中の処理を待ち合わせるために、 CountDownLatch
などの同期化するための機構を埋め込まないとならない。 CountDownLatch
を Flux
の処理が終わったあとに countDown
されるように onTerminate
に処理を加えるなどすると、次のようなプログラムになる。
@Grab('io.projectreactor:reactor-core:3.1.8.RELEASE') @Grab('org.slf4j:slf4j-simple:1.7.25') import java.util.concurrent.* import java.time.* import java.time.format.* import reactor.core.publisher.* import reactor.core.scheduler.* import org.slf4j.* def logger = LoggerFactory.getLogger('groovy') Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def latch = new CountDownLatch(1) def disp = flux .map { i -> "test $i" } .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
これを実行すると、次のように表示される。
[parallel-6] INFO groovy - [02:00:41.573] test 0 [parallel-6] INFO groovy - [02:00:41.822] test 1 [parallel-6] INFO groovy - [02:00:42.075] test 2 [parallel-6] INFO groovy - [02:00:42.322] test 3
これでまずは Flux
の導入ができた。
以下では上で作った Flux
を起点に一定間隔でデータを投げる Flux
を作っていく。各 Flux
の起点から 400 ms
感覚でエミットされるデータを3つずつ取るものとする。次の表はデータがエミットされる時刻の一覧(単位は ms
)
1st | 2nd-0 | 2nd-1 | 2nd2 |
---|---|---|---|
0 | 0 |
400 |
800 |
1 | 250 |
650 |
1050 |
2 | 500 |
900 |
1300 |
3 | 750 |
1150 |
1550 |
次のようなクラスを導入して、どのデータがいつエミットされたのかを記録する
class Sample { final long first final long second final LocalTime created Sample(first, second) { this.first = first this.second = second this.created = LocalTime.now() } @Override String toString() { "($created) $first - $second" } }
また、次の関数によって long
から Sample
を作る
def longToSample = {long fst -> Flux.interval(Duration.ofMillis(400)) .take(3) .map { long snd -> new Sample(fst, snd) } }
flatMap
flatMap
は発生した時刻をキーにしてマージしていく。したがって、想定されるデータ列は先程の表から、 0-0(0 ms) -> 1-0(250 ms) -> 0-1(400 ms) -> 2-0(500 ms) -> 1-1(650 ms) -> 3-0(750 ms) -> 0-2(800 ms)...
となることが予想される。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .flatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合、出力は次のようになる
[parallel-5] INFO groovy - [10:32:07.413] (10:32:07.413) 0 - 0 [parallel-6] INFO groovy - [10:32:07.647] (10:32:07.647) 1 - 0 [parallel-5] INFO groovy - [10:32:07.798] (10:32:07.798) 0 - 1 [parallel-7] INFO groovy - [10:32:07.900] (10:32:07.900) 2 - 0 [parallel-6] INFO groovy - [10:32:08.047] (10:32:08.046) 1 - 1 [parallel-8] INFO groovy - [10:32:08.151] (10:32:08.151) 3 - 0 [parallel-5] INFO groovy - [10:32:08.199] (10:32:08.199) 0 - 2 [parallel-7] INFO groovy - [10:32:08.297] (10:32:08.297) 2 - 1 [parallel-6] INFO groovy - [10:32:08.446] (10:32:08.446) 1 - 2 [parallel-8] INFO groovy - [10:32:08.551] (10:32:08.551) 3 - 1 [parallel-7] INFO groovy - [10:32:08.696] (10:32:08.696) 2 - 2 [parallel-8] INFO groovy - [10:32:08.948] (10:32:08.948) 3 - 2
複数の系列で並行になんらかの処理を行いたい場合に用いるとよさそうである
concatMap
concatMap
でまとめられた Flux
は元の順序(1st
)を保持しつつ、新しく指定された間隔でエミットされる。想定される順番は 0-0 -> 0-1 -> 0-2 -> 1-0 -> 1-1 -> ...
となり、それらの間隔が 400 ms
となる(最初の 250 ms
は無視される)。
Flux<Long> flux = Flux.interval(Duration.ofMillis(250)).take(4) def disp = flux .concatMap(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
この場合の出力は次のようになる
[parallel-3] INFO groovy - [10:36:56.553] (10:36:56.553) 0 - 0 [parallel-3] INFO groovy - [10:36:56.939] (10:36:56.939) 0 - 1 [parallel-3] INFO groovy - [10:36:57.342] (10:36:57.342) 0 - 2 [parallel-4] INFO groovy - [10:36:57.743] (10:36:57.743) 1 - 0 [parallel-4] INFO groovy - [10:36:58.145] (10:36:58.145) 1 - 1 [parallel-4] INFO groovy - [10:36:58.543] (10:36:58.543) 1 - 2 [parallel-5] INFO groovy - [10:36:58.948] (10:36:58.948) 2 - 0 [parallel-5] INFO groovy - [10:36:59.346] (10:36:59.346) 2 - 1 [parallel-5] INFO groovy - [10:36:59.748] (10:36:59.748) 2 - 2 [parallel-6] INFO groovy - [10:37:00.153] (10:37:00.153) 3 - 0 [parallel-6] INFO groovy - [10:37:00.552] (10:37:00.552) 3 - 1 [parallel-6] INFO groovy - [10:37:00.949] (10:37:00.949) 3 - 2
concatMap
)、各要素は指定時刻に生成(flatMap
)される flatMapSequential
flatMapSequential
は concatMap
と flatMap
を組み合わせたようなもので、 要素の順序は元の順序(1st
-> 2nd
の順序)をキープしつつ、データのエミットタイミングもキープされる動作をとる。なお元の順序を飛び越してしまったデータはバッファリングされる。
def disp = flux .flatMapSequential(longToSample) .map { s -> "[${LocalTime.now()}] $s" } .doOnTerminate { latch.countDown() } .subscribe { logger.info(it) } latch.await() disp.dispose()
出力は次のようになる
[parallel-8] INFO groovy - [12:06:06.070] (12:06:06.069) 0 - 0 [parallel-8] INFO groovy - [12:06:06.455] (12:06:06.455) 0 - 1 [parallel-8] INFO groovy - [12:06:06.855] (12:06:06.855) 0 - 2 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.304) 1 - 0 [parallel-8] INFO groovy - [12:06:06.856] (12:06:06.702) 1 - 1 [parallel-9] INFO groovy - [12:06:07.104] (12:06:07.104) 1 - 2 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.551) 2 - 0 [parallel-9] INFO groovy - [12:06:07.105] (12:06:06.953) 2 - 1 [parallel-10] INFO groovy - [12:06:07.350] (12:06:07.350) 2 - 2 [parallel-10] INFO groovy - [12:06:07.351] (12:06:06.805) 3 - 0 [parallel-10] INFO groovy - [12:06:07.351] (12:06:07.205) 3 - 1 [parallel-11] INFO groovy - [12:06:07.605] (12:06:07.605) 3 - 2
データ 1-0
は データ 0-1
よりも先にエミットされるが、バッファリングされて 0-2
の後に出てきているのがわかる
おわり
以前書いたエントリー
の対処法が間違えていて、ビルドもなにもできなくなった上に、 REPL で import Foundation
をやっても同じエラーが出てしまう割と深刻な状況の対処法。
(巷にある ~/Library/Developer/Xcode
を消すやつが何の役にも立たなくて、絶望しかなかった)
もしかしたら、 swift を書けるエンジニアは知ってて当たり前の方法なのかもしれんけど…
参考にしたのはこちら。
原因は brew で以前にインストールしたヘッダーファイルが何らかの原因によってインクルードされた上、brew で入れたアプリケーションからはリンクされていないような状態にあるときに、このエラーは発生する模様。
$ brew doctor Please note that these warnings are just used to help the Homebrew maintainers with debugging if you file an issue. If everything you use Homebrew for is working fine: please don't worry or file an issue; just ignore this. Thanks! Warning: Unbrewed header files were found in /usr/local/include. If you didn't put them there on purpose they could cause problems when building Homebrew formulae, and may need to be deleted. Unexpected header files: /usr/local/include/6.1.0/backward/auto_ptr.h /usr/local/include/6.1.0/backward/backward_warning.h 〜こんな感じで2400個くらいヘッダーファイルがリストされている〜 /usr/local/include/zconf.h /usr/local/include/zlib.h
で、対処はこれらのヘッダーファイルを消す。
$ brew doctor 2>&1 | grep "\.h$" | xargs sudo rm
おわり