RSocket で request-stream のサーバーおよびクライアントを組み立てるときのコード例
サーバー
サーバーの構築
final Disposable server = RSocketFactory.receive() .acceptor(new MySocketAcceptor()) .transport(TcpServerTransport.create("localhost", 7000)) .start() .subscribe();
SocketAcceptor
request-stream をみたすサーバーの SocketAcceptor
が返す RSocket
は requestStream(Payload)
を返す
以下の例は、 クライアントの requestStream
にて送られてきた Payload
をそのまま何度も返すサーバーのもの。
class MySocketAcceptor implements SocketAcceptor { @Override public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { return Mono.just(new MyRSocket()); } } class MyRSocket extends AbstractRSocket { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.interval(Duration.ofMillis(200L)) .map(i -> DefaultPayload.create(payload.getData())); } }
クライアント
クライアントはまず Mono<RSocket>
を生成する。これは request-response の場合と変わらない。
final Mono<RSocket> clientMono = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start();
取得した RSocket
のインスタンスの requestStream
メソッドでリクエストすると、 Flux
RSocket rsocket = ... final Flux<Payload> responseStream = rsocket.requestStream(DefaultPayload.create("hello stream"));