mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

Stream の実行順

今更なテーマだが、たまに忘れてしまうのでメモ


次のようなインターフェースとクラスがあるものとする。

RequestValidation

ある要求 T を受け付けるか拒否するか判定するインターフェース。判定した結果リクエストを受け付けられない場合は R を返す。リクエストを受け付けられる場合は empty になる。

interface RequestValidation<T, R> {
  Optional<R> test(T input);
}

UserPropertyUpdateRequest

あるシステムのユーザーの属性を更新する要求。

class UserPropertyUpdateRequest {
  @Nullable final String firstName;
  @Nullable final String lastName;
  @Nullable final String timeZone;
  // コンストラクターは省略
}

UserPropertyUpdateFailure

ユーザーの属性更新が受け付けられないというチェック結果。

class UserPropertyUpdateFailure {
  @NotNull final String field;
  @NotNull final String message;
  // コンストラクターは省略
}

このときに、次のようなユーザーの属性を更新するための値の仕様があるものとする。

  • firstName に空文字も空白も認められない
  • lastName に空文字も空白も認められない
  • timeZonejava.time.ZoneId で解決できるもののみ登録する
  • firstName / lastName / timeZone とも null の場合はそれぞれの値を更新しない

それぞれの仕様を表す RequestValidation を作ると次のような感じになる。

RequestValidation<UserPropertyUpdateRequest, UserPropertyUpdateFailure> firstNameValidation = req -> {
  String fn = req.firstName;
  if (fn == null) return Optional.empty();
  if (fn.isEmpty() || fn.isBlank()) return Optional.of(new UserPropertyUpdateFailure("firstName", "blank or empty"));
  return Optional.empty();
};
RequestValidation<UserPropertyUpdateRequest, UserPropertyUpdateFailure> lastNameValidation = req -> {
  String ln = req.lastName;
  if (ln == null) return Optional.empty();
  if (ln.isEmpty() || ln.isBlank()) return Optional.of(new UserPropertyUpdateFailure("lastName", "blank or empty"));
  return Optional.empty();
};
RequestValidation<UserPropertyUpdateRequest, UserPropertyUpdateFailure> timeZoneValidation = req -> {
  String tz = req.timeZone;
  if (tz == null) return Optional.empty();
  try {
    ZoneId.of(tz);
    return Optional.empty();
  } catch (Exception e) {
    return Optional.of(new UserPropertyUpdateFailure("timeZone", "unknown timezone"));
  }
};

これらのチェックを行う際に、一つだけ見つかったら他のチェックはしないような処理を書きたいとする。そこで、最初は次のように書くのだが…

Optional<UserPropertyUpdateRequest> result1 = firstNameValidation.test(request);
if (result1.isPresent()) {
  return someResultValue;
}
Optional<UserPropertyUpdateRequest> result2 = lastNameValidation.test(request);
if (result2.isPresent()) {
  return someResultValue;
}
//...

このように書くと、せっかくインターフェースを導入した意味がなくなる。そこで、 Stream を使う。

Optional<UserPropertyUpdateFailure> result = Stream.of(firstNameValidation, lastNameValidation, timeZoneValidation)
  .map(validation -> validation.test(request))
  .filter(Optional::isPresent)
  .map(Optional::get)
  .findFirst();

さて、この Stream は本当に最初の一つが見つかったら処理を終了してくれるだろうか…という不安もあって、 Stream の処理順が気になってしまった。

なお、 Stream はすべての処理を一つにまとめた処理をつくって、それを一つ一つの要素に対して最後適用していく。したがって findFirst で最初に見つかったものがあれば、それ以降の要素に対しては処理を行わない。つまり、一つだけ見つかったら他のチェックはしないように処理を書くという最初の目的は達成されている。

確認

念のために確認する。

次のようなメソッドを定義して、 Stream を作る。

RequestValidation<UserPropertyUpdateRequest, UserPropertyUpdateFailure> wrapLog(
    String name,
    RequestValidation<UserPropertyUpdateRequest, UserPropertyUpdateFailure> delegate) {
  return req -> {
    System.out.println(name);
    return delegate.test(req);
  };
}

