mike-neckのブログ

Java or Groovy or Swift or Golang

resilience4j の RateLimiter を少しだけいじった

resilience4j の RateLimiter を少しだけいじったので、そのメモ。 ここの実装はダメな実装になってるので、ちゃんとした使い方を調べてる人は他をあたった方が良い(強く推奨)。

resilience4j は Java8 用に設計された耐障害性のためのライブラリーで次の機能が提供されている。

  • サーキットブレーカー
  • レートリミッター
  • 隔壁(Bulkhead)
  • リトライ

そのうち、今回は RateLimiter をすこしだけさわった。


RateLimiter の取得

RateLimiter を取得するには、まず RateLimiterConfigRateLImiter の設定を RateLimiterRegistry に与えて、 RateLimiterRegistry からキーとなる名前を指定して取得する。キーはアクセストークン + パス の組み合わせなどのレートを設定したい値を使うらしい。

RateLimiterConfig config = RateLimiterConfig.custom()
    .limitRefreshPeriod(Duration.ofSeconds(2))
    .limitForPeriod(2)
    .timeoutDuration(Duration.ofMillis(100L))
    .build();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test");

試してみる

単なる for 文で試してみます。

@Test
void rateLimiterWithFor() throws InterruptedException {
  RateLimiterConfig config = RateLimiterConfig.custom()
      .limitRefreshPeriod(Duration.ofSeconds(2))
      .limitForPeriod(2)
      .timeoutDuration(Duration.ofMillis(100L))
      .build();
  RateLimiterRegistry registry = RateLimiterRegistry.of(config);
  List<Names> list = List.of(Names.EVEN, Names.ODD);
  for (int i = 0; i < 20; i++) {
    final int id = i;
    for (Names names : list) {
      RateLimiter rateLimiter = registry.rateLimiter(names.name());
      Runnable runnable = RateLimiter.decorateRunnable(
          rateLimiter,
          () -> logger.info("id: {}, name: {}", id, names));
      Try.runRunnable(runnable)
          .onFailure(
              e -> logger.info("error, id: {}, name: {}, error: {}",
                  id,
                  names,
                  e.getClass().getSimpleName()));
    }
    Thread.sleep(100L);
  }
}

enum Names {
  EVEN,
  ODD,
  ;
}

実行結果から、 2秒間で 2回通していることが伺える

