mike-neckのブログ

JavaかJavaFXかJavaEE(なんかJava8が多め)

Swift-NIO で HTTP Client を書いてみる

f:id:mike_neck:20180609045321p:plain

この前の土日に Swift-NIO を使って HTTP Client を書いてみたが、うまく動かなかった。


Bootstrap

基本的には Netty と同じ。 ClientBootstrap のイニシャライザーを呼び出して、 EventLoopGroupChannelOptionChannelInitializer(Channel -> EventLoopFuture のこと)を渡して、 connect(host:port:) すればつながるはず。

import NIO

let eventLoopGroup = MultiTHreadedEventLoopGroup(numberOfThreads: 1)
let bootstrap = ClientBootstrap(group:eventLoopGroup)
    .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
    .channelInitializer(channelInitializer())
let future = bootstrap.connect(host: "api.github.com", port: 443)
future.then { channel in
    var httpHeadPart = HTTPRequestHead(version: HTTPVersion(major:1, minor: 1), method: HTTPMethod.GET, uri: "http://api.github.com/search/repositories?q=swift&per_page=3")
    httpHeadPart.headers = HTTPHeaders([
        ("Host", "api.github.com"),
        ("Connection", "close"),
        ("Accept-Encoding", "gzip"),
        ("User-Agent", "Swift-NIO"),
        ("Accept", "application/json")
    ])
    return channel.writeAndFlush(HTTPClientRequestPart.head(httpRequestPart))
}

ChannelInitializer

ここもわりとすんなり書けた。

Netty の HttpObjectAggregator に該当するものがないようにも思われるが、 ユーザーコードが受け取るのは HTTPClientResponsePart(= HTTPPart<HTTPResponseHead, ByteBuffer>) となる模様。型からすると、レスポンスが集約されているように思われる。

func channelInitializer() -> (Channel) -> EventLoopFuture<Swift.Void> {
  return { channel in
      let pipeline = channel.pipeline
      let sslHandler = try! OpenSSLClientHandler(context: sslContext)
      _ = pipeline.add(handler: sslHandler, first: true)
      _ = pipeline.addHTTPClientHandlers()
      return pipeline.add(handler: MyClientHandler(), first: false)
  }
}

これでいけるはずだと思っていたが、実際には TLS のところで unableToValiadteCertificate Error が発生してしまって、何もできない…

URLSession による HTTP 接続と libcurl

f:id:mike_neck:20180609045321p:plain

swift で HTTP でAPIを呼び出すときなどは URLSession#dataTask(with:,completionHandler:) を使うのが一般的らしい。

guard let url = URL(string: "http://localhost:8080/api") else {
  throw MyURLError.invalidURL
}
URLSession.shared.dataTask(with: url) {(data, response, error) in
  if let e = error {
    print("error: \(e)")
  }
  if let r = response as? HTTPURLResponse {
    print("status: \(r.statusCode)")
    print("Content-Type: \(r.allHeaderFields["Content-Type"] ?? "")")
  }
  if let d = data {
    let body = String(data: d, encoding: String.Encoding.utf8) ?? ""
    print("body: \(body)")
  }
}

疑問: URLSession ってどんな非同期なの?

URLSession + URLSessionDataTask の組み合わせによる、一見非同期に見えるこの処理は本当に非同期なのか、それとも非同期っぽい同期コードなのかを知りたい。ここで、

  • ちゃんと非同期
    • select ないしは kqueue を使って、イベントループがネットワークのイベントを拾ってから処理をおこなっている
    • 大量のリクエストを出しても性能が劣化しない(いくつものリクエストをさばける)
    • Java で NIO 使った場合はこちらになる
  • 非同期っぽい同期
    • 同期的なネットワーク呼び出しを GCD でディスパッチして、バックグラウンドのスレッドに固まってもらうことで、 main スレッドは固まらないようにしている
    • 準備したスレッドプールの数を超えるリクエストを出すと、全体としての性能が劣化する
    • Java で NIO 使わない場合は、こちらになる

とする。3連休は自宅でダラダラ過ごしながら、どんな感じの非同期なのか疑問に思った我々は swift-corelibs-foundation と swift-corelibs-libdispatch に挑んだ。


結論: ちゃんと非同期

結論としては、前者であるが、 selectkqueue を直接使っているわけではない。

具体的には URLSession の通信部分は libcurl をつかっていて、一つひとつのリクエストは libcurl の easy handle API を利用している。それらを URLSession が保持している libcurl の multi handle API で処理している形である。