そして、各 RequestValidation をラップして実行してみる。

Optional<UserPropertyUpdateFailure> result = Stream.of(
    wrapLog("firstName", firstNameValidation),
    wrapLog("lastName", lastNameValidation),
    wrapLog("timeZone", timeZoneValidation)
  ).map(validation -> validation.test(request))
  .filter(Optional::isPresent)
  .map(Optional::get)
  .findFirst();

実行結果は次のとおり。

f:id:mike_neck:20181113024951p:plain

RSocket の request-stream のサーバー/クライアントを作る

RSocket で request-stream のサーバーおよびクライアントを組み立てるときのコード例

f:id:mike_neck:20181102013529p:plain


サーバー

サーバーの構築

final Disposable server = RSocketFactory.receive()
    .acceptor(new MySocketAcceptor())
    .transport(TcpServerTransport.create("localhost", 7000))
    .start()
    .subscribe();

SocketAcceptor

request-stream をみたすサーバーの SocketAcceptor が返す RSocketrequestStream(Payload) を返す

以下の例は、 クライアントの requestStream にて送られてきた Payload をそのまま何度も返すサーバーのもの。

class MySocketAcceptor implements SocketAcceptor {
  @Override
  public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
    return Mono.just(new MyRSocket());
  }
}

class MyRSocket extends AbstractRSocket {
  @Override
  public Flux<Payload> requestStream(Payload payload) {
    return Flux.interval(Duration.ofMillis(200L))
        .map(i -> DefaultPayload.create(payload.getData()));
  }
}

クライアント

クライアントはまず Mono<RSocket> を生成する。これは request-response の場合と変わらない。

final Mono<RSocket> clientMono = RSocketFactory.connect()
    .transport(TcpClientTransport.create("localhost", 7000))
    .start();

取得した RSocketインスタンスrequestStream メソッドでリクエストすると、 Flux が返ってくる。

RSocket rsocket = ...

final Flux<Payload> responseStream = rsocket.requestStream(DefaultPayload.create("hello stream"));

RSocket の Channel のクライアントとサーバーを書く

RSocket のクライアントとサーバーを書くポイント。

f:id:mike_neck:20181102013529p:plain


サーバー

サーバーは requestChannel(Publisher<Payload>) を実装する。戻り値は Flux<Payload>

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
  return Flux.from(payloads)
      .map(
          payload ->
              DefaultPayload.create("server-response"));
}

戻すのは Flux<Payload> なので、クライアントから特定の条件の Payload が届いた場合は返す Payload を2つ返すなどやってもよいと思われる(また、 filter などで除外する等)。

クライアント

クライアントは RSocket に対して、 requestChannel を送信する。

Flux<Payload> runClient() {
  Mono<RSocket> client = ...
  return client.flux().flatMap(rsocket -> rsocket.requestChannel(request()));
}

private Flux<Payload> request() {
  return Flux.interval(Duration.ofMillis(messageInterval))
      .take(10L)
      .map(count -> DefaultPayload.create("client-message"));
}

RSocket をちょっとだけ試してみた

Spring Fest で Spring の今後の話題としてあげられていたアイテムに RSocket というのがありました。

f:id:mike_neck:20181102013529p:plain

