mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

RSocket の request-stream のサーバー/クライアントを作る

RSocket で request-stream のサーバーおよびクライアントを組み立てるときのコード例

f:id:mike_neck:20181102013529p:plain


サーバー

サーバーの構築

final Disposable server = RSocketFactory.receive()
    .acceptor(new MySocketAcceptor())
    .transport(TcpServerTransport.create("localhost", 7000))
    .start()
    .subscribe();

SocketAcceptor

request-stream をみたすサーバーの SocketAcceptor が返す RSocketrequestStream(Payload) を返す

以下の例は、 クライアントの requestStream にて送られてきた Payload をそのまま何度も返すサーバーのもの。

class MySocketAcceptor implements SocketAcceptor {
  @Override
  public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
    return Mono.just(new MyRSocket());
  }
}

class MyRSocket extends AbstractRSocket {
  @Override
  public Flux<Payload> requestStream(Payload payload) {
    return Flux.interval(Duration.ofMillis(200L))
        .map(i -> DefaultPayload.create(payload.getData()));
  }
}

クライアント

クライアントはまず Mono<RSocket> を生成する。これは request-response の場合と変わらない。

final Mono<RSocket> clientMono = RSocketFactory.connect()
    .transport(TcpClientTransport.create("localhost", 7000))
    .start();

取得した RSocketインスタンスrequestStream メソッドでリクエストすると、 Flux が返ってくる。

RSocket rsocket = ...

final Flux<Payload> responseStream = rsocket.requestStream(DefaultPayload.create("hello stream"));