mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

Future を Mono に変換する

CompletableFuture<T> から Mono<T> に関しては、 Mono#fromFuture(CompletableFuture) というメソッドがあるのだが、 Future<T> からは Mono<T> への変換メソッドはないし、もっと言えば Future<T> から CompletableFuture<T> への変換メソッドもない。

最初は次のような変換コードを書いていたが、あまり良さそうに見えない。

final Future<T> future = ...;
final CompletableFuture<T> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        return future.get();
    } catch(final InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
});
final Mono<T> mono = Mono.fromFuture(completableFuture);

いろいろ調べてたら、 MonoProcessor<T> というものがあって、これは Subscriber<T> を実装しているので、 onNext(T) というメソッドから値を渡せるし、 onError(Throwable) というメソッドから例外を伝播させられるようだ。

これを使うと次のようになる。

final Future<T> future = ...;
final MonoProcessor<T> processor = MonoProcessor.create();
try {
  processor.onNext(future.get());
} catch(final InterruptedException | ExecutionException e) {
  processor.onError(e);
}
final Mono<T> mono = processor;

非常にシンプルにできた。


2018/05/06 0:12 追記

@making さんから、上のコードがブロックしていると指摘を受けた上で、次のようにするとよいとアドバイスをうけた。

final Future<T> future = ...;
final Mono<T> mono = Mono.fromCallable(future::get)
    .subscribeOn(Schedulers.fromExecutore(executor));