以下、読んだ内容をざっくりと…(C言語が読めないし、 libcurl よく知らないし、 swift も大して読めないので不正確かもしれない)

  • URLSession
    • 複数のリクエストもふくめて、 リクエスト処理をあらわす URLSessionTask(_TaskRegistry で管理されている) やそれをハンドルする DispatchQueue を管理しているクラス
    • インスタンス生成時に、その URLSession が管理する multi handle API を表す _MultiHandleインスタンスを生成する
    • dataTask などのメソッドでは URLSessionTaskインスタンスと、 completion handler をラップした _TaskRegistry._Behaviour_TaskRegistry に加える
  • URLSessionTask
    • リクエストの処理をユーザーが制御するためのクラスで、 ファイルを伴わないリクエスト -> URLSessionDataTask 、 Multi Part でファイルをアップロードするリクエスト -> URLSessionUploadTask 、 ファイルをダウンロードするリクエスト -> URLSessionDownloadTask などのサブクラスがある
    • これらのクラスはイニシャライザーでリクエストの URL に基づき URLProtocol を生成・保持する
    • URLSessionTaskresume 関数にて、 URLProtocolstartLoading 関数を経由して、 HTTPURLProtocolconfigureEasyHandle 関数にて libcurl の easy handle API のラッパーである _EasyHandle にリクエストのパラメーターを渡していく
  • URLProtocol
    • 多くの場合は HTTPURLProtocolインスタンスが生成されるが、ほとんどの関数はスーパークラスである _NativeProtocol のものが使われている
    • リダイレクトの場合の制御や、データ読み込み完了の際に呼び出される関数が提供されている
    • これらのコールバックが提供されているのは、 _NativeProtocol_EasyHandleDelegate プロトコルに準拠しているため
  • _MultiHandle
    • selectkqueue のような役割を提供している
    • ソケットのステータスが変更された場合に呼び出されるコールバック関数を提供する
    • curl multi API に監視対象の _SocketSources が登録されていない場合には、新しい _SocketSources を生成・登録する
    • ソケットの状態に基づき、 _ReadSource_WriteSource を登録する
    • _MultiHandle の中にイベントループがあり、 curl multi API から curl_multi_info_readを呼び出す。その関数で _EasyHandle が見つかった場合(つまり読み込み完了した場合)は _EasyHandlecompletedTransfer(withError:) を呼び出す。これは URLProtocol(_NativeProtocol) の transferCompleted(withError:) にディスパッチされて、パラメーターの error: NSError の有無によって、 failWith(:error, request:)(エラーの場合)または comleteTask() または redirectFor(request:) が呼び出される。 completeTask が呼ばれた場合は _ProtocolClienturlProtocolDidFinishLoading(_:URLProtocol) が呼び出されて、 URLSession が抱えている DispatchQueue 上で URLSessionTask から取得した completion handler(つまりユーザーが dataTask に与えたハンドラー) にレスポンスを渡す。処理が終わったあとは _TaskRegistry から URLSessionTask を取り除く

_EasyHandleDelegate

protocol _EasyHandleDelegate {
  // レスポンスを受け取ったとき
  func didReceive(data: Data) -> _EasyHandle._Action
  // レスポンスを受け取ったとき
  func didReceive(headerData data: Data, contentLength: Int64) -> _EasyHandle._Action
  // POST のボディを書き込めるとき
  func fill(writeBuffer buffer: UnsafeMutableBufferPointer<Int8>) -> _EasyHandle._WriteBufferResult
  // 書き込み完了後
  func transferCompleted(withError error: NSError?)
  // input stream のシーキング(? CURLOPT_SEEKFUNCTION で渡すオプションのコールバック e.g. https://curl.haxx.se/libcurl/c/CURLOPT_SEEKFUNCTION.html)
  func seekInputStream(to position: UInt64) throws
  // プログレスを更新 (? CURLOPT_XFERINFOFUNCTION で渡すオプションのコールバック e.g. https://curl.haxx.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html)
  func updateProgressMeter(with propgress: _EasyHandle._Progress)
}

以上から、動作は libcurl の multi handle API をベースにしたイベント駆動の作りになっており、一つの URLSession で何個でもリクエストを投げられるようになっている

周回遅れ(3周くらい)で Kubernetes 入門してみた

Kubernetes の名前は 2年ちょっと前くらいから耳にはしていたけど、全然触ったことないわりに、最近方方から聞こえてくるので触ってみることにした。

f:id:mike_neck:20180712010003p:plain

なお、基本的な内容はこちらのものを、ただコピペしてくだけの作業…

docs.microsoft.com


いくつかチュートリアルを続ける際に困った点

Azure を操作するために Azure-cliMac にインストールしていたのですが、新しいマシンを購入したので移行したところ、 az コマンドで Azure にログインできなくなりました。

$ az login
Note, we have launched a browser for you to login. For old experience with device code, use "az login --use-device-code"
Please ensure you have network connection. Error detail: HTTPSConnectionPool(host='login.microsoftonline.com', port=443): Max retries exceeded with url: /common/oauth2/token (Caused by SSLError("Can't connect to HTTPS URL because the SSL module is not available."))

