Spring Fest で Spring の今後の話題としてあげられていたアイテムに RSocket というのがありました。
公式ページ (https://rsocket.io/) によると、 RSocket はTCP、WebSocket、Aeron(UDPのプロトコル)のようなバイトストリームによる伝送するためのバイナリープロトコルだそうです。RSocket には 4 つの伝送形態(リクエスト+レスポンス、リクエスト+ストリーム、fire-and-fogot(送信したら終わり)、Channel(双方向ストリーム))があるようです。頻繁に切断、再接続などをおこなうようなモバイルとサーバーのコネクションなども考慮されているようです。
今のところ、このプロトコルを利用できるライブラリーは Java 、 C++ 、 JavaScript 、 Kotlin だけのようです(リポジトリーには rsocket-py 、 rsocket-go というのもあるけど、 2017 年に更新が終わっている…)。
ちなみに、 RSocket で検索すると、 RDMA socket API というのが出てきますが、これとは別物です。
で、とりあえず Hello World ということで、 rsocket-java のレポジトリーにあった example プロジェクトの リクエスト + レスポンスのサンプルをコピペして、改変してみました。
enum Client {}
enum Server {}
public class HelloWorld {
private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);
private static final Logger serverLogger = LoggerFactory.getLogger(Server.class);
private static final Logger clientLogger = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) throws Exception {
final Disposable serverDisposable =
RSocketFactory.receive()
.acceptor(
(setup, sendingSocket) -> {
serverLogger.info(
"setup: {}, data: {}", setup.dataMimeType(), setup.getDataUtf8());
final AtomicInteger atomicInteger = new AtomicInteger(0);
return Mono.just(
new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
serverLogger.info(
"request payload: {} {}",
payload.getMetadataUtf8(),
payload.getDataUtf8());
final int current = atomicInteger.getAndIncrement();
final Payload response =
DefaultPayload.create(
String.format(
"response(%d)[%s]",
current, DateTimeFormatter.ISO_INSTANT.format(Instant.now())),
payload.getDataUtf8());
return Mono.just(response);
}
});
})
.transport(() -> TcpServerTransport.create(7000))
.start()
.subscribe();
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
logger.info("start at {}", DateTimeFormatter.ISO_INSTANT.format(Instant.now()));
final Disposable clientDisposable =
RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.flux()
.flatMap(
rsocket ->
createPayloads()
.flatMap(rsocket::requestResponse)
.map(
payload ->
String.format(
"metadata: %s, body: %s",
payload.getMetadataUtf8(), payload.getDataUtf8())))
.doOnTerminate(latch::countDown)
.subscribe(clientLogger::info);
latch.await();
logger.info(
"finish in {} ms at {}",
System.currentTimeMillis() - start,
DateTimeFormatter.ISO_INSTANT.format(Instant.now()));
clientDisposable.dispose();
serverDisposable.dispose();
}
private static Flux<Payload> createPayloads() {
return Flux.interval(Duration.ofMillis(125L))
.take(32)
.flatMap(
id ->
Flux.interval(Duration.ofMillis(2L))
.take(2000)
.map(sub -> String.format("%d - %d", id, sub)))
.map(message -> DefaultPayload.create(message, StandardCharsets.UTF_8));
}
}
コードの前半がサーバーのレスポンスを返すところで、後半が 2ミリ秒に1回、 8 秒間に合計 64000 回のリクエストを送信するクライアント部分です。
これを実行したところ、8.1 秒くらいで 64000 リクエストがさばけてすべてのレスポンスが返ってきたようです。
以下は実行したときのログのごく一部。
[reactor-tcp-nio-6] INFO com.example.Server - request payload: 24 - 1708
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 20 - 1958, body: response(59374)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload: 26 - 1583
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 22 - 1833, body: response(59375)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload: 28 - 1458
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 24 - 1708, body: response(59376)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload: 30 - 1333
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 26 - 1583, body: response(59377)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 28 - 1458, body: response(59378)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 30 - 1333, body: response(59379)[2018-11-01T15:55:00.973Z]