23:25:08.496 [main] INFO com.example.RateLimiterTest - id: 0, name: EVEN
23:25:08.500 [main] INFO com.example.RateLimiterTest - id: 0, name: ODD
23:25:08.604 [main] INFO com.example.RateLimiterTest - id: 1, name: EVEN
23:25:08.604 [main] INFO com.example.RateLimiterTest - id: 1, name: ODD
23:25:08.807 [main] INFO com.example.RateLimiterTest - error, id: 2, name: EVEN, error: RequestNotPermitted
23:25:08.912 [main] INFO com.example.RateLimiterTest - error, id: 2, name: ODD, error: RequestNotPermitted
23:25:09.117 [main] INFO com.example.RateLimiterTest - error, id: 3, name: EVEN, error: RequestNotPermitted
23:25:09.221 [main] INFO com.example.RateLimiterTest - error, id: 3, name: ODD, error: RequestNotPermitted
23:25:09.423 [main] INFO com.example.RateLimiterTest - error, id: 4, name: EVEN, error: RequestNotPermitted
23:25:09.528 [main] INFO com.example.RateLimiterTest - error, id: 4, name: ODD, error: RequestNotPermitted
23:25:09.734 [main] INFO com.example.RateLimiterTest - error, id: 5, name: EVEN, error: RequestNotPermitted
23:25:09.836 [main] INFO com.example.RateLimiterTest - error, id: 5, name: ODD, error: RequestNotPermitted
23:25:10.039 [main] INFO com.example.RateLimiterTest - error, id: 6, name: EVEN, error: RequestNotPermitted
23:25:10.141 [main] INFO com.example.RateLimiterTest - error, id: 6, name: ODD, error: RequestNotPermitted
23:25:10.347 [main] INFO com.example.RateLimiterTest - error, id: 7, name: EVEN, error: RequestNotPermitted
23:25:10.451 [main] INFO com.example.RateLimiterTest - error, id: 7, name: ODD, error: RequestNotPermitted
23:25:10.556 [main] INFO com.example.RateLimiterTest - id: 8, name: EVEN
23:25:10.556 [main] INFO com.example.RateLimiterTest - id: 8, name: ODD
23:25:10.660 [main] INFO com.example.RateLimiterTest - id: 9, name: EVEN
23:25:10.661 [main] INFO com.example.RateLimiterTest - id: 9, name: ODD
23:25:10.867 [main] INFO com.example.RateLimiterTest - error, id: 10, name: EVEN, error: RequestNotPermitted
23:25:10.968 [main] INFO com.example.RateLimiterTest - error, id: 10, name: ODD, error: RequestNotPermitted
23:25:11.173 [main] INFO com.example.RateLimiterTest - error, id: 11, name: EVEN, error: RequestNotPermitted
23:25:11.274 [main] INFO com.example.RateLimiterTest - error, id: 11, name: ODD, error: RequestNotPermitted
23:25:11.476 [main] INFO com.example.RateLimiterTest - error, id: 12, name: EVEN, error: RequestNotPermitted
23:25:11.579 [main] INFO com.example.RateLimiterTest - error, id: 12, name: ODD, error: RequestNotPermitted
23:25:11.784 [main] INFO com.example.RateLimiterTest - error, id: 13, name: EVEN, error: RequestNotPermitted
23:25:11.885 [main] INFO com.example.RateLimiterTest - error, id: 13, name: ODD, error: RequestNotPermitted
23:25:12.086 [main] INFO com.example.RateLimiterTest - error, id: 14, name: EVEN, error: RequestNotPermitted
23:25:12.191 [main] INFO com.example.RateLimiterTest - error, id: 14, name: ODD, error: RequestNotPermitted
23:25:12.393 [main] INFO com.example.RateLimiterTest - error, id: 15, name: EVEN, error: RequestNotPermitted
23:25:12.483 [main] INFO com.example.RateLimiterTest - id: 15, name: ODD
23:25:12.585 [main] INFO com.example.RateLimiterTest - id: 16, name: EVEN
23:25:12.585 [main] INFO com.example.RateLimiterTest - id: 16, name: ODD
23:25:12.686 [main] INFO com.example.RateLimiterTest - id: 17, name: EVEN
23:25:12.789 [main] INFO com.example.RateLimiterTest - error, id: 17, name: ODD, error: RequestNotPermitted
23:25:12.990 [main] INFO com.example.RateLimiterTest - error, id: 18, name: EVEN, error: RequestNotPermitted
23:25:13.092 [main] INFO com.example.RateLimiterTest - error, id: 18, name: ODD, error: RequestNotPermitted
23:25:13.295 [main] INFO com.example.RateLimiterTest - error, id: 19, name: EVEN, error: RequestNotPermitted
23:25:13.401 [main] INFO com.example.RateLimiterTest - error, id: 19, name: ODD, error: RequestNotPermitted

Spring アプリケーションに取り込んでみる

サーバーのコード

io.github.resilience4j:resilience4j-spring-boot2:0.15.0 は使わなかった…

RateLimiterRegistry などを bean 登録して、 WebFilter でラップする。レートは 30 秒間で 6 回のリクエストとしている。

@SpringBootApplication
public class App {

  public static void main(String[] args) {
    SpringApplication.run(App.class, args);
  }

  @Bean
  RateLimiterConfig rateLimiterConfig() {
    return RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(30L))
        .limitForPeriod(6)
        .timeoutDuration(Duration.ofMillis(10L))
        .build();
  }

  @Bean
  RateLimiterRegistry rateLimiterRegistry(RateLimiterConfig rateLimiterConfig) {
    return RateLimiterRegistry.of(rateLimiterConfig);
  }
}

RateLimiterOperator という resilience4j のクラスが Publisher<T> を返すのを Mono<T> に限定するクラス

