CompletableFuture<T>
から Mono<T>
に関しては、 Mono#fromFuture(CompletableFuture)
というメソッドがあるのだが、 Future<T>
からは Mono<T>
への変換メソッドはないし、もっと言えば Future<T>
から CompletableFuture<T>
への変換メソッドもない。
最初は次のような変換コードを書いていたが、あまり良さそうに見えない。
final Future<T> future = ...; final CompletableFuture<T> completableFuture = CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch(final InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); final Mono<T> mono = Mono.fromFuture(completableFuture);
いろいろ調べてたら、 MonoProcessor<T>
というものがあって、これは Subscriber<T>
を実装しているので、 onNext(T)
というメソッドから値を渡せるし、 onError(Throwable)
というメソッドから例外を伝播させられるようだ。
これを使うと次のようになる。
final Future<T> future = ...; final MonoProcessor<T> processor = MonoProcessor.create(); try { processor.onNext(future.get()); } catch(final InterruptedException | ExecutionException e) { processor.onError(e); } final Mono<T> mono = processor;
非常にシンプルにできた。
2018/05/06 0:12 追記
@making さんから、上のコードがブロックしていると指摘を受けた上で、次のようにするとよいとアドバイスをうけた。
final Future<T> future = ...; final Mono<T> mono = Mono.fromCallable(future::get) .subscribeOn(Schedulers.fromExecutore(executor));
これ、いずれの方法も、future.getをここで呼ぶとblockするのでよくないです。Futureはnon-blockingではないので、non-blockingな文脈で使いたいなら、threadをdispatchしないといけないです。こんな感じ(画像)。logを出して、onNextのスレッド名を出すとわかりやすいです。 pic.twitter.com/oxjgBRCyTn
— Toshiaki Maki (@making) 2018年4月29日