これと同じメッセージを調べてみたところ、エラーメッセージにて SSL module is not available とある SSL module が標準でついているとのことで、 python を起動してインポートしてみたところ…

python
Python 3.7.0 (default, Jun 29 2018, 20:14:27) 
[Clang 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import ssl
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 98, in <module>
    import _ssl             # if we can't import it, let the error propagate
ImportError: dlopen(/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/lib-dynload/_ssl.cpython-37m-darwin.so, 2): Library not loaded: /usr/local/opt/openssl/lib/libssl.1.0.0.dylib
  Referenced from: /usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/lib-dynload/_ssl.cpython-37m-darwin.so
  Reason: image not found

とインポートに失敗しました。そこで、以前、 Mac OSX High Sierra では openssl という名前のパッケージが実際は libressl に変わっているということを思い出し、 pythonSSL モジュールは利用できないのではないかと考えました。

この件については brew でインストールすれば解決するわけですが、せっかく Apple(BSD?) が openssl をやめて libressl にしたのに、 openssl を入れるのは負けだなと思ったので、docker でコンテナを立ち上げて、そちらで az コマンドを使うようにしました。


dind

Kubernetes を扱うということは、 Docker もあつかうということで、 az コマンドだけでなくて、 docker コマンドも同じコンテナで使えないとうまくいかなそうです。たいして Docker を勉強していなかったので、 dind(Docker in Docker) というのがあるらしいので、そちらのイメージをベースに azure-cli を入れられるようにコンテナを作りました。

FROM docker:18.03.1-dind

RUN mkdir /root/.azure /azure-cli
VOLUME /root/.azure
WORKDIR /azure-cli

RUN \
  apk add --no-cache curl libffi python3-dev bash openssh ca-certificates jq openssl && \
  apk add --no-cache --virtual .build-deps gcc make openssl-dev libffi-dev musl-dev && \
  update-ca-certificates && \
  ln -s /usr/bin/python3.6 /usr/bin/python && \
  ln -s /usr/bin/pip3 /usr/bin/pip && \
  pip install --no-cache-dir --upgrade jmespath-terminal && \
  curl -L https://aka.ms/InstallAzureCli | \
    sed -e "s/tmp_XXXX/tmp_XXXXXX/g" \
      -e "s/\/dev\/tty/config.txt/g" > install.sh && \
  chmod +x install.sh && \
  echo /azure-cli/lib >> config.txt && \
  echo /azure-cli/bin >> config.txt && \
  echo y >> config.txt && \
  ./install.sh && \
  cat /azure-cli/lib/az.completion > ~/.bashrc && \
  runDeps="$( \
    scanelf --needed --nobanner --recursive /usr/local \
        | awk '{ gsub(/,/, "\nso:", $2); print "so:" $2 }' \
        | sort -u \
        | xargs -r apk info --installed \
        | sort -u \
    )"  && \  
  apk add --virtual .rundeps $runDeps && \
  apk del .build-deps

WORKDIR /
ENV PATH $PATH:/azure-cli/bin

CMD bash

なお、作ったコンテナはこちらから入手可能

azure-cli-dind


入門できたこと

  • Kubernetes でアプリケーションをデプロイ
  • Pod の数を増やす
  • ローリングアップデート
  • Kubernetes クラスタのアップグレード
    • 実行したらサービス落ちた

入門できなかったこと

  • volume などのデータの永続化
  • Istio などの周辺プロダクト

以上、特に見どころのないエントリー

reactor-netty コールドリーディング(4)

ChannelOperationsHandler

  • reactor-netty が提供している ChannelHandler の実装クラス
  • 内部に ContextHandler を持ち、これを通じて処理の登録などを行っている模様

channelActive(ChannelHandlerContext)

  • ContextHandler#createOperations(Channel, Object) を通じて ChannelOperationsChannelAttribute に保存する
    • ChannelOperations#autoCreateOperations の値が true の場合のみだが、このフィールドのデフォルト値は true となっている
    • ChannelOperations.OnNew<Channel>ChannelOperations を生成するが、これは ContextHandler#newClientContext で渡されてくる
    • ContextHandler#newClientContext に渡す ChannelOperations.OnNew<Channel>HttpClient#doHandler に渡される BiFunction<NettyInbound,NettyOutbound, Publisher<Void>> をラップして作られる
    • HttpClientOperations#bindHttpHttpClientOperations を生成するファクトリーメソッド
    • HttpClientOperationsChannelOperations のサブクラス。 ChannelOperationsInboundHandler のラッパー NettyInboundOutboundHandler のラッパー NettyOutbound を実装したクラス
handler != null ? (ch, c, msg) -> {
    if(onSetup != null){
        onSetup.accept(ch);
    }
    return HttpClientOperations.bindHttp(ch, handler, c);
} : EMPTY);

channelRead

  • channelActive で作った ChannelOperations#onInboundNext に処理を委譲する