mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

RSocket をちょっとだけ試してみた

Spring Fest で Spring の今後の話題としてあげられていたアイテムに RSocket というのがありました。

f:id:mike_neck:20181102013529p:plain

公式ページ (https://rsocket.io/) によると、 RSocket はTCP、WebSocket、Aeron(UDPプロトコル)のようなバイトストリームによる伝送するためのバイナリープロトコルだそうです。RSocket には 4 つの伝送形態(リクエスト+レスポンス、リクエスト+ストリーム、fire-and-fogot(送信したら終わり)、Channel(双方向ストリーム))があるようです。頻繁に切断、再接続などをおこなうようなモバイルとサーバーのコネクションなども考慮されているようです。

今のところ、このプロトコルを利用できるライブラリーは JavaC++JavaScript 、 Kotlin だけのようです(リポジトリーには rsocket-py 、 rsocket-go というのもあるけど、 2017 年に更新が終わっている…)。

ちなみに、 RSocket で検索すると、 RDMA socket API というのが出てきますが、これとは別物です。


で、とりあえず Hello World ということで、 rsocket-java のレポジトリーにあった example プロジェクトの リクエスト + レスポンスのサンプルをコピペして、改変してみました。

enum Client {}

enum Server {}

public class HelloWorld {
  private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);
  private static final Logger serverLogger = LoggerFactory.getLogger(Server.class);
  private static final Logger clientLogger = LoggerFactory.getLogger(Client.class);

  public static void main(String[] args) throws Exception {
    final Disposable serverDisposable =
        RSocketFactory.receive()
            .acceptor(
                (setup, sendingSocket) -> {
                  serverLogger.info(
                      "setup: {}, data: {}", setup.dataMimeType(), setup.getDataUtf8());
                  final AtomicInteger atomicInteger = new AtomicInteger(0);
                  return Mono.just(
                      new AbstractRSocket() {
                        @Override
                        public Mono<Payload> requestResponse(Payload payload) {
                          serverLogger.info(
                              "request payload: {} {}",
                              payload.getMetadataUtf8(),
                              payload.getDataUtf8());
                          final int current = atomicInteger.getAndIncrement();
                          final Payload response =
                              DefaultPayload.create(
                                  String.format(
                                      "response(%d)[%s]",
                                      current, DateTimeFormatter.ISO_INSTANT.format(Instant.now())),
                                  payload.getDataUtf8());
                          return Mono.just(response);
// Mono.error 返したらアプリケーション全体が死んだのでコメントアウト
                          //                            if (current % 2 == 0) {
                          //                                final Payload response =
                          // DefaultPayload.create(String.format("response: %d, time: %s", current,
                          //
                          // DateTimeFormatter.ISO_INSTANT.format(Instant.now())));
                          //                                return Mono.just(response);
                          //                            } else {
                          //                                return Mono.error(new
                          // Throwable(String.format("request: %d", current)));
                          //                            }
                        }
                      });
                })
            .transport(() -> TcpServerTransport.create(7000))
            .start()
            .subscribe();

    final CountDownLatch latch = new CountDownLatch(1);

    final long start = System.currentTimeMillis();
    logger.info("start at {}", DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

    final Disposable clientDisposable =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .flux()
            .flatMap(
                rsocket ->
                    createPayloads()
                        .flatMap(rsocket::requestResponse)
                        .map(
                            payload ->
                                String.format(
                                    "metadata: %s, body: %s",
                                    payload.getMetadataUtf8(), payload.getDataUtf8())))
            .doOnTerminate(latch::countDown)
            .subscribe(clientLogger::info);

    latch.await();

    logger.info(
        "finish in {} ms at {}",
        System.currentTimeMillis() - start,
        DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

    clientDisposable.dispose();
    serverDisposable.dispose();
  }

  private static Flux<Payload> createPayloads() {
    return Flux.interval(Duration.ofMillis(125L))
        .take(32)
        .flatMap(
            id ->
                Flux.interval(Duration.ofMillis(2L))
                    .take(2000)
                    .map(sub -> String.format("%d - %d", id, sub)))
        .map(message -> DefaultPayload.create(message, StandardCharsets.UTF_8));
  }
}

コードの前半がサーバーのレスポンスを返すところで、後半が 2ミリ秒に1回、 8 秒間に合計 64000 回のリクエストを送信するクライアント部分です。

これを実行したところ、8.1 秒くらいで 64000 リクエストがさばけてすべてのレスポンスが返ってきたようです。

以下は実行したときのログのごく一部。

[reactor-tcp-nio-6] INFO com.example.Server - request payload:  24 - 1708
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 20 - 1958, body: response(59374)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  26 - 1583
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 22 - 1833, body: response(59375)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  28 - 1458
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 24 - 1708, body: response(59376)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  30 - 1333
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 26 - 1583, body: response(59377)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 28 - 1458, body: response(59378)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 30 - 1333, body: response(59379)[2018-11-01T15:55:00.973Z]