RSocket のクライアントとサーバーを書くポイント。
サーバー
サーバーは requestChannel(Publisher<Payload>)
を実装する。戻り値は Flux<Payload>
@Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads) .map( payload -> DefaultPayload.create("server-response")); }
戻すのは Flux<Payload>
なので、クライアントから特定の条件の Payload
が届いた場合は返す Payload
を2つ返すなどやってもよいと思われる(また、 filter
などで除外する等)。
クライアント
クライアントは RSocket
に対して、 requestChannel
を送信する。
Flux<Payload> runClient() { Mono<RSocket> client = ... return client.flux().flatMap(rsocket -> rsocket.requestChannel(request())); } private Flux<Payload> request() { return Flux.interval(Duration.ofMillis(messageInterval)) .take(10L) .map(count -> DefaultPayload.create("client-message")); }