rdkafka-ruby で Local: Broker transport failure (transport) が起きる場合の対処方法
rdkafka-ruby を使っていると “Local: Broker transport failure (transport)” というエラーに遭遇することがあります。様々な条件で発生するかもしれませんが、自分が遭遇したのは次の条件を全て満たす時です。
metadata.broker.list
(bootstrap.servers
) に指定されている broker のうち 1 台以上の調子が悪い- 高負荷状態であるとか、broker が起動していないとか
- producer を生成してからすぐに
produce
を実行する produce
を実行する際にpartition_key
が指定されている
なお、rdkafka-ruby のバージョンは 0.10.0、librdkafka のバージョンは 1.5.0 を前提としています。
エラーが起きる場合の対処方法
次のどれかの選択肢を取ることになります。設定値の調整ではどうにもなりません。
produce
に指定するkey
とpartition_key
が同じであればpartition_key
の指定をやめる- 単純にリトライする
- 個別にリトライする
attempt = 0 begin attempt += 1 producer.produce(**kwargs) rescue Rdkafka::RdkafkaError => e raise if attempt == 3 logger.warn("Retry to produce due to #{e} (#{e.class}) (#{attempt}th attempt)") retry end
- モンキーパッチを当てる
class Rdkafka::Producer prepend Module.new { def partition_count(*args) attempt ||= 0 attempt += 1 super rescue Rdkafka::RdkafkaError => e raise if attempt == 3 Rdkafka::Config.logger.warn(%Q{Retry to produce due to "#{e} (#{e.class})" (#{attempt}th attempt)}) retry end } end
- 個別にリトライする
rd_kafka_metadata
のタイムアウトが 1 秒以上になるようにモンキーパッチを当てるmodule Rdkafka::Bindings singleton_class.prepend Module.new { def rd_kafka_metadata(*args, timeout) timeout < 1250 ? super(*args, 1250) : super end } end
- producer を生成してから 1 秒以内に
produce
を実行するのを避ける- 例えばアプリケーション起動時に producer を生成してそれを使い回すなど
エラーが起きる理由
冒頭で述べた条件を全て満たす場合にエラーが起きる理由について説明します。
再現コード
localhost:9092 に Kafka broker が起動している場合、次のスクリプトを実行すると 2 回に 1 回程度 “Local: Broker transport failure (transport)” が起きます。
require 'rdkafka'
logger = Logger.new($stdout)
# Hack to display logs immediately
Rdkafka::Config::REQUIRED_CONFIG = { "log.queue": false }
module Rdkafka::Bindings
def self.rd_kafka_conf_set_log_cb(*); end
def self.rd_kafka_set_log_queue(*); end
end
producer = Rdkafka::Config.new({
'metadata.broker.list': 'localhost:9092,example.com:9092',
debug: 'broker,metadata',
}).producer
logger.info('Produce message')
begin
producer.produce(
topic: 'test',
partition_key: 'partition_key',
).wait
rescue Rdkafka::RdkafkaError => e
logger.error("#{e} (#{e.class})")
raise
ensure
producer.close
end
エラー発生時の backtrace は次のようになります。
/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/metadata.rb:20:in `initialize': Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `new'
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `partition_count'
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:106:in `produce'
from /Users/arabiki/path/to/script.rb:19:in `<main>'
backtrace の情報から、メタデータの取得に失敗していることがわかります。
メタデータの取得は partition_key
を指定した場合にのみ必要で、パーティション数を取得するために実行されています。
よって、もし key
と partition_key
が同じであれば partition_key
の指定をなくすだけでエラーを回避できます。
余談ですが、partition_key
を指定すると、partitioner に何を指定していても consistent_random partitioner が使われるので、注意が必要です。この問題は rdkafka-ruby#173 がマージされると解消します。
key
と partition_key
に異なるものを指定しつつエラーを回避するには、metadata.broker.list
に指定している broker が複数台あるのにも関わらず、1 台が異常なだけで rd_kafka_metadata
に失敗する理由がわかる必要があります。つまり、rdkafka-ruby や librdkafka の内部処理まで理解する必要があります。
produce の際の rdkafka-ruby & librdkafka の内部処理
まず、producer を生成する際、rdkafka-ruby は librdkafka の rd_kafka_new
を呼ぶことで handler を生成します。
rd_kafka_new
を実行すると、次のような処理が行われます。
- main thread の作成
- この中で定期的に行いたい処理を実行している
metadata.broker.list
に指定されている broker の追加metadata.broker.list
に指定されているうちランダムに選択した 1 台と接続- 接続を確立した broker からメタデータを取得する
- メタデータに存在する broker を登録する
- このタイミングで
metadata.broker.list
に指定されている broker は他の broker と同じ扱いになる
- このタイミングで
その後 produce
を実行すると、メタデータを取得する関数である rd_kafka_metadata
が呼び出されます。rdkafka-ruby の場合はタイムアウトが 250 ms でハードコーディングされています。rd_kafka_metadata
の中で、問い合わせ先の broker を選択する処理である rd_kafka_broker_any_usable
が呼び出され、この結果が NULL、つまり使用可能な broker が 1 台も見つからなかった場合に例のエラーになります。
rd_kafka_broker_any_usable
では次のような処理が行われています。
- state が UP か UPDATE の broker を 1 台選択
- 選択する際の優先順位は rdkafka_broker.c#L1366-L1372 にあるとおりで、接続に成功しやすいものを優先していそう
- state が UP や UPDATE の broker がいない場合はランダムで選択した broker に接続を確立するよう命令する (
rd_kafka_connect_any
) - broker の state が変わるまで待機
- 1 〜 3 を繰り返す
rd_kafka_new
実行直後は、librdkafka が認識している broker は metadata.broker.list
に指定したものだけであり、それらの state は INIT なので rd_kafka_connect_any
が呼ばれます。ただし、この関数は短時間で何度も呼び出されると無視するようになっています。短時間というのはデフォルトで 50 ms です1。
正常系では broker の state は次のように遷移するので、もし INIT から他の state に遷移する際に 50 ms 以上かつタイムアウト未満かかれば、再度 rd_kafka_connect_any
が呼ばれた時に他の broker が選択されますが、この確率は極めて低そうです。
- INIT
- TRY_CONNECT
- CONNECT
- APIVERSION_QUERY
- UP
よって、たまたま選択された broker が高負荷などの理由で、ある state(多くの場合は CONNECT)からなかなか変わらなかったらタイムアウトになるまで待ち続けることになります。
main thread では 1 秒間隔で rd_kafka_connect_any
を実行しているので、1 秒経てば別の broker と接続を確立しようとします。よって、タイムアウトが 1 秒より大きいと metadata.broker.list
に指定されている別の broker が選択されて、タイムアウトに引っかからない可能性があります。
ログを読み解く
再現コードを実行すると次のようなログが出力されますが、前述の挙動を頭に入れながらログを読み解いてみます。
/Users/arabiki/path/to/script.rb:6: warning: already initialized constant Rdkafka::Config::REQUIRED_CONFIG
/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/config.rb:103: warning: previous definition of REQUIRED_CONFIG was here
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1635820149.745|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xa)
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Enter main broker thread
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Received CONNECT op
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
I, [2021-11-02T11:29:09.745968 #77599] INFO -- : Produce message
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1635820149.746|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1635820149.762|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Connecting to ipv4#93.184.216.34:9092 (plaintext) with socket 17
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 0 connection attempt(s))
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
E, [2021-11-02T11:29:10.001143 #77599] ERROR -- : Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
-- snip --
/Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/metadata.rb:20:in `initialize': Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `new'
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:68:in `partition_count'
from /Users/arabiki/.anyenv/envs/rbenv/versions/3.0.2/lib/ruby/gems/3.0.0/gems/rdkafka-0.10.0/lib/rdkafka/producer.rb:106:in `produce'
from /Users/arabiki/path/to/script.rb:19:in `<main>'
次のログから、metadata.broker.list
に指定された broker が追加され、broker のメイン関数が実行されたことがわかります。
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1635820149.745|BROKER|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Added new broker with NodeId -1
%7|1635820149.745|BRKMAIN|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Enter main broker thread
次のログから、rd_kafka_connect_any
で example.com:9092/bootstrap が選択され、example.com:9092/bootstrap の thread が CONNECT という命令を受け取り、接続を “確立” していることがわかります。
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:app]: example.com:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Received CONNECT op
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820149.745|CONNECT|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820149.745|STATE|rdkafka#producer-1| [thrd:example.com:9092/bootstrap]: example.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
次のログから、produce
をした時に rd_kafka_connect_any
が呼ばれるも、最初に rd_kafka_connect_any
が呼ばれてから 1 ms 程度しか経っていないので無視されていることがわかります。そして、タイムアウトである約 250 ms が経ってから再度 rd_kafka_connect_any
が呼ばれ、localhost:9092/bootstrap が選択されたことがわかります。ただ、タイムアウトが過ぎているので、”Local: Broker transport failure (transport)” になります。
I, [2021-11-02T11:29:09.745968 #77599] INFO -- : Produce message
%7|1635820149.746|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 0 connection attempt(s))
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635820150.000|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635820150.000|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
E, [2021-11-02T11:29:10.001143 #77599] ERROR -- : Local: Broker transport failure (transport) (Rdkafka::RdkafkaError)
まとめ
以上のことから、冒頭で述べたように、設定値の調整ではエラーを回避することができず、次のいずれかの選択肢を取らなければならないことがわかったと思います。
produce
に指定するkey
とpartition_key
が同じであればpartition_key
の指定をやめる- 単純にリトライする
rd_kafka_metadata
のタイムアウトが 1 秒以上になるようにモンキーパッチを当てる- producer を生成してから 1 秒以内に
produce
を実行するのを避ける
おまけ 〜CLion を使った librdkafka のデバッグ〜
IntelliJ IDEA や Visual Studio Code 等でも可能でしょうが、自分は CLion を使って librdkafka の挙動を理解しました。
librdkafka では非公式ではあるものの、CMake をサポートしています。よって、CLion に簡単にインポートすることができます。
インポートすると examples 以下のサンプルコードが CMake アプリケーションとして登録されるので、rdkafka_example に適当な引数を渡すだけでブレークポイントを設定しつつ様々な処理を行えます。
例えば今回のようにメタデータ取得の処理を理解したい場合は Program arguments に次の内容を指定します。
-L -b localhost:9092,example.com:9092 -d broker,metadata