ruby-kafka の consumer group で instance type や availability zone によって特定の partition の lag が大きくなる問題を解消する gem を作った
ruby-kafka を使った consumer のアプリケーションにおいて、次のような前提があるとします。
- spot instance を使いたいため、複数の instance type、availability zone (以降 AZ) を使いたい
- ピーク時とそうでない時で必要な consumer の数がかなり違うので auto scaling させたい
- DB に書き込む処理がある
- instance の CPU を有効活用したい(2 vCPU の instance には 2 consumer 配置したい)
- instance の数はできるだけ抑えたい
ruby-kafka の consumer group はラウンドロビン方式で partition を consumer に割り当てます。各 consumer に配分される partition の数はだいたい同じになるため、consumer によってスループットが異なる場合、特定の partition の lag が大きくなってしまうことがあります。また、仮にスループットが同じであったとしても、CPU を有効活用するために同じ instance にいくつも consumer がある場合、consumer 単位で見ると partition 数の差は高々 1 であっても、instance 単位で見るとその差は大きくなります。例えば 4 vCPU の instance が 4 つ、各 instance に 4 consumer、partition が 20 の場合、均等に partition を配分すると 1 consumer 当たり 4 〜 5 の partition を処理することになりますが、5 partition する consumer が全て 1 instance に集中すると、instance 単位で見ると、8 partiton 処理する instance が 1 つと 4 partition 処理する instance が 3 つと、その差は 2 倍に広がります。
CPU バウンドな処理の場合は instance type と instance に配置する consumer の数を統一することで lag の偏りを抑えることもできるでしょう。ところが、DB に書き込む処理をする場合、instance の AZ が DB の writer の AZ と同じかどうかでスループットに極端な差が出ます。仮に利用する AZ を 1 つに限定したとしても、instance 単位で偏る問題を解消するには partition 数が consumer 数で割り切れるように auto scaling させる必要があり、難しい問題です。
というわけで、partition の割り当てを良い感じにすることで偏りを緩和する gem を作りました。
https://github.com/abicky/ruby-kafka-ec2
使い方
次のような基準で partition を割り当てるとします。
- r5 instance には r4 instance より 1.2 倍の partition を割り当てる
- m5 instance には r4 instance より 1.35 倍の partition を割り当てる
- DB の writer と同じ AZ に存在する instance には、そうでない instance より 4 倍の partition を割り当てる
benchmark/consume_messages.rb からの抜粋ですが、次のように書くことでこの基準に沿った partition の割り当てが可能です。
require "aws-sdk-rds"
require "kafka/ec2"
rds = Aws::RDS::Client.new(region: "ap-northeast-1")
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
instance_family_weights: {
"r4" => 1.00,
"r5" => 1.20,
"m5" => 1.35,
},
availability_zone_weights: ->() {
db_cluster = rds.describe_db_clusters(filters: [
{ name: "db-cluster-id", values: ["ruby-kafka-ec2-benchmark"] },
]).db_clusters.first
db_instance_id = db_cluster.db_cluster_members.find { |m| m.is_cluster_writer }.db_instance_identifier
db_instance = rds.describe_db_instances(filters: [
{ name: "db-cluster-id", values: ["ruby-kafka-ec2-benchmark"] },
{ name: "db-instance-id", values: [db_instance_id] },
]).db_instances.first
if db_instance.availability_zone == "ap-northeast-1a"
{
"ap-northeast-1a" => 1,
"ap-northeast-1c" => 0.25,
}
else
{
"ap-northeast-1a" => 0.25,
"ap-northeast-1c" => 1,
}
end
},
)
consumer = Kafka::EC2.with_assignment_strategy_factory(assignment_strategy_factory) do
kafka.consumer(group_id: KAFKA_CONSUMER_GROUP_ID)
end
availability_zone_weights
は Proc ではなく Hash でも良いですが、failover などで writer の AZ が変わった時に面倒なので、assign の度に writer の AZ を確認するようにしています。
ベンチマーク
予め 200 個の partition それぞれに 2,000 個のメッセージを入れ、最後に FIN というメッセージを入れた状態で次のコードを実行してみました。
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
processed_count = 0
partition_count = 0
end_time = nil
consumer.each_message do |message|
if message.value == "FIN"
end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
partition_count += 1
logger.info "[#{metadata}] Received FIN message"
next
end
JSON.parse(message.value)["events"].each do |event|
Time.iso8601(event["timestamp"])
end
client.query("SELECT * FROM ruby_kafka_ec2_benchmark.events").to_a
processed_count += 1
logger.info "[#{metadata}] #{processed_count} messages were consumed" if (processed_count % 10_000).zero?
end
duration = end_time - start_time
logger.info "[#{metadata}] Complete (duration: #{duration}, partition_count: #{partition_count}, processed_count: #{processed_count})"
上記のコードは JSON.parse
と Time.iso8601
で多少 CPU を使いますが、一番時間のかかる処理は SELECT * FROM ruby_kafka_ec2_benchmark.events
です。1
結果は https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.0/benchmark#result のとおり、素の ruby-kafka だと最も早く終わった consumer が 54.1 秒、最も遅かった consumer が 258.9 秒と、200 秒以上の差があります。一方、ruby-kafka-ec2 を使うと、最も早く終わった consumer が 79.0 秒、最も遅かった consumer が 114.4 秒と、かなり偏りが緩和されていることがわかります。
余談
ruby-kafka の実装上、partition assignor をカスタマイズできないのでモンキーパッチを当てています。ruby-kafka に変更があっても影響を受けにくいように実装したつもりですが、Kafka::Protocol::JoinGroupRequest
を書き換えているところは鬼畜な気がするので、近い将来 ruby-kafka に partition assignor をカスタマイズできるような PR を送りたいですね。
追記
PR を送って無事マージされました Support custom assignment strategy by abicky · Pull Request #846 · zendesk/ruby-kafka
ruby-kafka-ec2 にも反映済みです
https://github.com/abicky/ruby-kafka-ec2/commit/444ad89a63968d930d9e4b8f68ce8a5cd9e18b7f
-
これで取得されるのは 1 レコードだけのつもりだったんですが、試行錯誤で produce_messages.rb を 20 回ぐらい実行した気がするので、20 レコードぐらい取得しているかもしれません… ↩