公式ページ (https://rsocket.io/) によると、 RSocket はTCP、WebSocket、Aeron(UDPプロトコル)のようなバイトストリームによる伝送するためのバイナリープロトコルだそうです。RSocket には 4 つの伝送形態(リクエスト+レスポンス、リクエスト+ストリーム、fire-and-fogot(送信したら終わり)、Channel(双方向ストリーム))があるようです。頻繁に切断、再接続などをおこなうようなモバイルとサーバーのコネクションなども考慮されているようです。

今のところ、このプロトコルを利用できるライブラリーは JavaC++JavaScript 、 Kotlin だけのようです(リポジトリーには rsocket-py 、 rsocket-go というのもあるけど、 2017 年に更新が終わっている…)。

ちなみに、 RSocket で検索すると、 RDMA socket API というのが出てきますが、これとは別物です。


で、とりあえず Hello World ということで、 rsocket-java のレポジトリーにあった example プロジェクトの リクエスト + レスポンスのサンプルをコピペして、改変してみました。

enum Client {}

enum Server {}

public class HelloWorld {
  private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);
  private static final Logger serverLogger = LoggerFactory.getLogger(Server.class);
  private static final Logger clientLogger = LoggerFactory.getLogger(Client.class);

  public static void main(String[] args) throws Exception {
    final Disposable serverDisposable =
        RSocketFactory.receive()
            .acceptor(
                (setup, sendingSocket) -> {
                  serverLogger.info(
                      "setup: {}, data: {}", setup.dataMimeType(), setup.getDataUtf8());
                  final AtomicInteger atomicInteger = new AtomicInteger(0);
                  return Mono.just(
                      new AbstractRSocket() {
                        @Override
                        public Mono<Payload> requestResponse(Payload payload) {
                          serverLogger.info(
                              "request payload: {} {}",
                              payload.getMetadataUtf8(),
                              payload.getDataUtf8());
                          final int current = atomicInteger.getAndIncrement();
                          final Payload response =
                              DefaultPayload.create(
                                  String.format(
                                      "response(%d)[%s]",
                                      current, DateTimeFormatter.ISO_INSTANT.format(Instant.now())),
                                  payload.getDataUtf8());
                          return Mono.just(response);
// Mono.error 返したらアプリケーション全体が死んだのでコメントアウト
                          //                            if (current % 2 == 0) {
                          //                                final Payload response =
                          // DefaultPayload.create(String.format("response: %d, time: %s", current,
                          //
                          // DateTimeFormatter.ISO_INSTANT.format(Instant.now())));
                          //                                return Mono.just(response);
                          //                            } else {
                          //                                return Mono.error(new
                          // Throwable(String.format("request: %d", current)));
                          //                            }
                        }
                      });
                })
            .transport(() -> TcpServerTransport.create(7000))
            .start()
            .subscribe();

    final CountDownLatch latch = new CountDownLatch(1);

    final long start = System.currentTimeMillis();
    logger.info("start at {}", DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

    final Disposable clientDisposable =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .flux()
            .flatMap(
                rsocket ->
                    createPayloads()
                        .flatMap(rsocket::requestResponse)
                        .map(
                            payload ->
                                String.format(
                                    "metadata: %s, body: %s",
                                    payload.getMetadataUtf8(), payload.getDataUtf8())))
            .doOnTerminate(latch::countDown)
            .subscribe(clientLogger::info);

    latch.await();

    logger.info(
        "finish in {} ms at {}",
        System.currentTimeMillis() - start,
        DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

    clientDisposable.dispose();
    serverDisposable.dispose();
  }

  private static Flux<Payload> createPayloads() {
    return Flux.interval(Duration.ofMillis(125L))
        .take(32)
        .flatMap(
            id ->
                Flux.interval(Duration.ofMillis(2L))
                    .take(2000)
                    .map(sub -> String.format("%d - %d", id, sub)))
        .map(message -> DefaultPayload.create(message, StandardCharsets.UTF_8));
  }
}

コードの前半がサーバーのレスポンスを返すところで、後半が 2ミリ秒に1回、 8 秒間に合計 64000 回のリクエストを送信するクライアント部分です。

これを実行したところ、8.1 秒くらいで 64000 リクエストがさばけてすべてのレスポンスが返ってきたようです。

以下は実行したときのログのごく一部。

[reactor-tcp-nio-6] INFO com.example.Server - request payload:  24 - 1708
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 20 - 1958, body: response(59374)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  26 - 1583
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 22 - 1833, body: response(59375)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  28 - 1458
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 24 - 1708, body: response(59376)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-6] INFO com.example.Server - request payload:  30 - 1333
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 26 - 1583, body: response(59377)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 28 - 1458, body: response(59378)[2018-11-01T15:55:00.973Z]
[reactor-tcp-nio-5] INFO com.example.Client - metadata: 30 - 1333, body: response(59379)[2018-11-01T15:55:00.973Z]