@Component
public class MonoLimiterAdapterRegistry {

  private final RateLimiterRegistry rateLimiterRegistry;

  MonoLimiterAdapterRegistry(RateLimiterRegistry rateLimiterRegistry) {
    this.rateLimiterRegistry = rateLimiterRegistry;
  }

  <T> UnaryOperator<Mono<T>> getAdapter(String key) {
    Supplier<RateLimiter> rateLimiterSupplier = () -> rateLimiterRegistry.rateLimiter(key);
    return mono -> Mono.from(RateLimiterOperator.<T>of(rateLimiterSupplier.get()).apply(mono));
  }
}

WebFilter を生成する Configuration クラス。

仕様としては API KEY(X-API-KEY) ごとにリクエストの制限が発生するような形としている。

なお、レートリミットのパーミッションが取れない場合は例外が発生するので、 WebExceptionHandler にて TOO_MANY_REQUEST(429) を返す必要がある。

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {

  private static final Logger logger = LoggerFactory.getLogger(WebConfig.class);

  private final MonoLimiterAdapterRegistry monoLimiterAdapterRegistry;
  private final AppController appController;
  private final ObjectMapper objectMapper;

  public WebConfig(
      MonoLimiterAdapterRegistry monoLimiterAdapterRegistry,
      AppController appController,
      ObjectMapper objectMapper) {
    this.monoLimiterAdapterRegistry = monoLimiterAdapterRegistry;
    this.appController = appController;
    this.objectMapper = objectMapper;
  }

  @Bean
  @Order(0)
  WebFilter apiRateLimiterFilter() {
    return (exchange, chain) -> {
      ServerHttpRequest request = exchange.getRequest();
      String key = ApiPathAndApiKey.from(request)
          .map(ApiPathAndApiKey::rateLimiterKey)
          .orElse("anonymous");
      UnaryOperator<Mono<ServerWebExchange>> adapter =
          monoLimiterAdapterRegistry.getAdapter(key);
      return adapter.apply(Mono.just(exchange))
          .doOnEach(sig -> logForFilter(request))
          .flatMap(chain::filter);
    };
  }

  private static void logForFilter(ServerHttpRequest request) {
    logger.info(
        "api rate limiter filter - framework-id: {}, request-id: {} , path: {}, x-api-key: {}",
        request.getId(),
        request.getHeaders().get("X-REQUEST-ID"),
        request.getPath(),
        request.getHeaders().get("X-API-KEY"));
  }

  @Bean
  @Order(-2)
  WebExceptionHandler requestNotPermitted() {
    return (exchange, ex) -> Mono.<Void>error(ex)
        .doOnError(
            RequestNotPermitted.class,
            e -> logger.info("api rate reached limit, path: {}, api: {}",
                exchange.getRequest().getPath(),
                exchange.getRequest().getHeaders().get("X-API-KEY")))
        .onErrorResume(
            RequestNotPermitted.class,
            requestNotPermitted -> writeRequestNotPermitted(exchange.getResponse(), exchange.getRequest()));
  }

  private Mono<Void> writeRequestNotPermitted(ServerHttpResponse response, ServerHttpRequest request) {
    DataBuffer buffer = response.bufferFactory().allocateBuffer();
    Map<String, String> obj = Map.of("path", request.getPath().value(), "message", "reached at rate limit.");
    Mono<DataBuffer> data = Mono.fromCallable(() -> buffer.write(writeJson(obj), StandardCharsets.UTF_8));
    if (response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS)) {
      return response.writeWith(data);
    }
    return Mono.empty();
  }
}

クライアントのコード

reactor-netty を使って、定期的にリクエストを送るようにしてみた。クラス名にもしたが、 3 秒に 1 回リクエストを送っている。

public class Runner1Per3Sec {

  private static final Logger logger = LoggerFactory.getLogger(Runner1Per3Sec.class);

