Spring Fest で Spring の今後の話題としてあげられていたアイテムに RSocket というのがありました。
公式ページ (https://rsocket.io/) によると、 RSocket はTCP、WebSocket、Aeron(UDPのプロトコル)のようなバイトストリームによる伝送するためのバイナリープロトコルだそうです。RSocket には 4 つの伝送形態(リクエスト+レスポンス、リクエスト+ストリーム、fire-and-fogot(送信したら終わり)、Channel(双方向ストリーム))があるようです。頻繁に切断、再接続などをおこなうようなモバイルとサーバーのコネクションなども考慮されているようです。
今のところ、このプロトコルを利用できるライブラリーは Java 、 C++ 、 JavaScript 、 Kotlin だけのようです(リポジトリーには rsocket-py 、 rsocket-go というのもあるけど、 2017 年に更新が終わっている…)。
ちなみに、 RSocket で検索すると、 RDMA socket API というのが出てきますが、これとは別物です。
で、とりあえず Hello World ということで、 rsocket-java のレポジトリーにあった example プロジェクトの リクエスト + レスポンスのサンプルをコピペして、改変してみました。
enum Client {} enum Server {} public class HelloWorld { private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class); private static final Logger serverLogger = LoggerFactory.getLogger(Server.class); private static final Logger clientLogger = LoggerFactory.getLogger(Client.class); public static void main(String[] args) throws Exception { final Disposable serverDisposable = RSocketFactory.receive() .acceptor( (setup, sendingSocket) -> { serverLogger.info( "setup: {}, data: {}", setup.dataMimeType(), setup.getDataUtf8()); final AtomicInteger atomicInteger = new AtomicInteger(0); return Mono.just( new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { serverLogger.info( "request payload: {} {}", payload.getMetadataUtf8(), payload.getDataUtf8()); final int current = atomicInteger.getAndIncrement(); final Payload response = DefaultPayload.create( String.format( "response(%d)[%s]", current, DateTimeFormatter.ISO_INSTANT.format(Instant.now())), payload.getDataUtf8()); return Mono.just(response); // Mono.error 返したらアプリケーション全体が死んだのでコメントアウト // if (current % 2 == 0) { // final Payload response = // DefaultPayload.create(String.format("response: %d, time: %s", current, // // DateTimeFormatter.ISO_INSTANT.format(Instant.now()))); // return Mono.just(response); // } else { // return Mono.error(new // Throwable(String.format("request: %d", current))); // } } }); }) .transport(() -> TcpServerTransport.create(7000)) .start() .subscribe(); final CountDownLatch latch = new CountDownLatch(1); final long start = System.currentTimeMillis(); logger.info("start at {}", DateTimeFormatter.ISO_INSTANT.format(Instant.now())); final Disposable clientDisposable = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start() .flux() .flatMap( rsocket -> createPayloads() .flatMap(rsocket::requestResponse) .map( payload -> String.format( "metadata: %s, body: %s", payload.getMetadataUtf8(), payload.getDataUtf8()))) .doOnTerminate(latch::countDown) .subscribe(clientLogger::info); latch.await(); logger.info( "finish in {} ms at {}", System.currentTimeMillis() - start, DateTimeFormatter.ISO_INSTANT.format(Instant.now())); clientDisposable.dispose(); serverDisposable.dispose(); } private static Flux<Payload> createPayloads() { return Flux.interval(Duration.ofMillis(125L)) .take(32) .flatMap( id -> Flux.interval(Duration.ofMillis(2L)) .take(2000) .map(sub -> String.format("%d - %d", id, sub))) .map(message -> DefaultPayload.create(message, StandardCharsets.UTF_8)); } }
コードの前半がサーバーのレスポンスを返すところで、後半が 2ミリ秒に1回、 8 秒間に合計 64000 回のリクエストを送信するクライアント部分です。
これを実行したところ、8.1 秒くらいで 64000 リクエストがさばけてすべてのレスポンスが返ってきたようです。
以下は実行したときのログのごく一部。
[reactor-tcp-nio-6] INFO com.example.Server - request payload: 24 - 1708 [reactor-tcp-nio-5] INFO com.example.Client - metadata: 20 - 1958, body: response(59374)[2018-11-01T15:55:00.973Z] [reactor-tcp-nio-6] INFO com.example.Server - request payload: 26 - 1583 [reactor-tcp-nio-5] INFO com.example.Client - metadata: 22 - 1833, body: response(59375)[2018-11-01T15:55:00.973Z] [reactor-tcp-nio-6] INFO com.example.Server - request payload: 28 - 1458 [reactor-tcp-nio-5] INFO com.example.Client - metadata: 24 - 1708, body: response(59376)[2018-11-01T15:55:00.973Z] [reactor-tcp-nio-6] INFO com.example.Server - request payload: 30 - 1333 [reactor-tcp-nio-5] INFO com.example.Client - metadata: 26 - 1583, body: response(59377)[2018-11-01T15:55:00.973Z] [reactor-tcp-nio-5] INFO com.example.Client - metadata: 28 - 1458, body: response(59378)[2018-11-01T15:55:00.973Z] [reactor-tcp-nio-5] INFO com.example.Client - metadata: 30 - 1333, body: response(59379)[2018-11-01T15:55:00.973Z]