ruby-kafka の consumer group で instance type や availability zone によって特定の partition の lag が大きくなる問題を解消する gem を作った

ruby-kafka を使った consumer のアプリケーションにおいて、次のような前提があるとします。

  • spot instance を使いたいため、複数の instance type、availability zone (以降 AZ) を使いたい
  • ピーク時とそうでない時で必要な consumer の数がかなり違うので auto scaling させたい
  • DB に書き込む処理がある
  • instance の CPU を有効活用したい(2 vCPU の instance には 2 consumer 配置したい)
  • instance の数はできるだけ抑えたい

ruby-kafka の consumer group はラウンドロビン方式で partition を consumer に割り当てます。各 consumer に配分される partition の数はだいたい同じになるため、consumer によってスループットが異なる場合、特定の partition の lag が大きくなってしまうことがあります。また、仮にスループットが同じであったとしても、CPU を有効活用するために同じ instance にいくつも consumer がある場合、consumer 単位で見ると partition 数の差は高々 1 であっても、instance 単位で見るとその差は大きくなります。例えば 4 vCPU の instance が 4 つ、各 instance に 4 consumer、partition が 20 の場合、均等に partition を配分すると 1 consumer 当たり 4 〜 5 の partition を処理することになりますが、5 partition する consumer が全て 1 instance に集中すると、instance 単位で見ると、8 partiton 処理する instance が 1 つと 4 partition 処理する instance が 3 つと、その差は 2 倍に広がります。

CPU バウンドな処理の場合は instance type と instance に配置する consumer の数を統一することで lag の偏りを抑えることもできるでしょう。ところが、DB に書き込む処理をする場合、instance の AZ が DB の writer の AZ と同じかどうかでスループットに極端な差が出ます。仮に利用する AZ を 1 つに限定したとしても、instance 単位で偏る問題を解消するには partition 数が consumer 数で割り切れるように auto scaling させる必要があり、難しい問題です。

というわけで、partition の割り当てを良い感じにすることで偏りを緩和する gem を作りました。
https://github.com/abicky/ruby-kafka-ec2

使い方

次のような基準で partition を割り当てるとします。

  • r5 instance には r4 instance より 1.2 倍の partition を割り当てる
  • m5 instance には r4 instance より 1.35 倍の partition を割り当てる
  • DB の writer と同じ AZ に存在する instance には、そうでない instance より 4 倍の partition を割り当てる

benchmark/consume_messages.rb からの抜粋ですが、次のように書くことでこの基準に沿った partition の割り当てが可能です。

require "aws-sdk-rds"
require "kafka/ec2"

rds = Aws::RDS::Client.new(region: "ap-northeast-1")
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
  instance_family_weights: {
    "r4" => 1.00,
    "r5" => 1.20,
    "m5" => 1.35,
  },
  availability_zone_weights: ->() {
    db_cluster = rds.describe_db_clusters(filters: [
      { name: "db-cluster-id", values: ["ruby-kafka-ec2-benchmark"] },
    ]).db_clusters.first
    db_instance_id = db_cluster.db_cluster_members.find { |m| m.is_cluster_writer }.db_instance_identifier
    db_instance = rds.describe_db_instances(filters: [
      { name: "db-cluster-id", values: ["ruby-kafka-ec2-benchmark"] },
      { name: "db-instance-id", values: [db_instance_id] },
    ]).db_instances.first

    if db_instance.availability_zone == "ap-northeast-1a"
      {
        "ap-northeast-1a" => 1,
        "ap-northeast-1c" => 0.25,
      }
    else
      {
        "ap-northeast-1a" => 0.25,
        "ap-northeast-1c" => 1,
      }
    end
  },
)
consumer = Kafka::EC2.with_assignment_strategy_factory(assignment_strategy_factory) do
  kafka.consumer(group_id: KAFKA_CONSUMER_GROUP_ID)
end

availability_zone_weights は Proc ではなく Hash でも良いですが、failover などで writer の AZ が変わった時に面倒なので、assign の度に writer の AZ を確認するようにしています。

ベンチマーク

予め 200 個の partition それぞれに 2,000 個のメッセージを入れ、最後に FIN というメッセージを入れた状態で次のコードを実行してみました。

start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
processed_count = 0
partition_count = 0
end_time = nil
consumer.each_message do |message|
  if message.value == "FIN"
    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    partition_count += 1
    logger.info "[#{metadata}] Received FIN message"
    next
  end

  JSON.parse(message.value)["events"].each do |event|
    Time.iso8601(event["timestamp"])
  end
  client.query("SELECT * FROM ruby_kafka_ec2_benchmark.events").to_a

  processed_count += 1
  logger.info "[#{metadata}] #{processed_count} messages were consumed" if (processed_count % 10_000).zero?
end

duration = end_time - start_time
logger.info "[#{metadata}] Complete (duration: #{duration}, partition_count: #{partition_count}, processed_count: #{processed_count})"

上記のコードは JSON.parseTime.iso8601 で多少 CPU を使いますが、一番時間のかかる処理は SELECT * FROM ruby_kafka_ec2_benchmark.events です。1

結果は https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.0/benchmark#result のとおり、素の ruby-kafka だと最も早く終わった consumer が 54.1 秒、最も遅かった consumer が 258.9 秒と、200 秒以上の差があります。一方、ruby-kafka-ec2 を使うと、最も早く終わった consumer が 79.0 秒、最も遅かった consumer が 114.4 秒と、かなり偏りが緩和されていることがわかります。

余談

ruby-kafka の実装上、partition assignor をカスタマイズできないのでモンキーパッチを当てています。ruby-kafka に変更があっても影響を受けにくいように実装したつもりですが、Kafka::Protocol::JoinGroupRequest を書き換えているところは鬼畜な気がするので、近い将来 ruby-kafka に partition assignor をカスタマイズできるような PR を送りたいですね。

追記

PR を送って無事マージされました Support custom assignment strategy by abicky · Pull Request #846 · zendesk/ruby-kafka
ruby-kafka-ec2 にも反映済みです
https://github.com/abicky/ruby-kafka-ec2/commit/444ad89a63968d930d9e4b8f68ce8a5cd9e18b7f

  1. これで取得されるのは 1 レコードだけのつもりだったんですが、試行錯誤で produce_messages.rb を 20 回ぐらい実行した気がするので、20 レコードぐらい取得しているかもしれません…