  public static void main(String[] args) throws Exception {
    HttpClient httpClient = HttpClient.create();
    HttpClient client = httpClient.baseUrl("http://localhost:8080");

    CountDownLatch latch = new CountDownLatch(1);

    Disposable disposable = Flux.interval(Duration.ofSeconds(3L))
        .take(30L)
        .flatMap(l -> toResult(l,
            client.headers(headers ->
                headers.add("X-API-KEY", "test")
                    .add("X-REQUEST-ID", l))
            .get().uri("/api")))
        .doOnComplete(latch::countDown)
        .doOnError(e -> latch.countDown())
        .subscribe(result ->
            logger.info("id: {}, status: {}, body: {}",
                result.id, result.status, result.response));

    try (AutoCloseable ignore = disposable::dispose) {
      latch.await();
    }
  }

  private static Mono<Result> toResult(long id, HttpClient.ResponseReceiver<?> receiver) {
    return receiver.responseSingle((response, byteBufMono) ->
        byteBufMono.asString(StandardCharsets.UTF_8)
            .map(json -> new Result(id, response.status().code(), json)));
  }

  static class Result {
    final long id;
    final int status;
    final String response;

    Result(long id, int status, String response) {
      this.id = id;
      this.status = status;
      this.response = response;
    }
  }
}

実行結果

クライアント側のログ

サーバーのレートは 30 秒で 6 回、クライアントの送信頻度は 3 秒に 1 回(30 秒に 10 回)なので、おおよそ 10 回中 6 回が成功、 4 回が失敗のような感じになる。

[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 0, status: 200, body: {"method":"get","time":"2019-05-26T23:10:33.678972Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 1, status: 200, body: {"method":"get","time":"2019-05-26T23:10:36.383594Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 2, status: 200, body: {"method":"get","time":"2019-05-26T23:10:39.383387Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 3, status: 200, body: {"method":"get","time":"2019-05-26T23:10:42.386557Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 4, status: 200, body: {"method":"get","time":"2019-05-26T23:10:45.383297Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 5, status: 200, body: {"method":"get","time":"2019-05-26T23:10:48.383205Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 6, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 7, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 8, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 9, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 10, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 11, status: 200, body: {"method":"get","time":"2019-05-26T23:11:06.382195Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 12, status: 200, body: {"method":"get","time":"2019-05-26T23:11:09.387756Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 13, status: 200, body: {"method":"get","time":"2019-05-26T23:11:12.385488Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 14, status: 200, body: {"method":"get","time":"2019-05-26T23:11:15.383575Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 15, status: 200, body: {"method":"get","time":"2019-05-26T23:11:18.383269Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 16, status: 200, body: {"method":"get","time":"2019-05-26T23:11:21.383341Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 17, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 18, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 19, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 20, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 21, status: 200, body: {"method":"get","time":"2019-05-26T23:11:36.383870Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 22, status: 200, body: {"method":"get","time":"2019-05-26T23:11:39.383836Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 23, status: 200, body: {"method":"get","time":"2019-05-26T23:11:42.383771Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 24, status: 200, body: {"method":"get","time":"2019-05-26T23:11:45.383159Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 25, status: 200, body: {"method":"get","time":"2019-05-26T23:11:48.383385Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 26, status: 200, body: {"method":"get","time":"2019-05-26T23:11:51.386154Z","path":"/api"}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 27, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 28, status: 429, body: {"path":"/api","message":"reached at rate limit."}
[reactor-http-nio-4] INFO com.example.Runner1Per3Sec - id: 29, status: 429, body: {"path":"/api","message":"reached at rate limit."}

30 回中 18 回が成功していることがわかる。


注意

resilience4j の RateLimiter はオンメモリーで実行するタイプで、複数のサーバー間で状態を共有する方法は提供されていない。

github.com

これにはいくつか理由がある様子。

  • RateLimiter は速く動くべき
  • (redis 等の)キャッシュによる共有は複雑さとレイテンシーをもたらす
    • デメリットがメリットを上回る
  • resilience4j の RateLimiter はクライアントサイドのレートリミッターであり、サーバーサイドで使われるものではない

というわけで、ここの実装はダメなことを書いてあるので、他のブログないし、記事を探したほうが良い。