rdkafka-ruby で Local: Broker transport failure (transport) が起きる場合の対処方法

rdkafka-ruby を使っていると “Local: Broker transport failure (transport)” というエラーに遭遇することがあります。様々な条件で発生するかもしれませんが、自分が遭遇したのは次の条件を全て満たす時です。

  • metadata.broker.list (bootstrap.servers) に指定されている broker のうち 1 台以上の調子が悪い
    • 高負荷状態であるとか、broker が起動していないとか
  • producer を生成してからすぐに produce を実行する
  • produce を実行する際に partition_key が指定されている

なお、rdkafka-ruby のバージョンは 0.10.0、librdkafka のバージョンは 1.5.0 を前提としています。

エラーが起きる場合の対処方法

次のどれかの選択肢を取ることになります。設定値の調整ではどうにもなりません。

  • produce に指定する keypartition_key が同じであれば partition_key の指定をやめる
  • 単純にリトライする
    • 個別にリトライする
        attempt = 0
        begin
          attempt += 1
          producer.produce(**kwargs)
        rescue Rdkafka::RdkafkaError => e
          raise if attempt == 3
          logger.warn("Retry to produce due to #{e} (#{e.class}) (#{attempt}th attempt)")
          retry
        end
      
    • モンキーパッチを当てる
        class Rdkafka::Producer
          prepend Module.new {
            def partition_count(*args)
              attempt ||= 0
              attempt += 1
              super
            rescue Rdkafka::RdkafkaError => e
              raise if attempt == 3
              Rdkafka::Config.logger.warn(%Q{Retry to produce due to "#{e} (#{e.class})" (#{attempt}th attempt)})
              retry
            end
          }
        end
      
  • rd_kafka_metadata のタイムアウトが 1 秒以上になるようにモンキーパッチを当てる
      module Rdkafka::Bindings
        singleton_class.prepend Module.new {
          def rd_kafka_metadata(*args, timeout)
            timeout < 1250 ? super(*args, 1250) : super
          end
        }
      end
    
  • producer を生成してから 1 秒以内に produce を実行するのを避ける
    • 例えばアプリケーション起動時に producer を生成してそれを使い回すなど

エラーが起きる理由

冒頭で述べた条件を全て満たす場合にエラーが起きる理由について説明します。

再現コード

localhost:9092 に Kafka broker が起動している場合、次のスクリプトを実行すると 2 回に 1 回程度 “Local: Broker transport failure (transport)” が起きます。

require 'rdkafka'

logger = Logger.new($stdout)

# Hack to display logs immediately
Rdkafka::Config::REQUIRED_CONFIG = { "log.queue": false }
module Rdkafka::Bindings
  def self.rd_kafka_conf_set_log_cb(*); end
  def self.rd_kafka_set_log_queue(*); end
end

producer = Rdkafka::Config.new({
  'metadata.broker.list': 'localhost:9092,example.com:9092',
  debug: 'broker,metadata',
}).producer

logger.info('Produce message')
begin
  producer.produce(
    topic:   'test',
    partition_key: 'partition_key',
  ).wait
rescue Rdkafka::RdkafkaError => e
  logger.error("#{e} (#{e.class})")
  raise
ensure
  producer.close
end

エラー発生時の backtrace は次のようになります。

/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/metadata.rb:20:in `initialize': Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `new'
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `partition_count'
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:106:in `produce'
        from /Users/arabiki/path/to/script.rb:19:in `<main>'

backtrace の情報から、メタデータの取得に失敗していることがわかります。
メタデータの取得は partition_key を指定した場合にのみ必要で、パーティション数を取得するために実行されています。
よって、もし keypartition_key が同じであれば partition_key の指定をなくすだけでエラーを回避できます。

余談ですが、partition_key を指定すると、partitioner に何を指定していても consistent_random partitioner が使われるので、注意が必要です。この問題は rdkafka-ruby#173 がマージされると解消します。

keypartition_key に異なるものを指定しつつエラーを回避するには、metadata.broker.list に指定している broker が複数台あるのにも関わらず、1 台が異常なだけで rd_kafka_metadata に失敗する理由がわかる必要があります。つまり、rdkafka-ruby や librdkafka の内部処理まで理解する必要があります。

produce の際の rdkafka-ruby & librdkafka の内部処理

まず、producer を生成する際、rdkafka-ruby は librdkafka の rd_kafka_new を呼ぶことで handler を生成します

rd_kafka_new を実行すると、次のような処理が行われます。

  1. main thread の作成
    • この中で定期的に行いたい処理を実行している
  2. metadata.broker.list に指定されている broker の追加
  3. metadata.broker.list に指定されているうちランダムに選択した 1 台と接続
  4. 接続を確立した broker からメタデータを取得する
  5. メタデータに存在する broker を登録する
    • このタイミングで metadata.broker.list に指定されている broker は他の broker と同じ扱いになる

その後 produce を実行すると、メタデータを取得する関数である rd_kafka_metadata が呼び出されます。rdkafka-ruby の場合はタイムアウトが 250 ms でハードコーディングされていますrd_kafka_metadata の中で、問い合わせ先の broker を選択する処理である rd_kafka_broker_any_usable が呼び出され、この結果が NULL、つまり使用可能な broker が 1 台も見つからなかった場合に例のエラーになります

rd_kafka_broker_any_usable では次のような処理が行われています。

  1. state が UP か UPDATE の broker を 1 台選択
    • 選択する際の優先順位は rdkafka_broker.c#L1366-L1372 にあるとおりで、接続に成功しやすいものを優先していそう
  2. state が UP や UPDATE の broker がいない場合はランダムで選択した broker に接続を確立するよう命令する (rd_kafka_connect_any)
  3. broker の state が変わるまで待機
  4. 1 〜 3 を繰り返す

rd_kafka_new 実行直後は、librdkafka が認識している broker は metadata.broker.list に指定したものだけであり、それらの state は INIT なので rd_kafka_connect_any が呼ばれます。ただし、この関数は短時間で何度も呼び出されると無視するようになっています。短時間というのはデフォルトで 50 ms です1
正常系では broker の state は次のように遷移するので、もし INIT から他の state に遷移する際に 50 ms 以上かつタイムアウト未満かかれば、再度 rd_kafka_connect_any が呼ばれた時に他の broker が選択されますが、この確率は極めて低そうです。

  1. INIT
  2. TRY_CONNECT
  3. CONNECT
  4. APIVERSION_QUERY
  5. UP

よって、たまたま選択された broker が高負荷などの理由で、ある state(多くの場合は CONNECT)からなかなか変わらなかったらタイムアウトになるまで待ち続けることになります。

main thread では 1 秒間隔で rd_kafka_connect_any を実行しているので、1 秒経てば別の broker と接続を確立しようとします。よって、タイムアウトが 1 秒より大きいと metadata.broker.list に指定されている別の broker が選択されて、タイムアウトに引っかからない可能性があります。

ログを読み解く

再現コードを実行すると次のようなログが出力されますが、前述の挙動を頭に入れながらログを読み解いてみます。

/Users/arabiki/path/to/script.rb:6: warning: already initialized constant Rdkafka::Config::REQUIRED_CONFIG
/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/config.rb:103: warning: previous definition of REQUIRED_CONFIG was here
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1635820149.745|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xa)
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Enter main broker thread
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Received CONNECT op
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
I, [2021-11-02T11:29:09.745968 #77599]  INFO -- : Produce message
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1635820149.746|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1635820149.762|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Connecting to ipv4#93.184.216.34:9092 (plaintext) with socket 17
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 0 connection attempt(s))
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
E, [2021-11-02T11:29:10.001143 #77599] ERROR -- : Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
-- snip --
/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/metadata.rb:20:in `initialize': Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `new'
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `partition_count'
        from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:106:in `produce'
        from /Users/arabiki/path/to/script.rb:19:in `<main>'

次のログから、metadata.broker.list に指定された broker が追加され、broker のメイン関数が実行されたことがわかります。

%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Enter main broker thread

次のログから、rd_kafka_connect_any で example.com:9092/bootstrap が選択され、example.com:9092/bootstrap の thread が CONNECT という命令を受け取り、接続を “確立” していることがわかります。

%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Received CONNECT op
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT

次のログから、produce をした時に rd_kafka_connect_any が呼ばれるも、最初に rd_kafka_connect_any が呼ばれてから 1 ms 程度しか経っていないので無視されていることがわかります。そして、タイムアウトである約 250 ms が経ってから再度 rd_kafka_connect_any が呼ばれ、localhost:9092/bootstrap が選択されたことがわかります。ただ、タイムアウトが過ぎているので、”Local: Broker transport failure (transport)” になります。

I, [2021-11-02T11:29:09.745968 #77599]  INFO -- : Produce message
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 0 connection attempt(s))
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
E, [2021-11-02T11:29:10.001143 #77599] ERROR -- : Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)

まとめ

以上のことから、冒頭で述べたように、設定値の調整ではエラーを回避することができず、次のいずれかの選択肢を取らなければならないことがわかったと思います。

  • produce に指定する keypartition_key が同じであれば partition_key の指定をやめる
  • 単純にリトライする
  • rd_kafka_metadata のタイムアウトが 1 秒以上になるようにモンキーパッチを当てる
  • producer を生成してから 1 秒以内に produce を実行するのを避ける

おまけ 〜CLion を使った librdkafka のデバッグ〜

IntelliJ IDEA や Visual Studio Code 等でも可能でしょうが、自分は CLion を使って librdkafka の挙動を理解しました。
librdkafka では非公式ではあるものの、CMake をサポートしています。よって、CLion に簡単にインポートすることができます。
インポートすると examples 以下のサンプルコードが CMake アプリケーションとして登録されるので、rdkafka_example に適当な引数を渡すだけでブレークポイントを設定しつつ様々な処理を行えます。
例えば今回のようにメタデータ取得の処理を理解したい場合は Program arguments に次の内容を指定します。

-L -b localhost:9092,example.com:9092 -d broker,metadata

  1. reconnect.backoff.ms を 22 以下にすれば最小 11 ms にできます