Stateful Kafka Streams アプリケーションで consumer が増減すると意図しない偏りが発生する理由 〜 StreamsPartitionAssignor はいかにタスクを割り当てるのか 〜

Stateful Kafka Streams アプリケーション、つまり state store を使った Kafka Streams アプリケーションでは consumer が増減するとタスクの割当が極端に偏ることがあります。その理由について、WordCount の例を使って解説し、運用上の注意点についてまとめます。
なお、Kafka Streams のバージョンは 2.7.2 です。

サンプルアプリケーション

WordCount は次のように入力値を単語に分割し、単語ごとに処理した回数を数えるシンプルなアプリケーションです。

KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count(Materialized.as("counts-store"))
      .toStream()
      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

言葉で説明するのは簡単ではありますが、単語ごとに数を数えるために repartition 用の topic に書き出したり、数えた結果を state store に保存したりしています。

これは topology.describe() で内容を確認するとよくわかります。

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> counts-store-repartition-filter
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: counts-store-repartition-filter (stores: [])
      --> counts-store-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: counts-store-repartition-sink (topic: counts-store-repartition)
      <-- counts-store-repartition-filter

  Sub-topology: 1
    Source: counts-store-repartition-source (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- counts-store-repartition-source
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output)
      <-- KTABLE-TOSTREAM-0000000007

これを Kafka Streams Topology Visualizer で可視化すると次のようになります。

WordCount では state store に今までの結果を保存することで、新しい値が来た時にこれまでの結果を取り出し、カウントアップすることができています。このように、state store を使ったアプリケーションは stateful アプリケーションと呼ばれます。

Rebalance

Kafka topic を複数の consumer で処理する場合(グループを形成する場合)、1 partition を 1 consumer だけが処理するよう partition が割り当てられます。よって、consumer が増減すると partition の割り当ても変わります。これを rebalance と呼びます。
rebalance については丁寧な説明がいくらでも見つかると思いますが、例えば Static Membership: Rebalance Strategy Designed for the Cloud - Confluent を参照してもらうとイメージが付きやすいと良いと思います。

なお、この発表で説明されている static membership は、consumer のプロセスを再起動した場合などに rebalance が行われないよう、割り当てる partition を固定する機能です。後述する stateful アプリケーションの問題とも関連する話ですが、consumer が増減した場合とは別の話なので今回は詳しく触れません。

Stateful なアプリケーションの問題

state store は consumer がローカルに持つデータストアで、他の consumer からはアクセスできません。よって、rebalance が行われると state store を復元する必要があります。そのために使われるのが changelog topic と呼ばれる Kafka topic です。これは state store の redo ログみたいなもので、state store に書き込んだ内容を Kafka topic にも書き込むためのものです。
よって、大量のデータが state store に書き込まれると、復元にはそれに応じた時間がかかってしまいます。例えば 1 partition の state store の復元に 1 時間かかるとして、1 consumer が増減しただけで全 partition のアサインが変わると全ての処理が 1 時間止まることになります。

なお、WordCount では次のように streams-wordcount-counts-store-changelog という changelog topic が自動的に作成されます。

% kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
streams-plaintext-input
streams-wordcount-counts-store-changelog
streams-wordcount-counts-store-repartition
streams-wordcount-output

StreamsPartitionAssignor

StreamsPartitionAssignor は現在 Kafka Streams で利用されている partition assignor です。この partition assignor は前述の問題を軽減するために、state store を所有している consumer に partition を割り当てるようになっています。
このロジックを理解するには StreamsPartitionAssignor#subscriptionUserData, StreamsPartitionAssignor#assign を起点に読むと良いですが、中でも特に重要なメソッドは次の 2 つです。

TaskManager#getTaskOffsetSums は consumer が group に join する際に coordinator (Kafka broker の 1 台) に送る userdata の生成に使われるメソッドです。この userdata には changelog の offset 情報が含まれているんですが、現在自分が処理している task に関しては changelog offset は全て Task.LATEST_OFFSET とし、処理していない task に関しては state store の checkpoint ファイルの offset 情報を使うようになっています。なお、task というのは sub-topology 番号と partition 番号の組み合わせで、例えば sub-topology 0 の partition 1 だと 0_1 になります。WordCount の例だと source topic が 1 つなのでわかりにくいですが、task 0_1 を処理する consumer は、sub-topology 0 に複数の source topic があっても、それらの topic の partition 1 だけを consume します。

HighAvailabilityTaskAssignor#assign は各 task を consumer にアサインするのに StreamsPartitionAssignor で使われているメソッドです。

簡単のため、standby replica は 0 であるものとして説明します。そうすると、メソッドの内容は次のように単純になります。

public boolean assign(final Map<UUID, ClientState> clients,
                      final Set<TaskId> allTaskIds,
                      final Set<TaskId> statefulTaskIds,
                      final AssignmentConfigs configs) {
    final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
    final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);

    assignActiveStatefulTasks(clientStates, statefulTasks);

    final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);

    final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
        statefulTasks,
        clientStates,
        configs.acceptableRecoveryLag
    );

    final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();

    final int neededActiveTaskMovements = assignActiveTaskMovements(
        tasksToCaughtUpClients,
        clientStates,
        warmups,
        remainingWarmupReplicas
    );

    assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));

    final boolean probingRebalanceNeeded = neededActiveTaskMovements > 0;

    return probingRebalanceNeeded;
}

assignActiveStatefulTasks では最初に stateful task を round-robin 方式で process ID 順に client に割り振ります。client というのは userdata に含まれる process ID が同じ consumer を 1 つのグループにしたものです。TaskManager#initializeProcessId で付与されるもので、プロセスごとにユニークな ID となっています。stream thread の数 (num.stream.threads) が 2 であれば 1 client に 2 consumer が属することになります。stateful task というのは state store に依存している task で、WordCount の場合 sub-topology 1 に属する task です。
なお、client が state store を所有している場合は、state store directory 配下の kafka-streams-process-metadata というファイルに process ID が保存されているので、再起動しても process ID は変わりません。

例えば parition の数が 6、client が A, B, C の 3 の場合、過去に各 client が処理していた task とは関係なしに次のように割り当てます。

Task Client
1_0 A
1_1 B
1_2 C
1_3 A
1_4 B
1_5 C

もし client ごとに stream thread の数が異なる場合、stream thread の数が多い client により多くの partition が割り当てられるよう、round-robin 方式で割り当てた後に割り当てが調整されますが、ここでは簡単のため stream thread の数は全て同じとします。

次に、tasksToCaughtUpClients では consumer が送ってきた changelog offset 情報を使って task ごとに caught-up client の一覧を作ります。caught-up client とは、現在その task を処理している client か changelog topic の最新の latest offset と state store offset が acceptable.recovery.lag 以下である client です。例えば WordCount の例で、client A が 1_0, 1_2, 1_4 を処理していて、B が残りを処理している状態であれば、caught-up client は次のようになります。

Task Caught-up Clients
1_0 [A]
1_1 [B]
1_2 [A]
1_3 [B]
1_4 [A]
1_5 [B]

これがもし、A が B の join 前に全ての task を処理していて、state store がまだ残っていて changelog offset も acceptable.recovery.lag 以下であれば次のようになります。

Task Caught-up Clients
1_0 [A]
1_1 [A, B]
1_2 [A]
1_3 [A, B]
1_4 [A]
1_5 [A, B]

最後に assignActiveTaskMovements で、round-robin 方式で割り当てられた stateful task が caught-up client に割り当てられていなかった場合に、caught-up client のいずれかに割り当てられるように調整されます。caught-up client でなかったがために stateful task が別の client に移った client には standby task が割り当てられます。standby task は state store の復元を行う task で、standby task が動くことで caught-up client でなかった client はそのうち caught-up client になります。一定時間経つと consumer が join/leave しなくても rebalance が行われ(probing rebalance)、その時に standby task を割り当てられていた client が caught-up client になっていれば調整が不要になります。つまり、最終的には round-robin 方式の割り当てに収束します。standby task を生成できる最大数は max.warmup.replicas で制御され、デフォルトは 2 です。standby task の数が max.warmup.replicas に達すると、task が caught-up client のいずれかに移るだけで、standby task は生成されません。
なお、どの task に standby task を割り当てるかの優先順位は決まっていません。もし Assign warmups to tasks in priority order by abicky · Pull Request #697 · confluentinc/kafka がマージされれば、caught-up client の数の少ない task から順に standby task が割り当てられ、もし caught-up client の数が同じであれば task ID 順で割り当てられるようになります。

ここで、round-robin 方式で割り当てた task と caught-up client が次のようになっている状態で client C が join した場合を考えます。

Task Client
1_0 A
1_1 B
1_2 C
1_3 A
1_4 B
1_5 C
Task Caught-up Clients
1_0 [A]
1_1 [B]
1_2 [A]
1_3 [B]
1_4 [A]
1_5 [B]

task 1_2 は client C に割り当てられていますが、C は caught-up client ではないので、この task 1_2 は A に割り当てられ、C には 1_2 の standby task が割り当てられます。同様に、task 1_5 は B に割り当てられ、C には 1_5 の standby task が割り当てられます。task 1_3, 1_4 も同様です。ただし、standby task を 4 つ割り当てるには max.warmup.replicas を 4 にしなければなりません。デフォルトだと 2 つしか standby task が作成されません。

下表の Source Client は round-robin 方式で各 task に割り当てられた client で、Destination Client は caught-up client を考慮した最終的な割り当てです。

Task Source Client Destination Client
1_0 A A
1_1 B B
1_2 C A
1_3 A B
1_4 B A
1_5 C B
Standby Task Client
1_2 C
1_3 A
1_4 B
1_5 C

あとはステートレスな task を割り当てたり、client に割り当てられた task を consumer に割り当てたり、再度 rebalance を行うべき時間を計算したりするだけです。

過去に使われていた assignor や StreamsPartitionAssignfor で使われている HighAvailabilityTaskAssignor の導入経緯などについては Kafka Streamsのタスク割り当てアルゴリズム - shinichy’s blog で触れられているので、興味のある方はそちらを参照してください。当該記事でも触れられていますが(というか一連のコードを読み終わってから知ったんですが)、KIP-441: Smooth Scaling Out for Kafka Streams は今回触れた仕組みのコンセプトが書かれているので、一読してからコードを読むと捗るかもしれません。

実験

WordCount のプロパティに次のような設定を追加します。

props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 60_000);
props.put(StreamsConfig.STATE_DIR_CONFIG, System.getenv().getOrDefault("STATE_DIR", "/tmp/kafka-streams"));
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

partition 数は 6 で topic を作成します。

for topic in streams-plaintext-input streams-wordcount-output; do
  kafka-topics --bootstrap-server localhost:9092 --create --topic $topic --partitions 6
done

これで環境変数 STATE_DIR=/tmp/kafka-streams-0STATE_DIR=/tmp/kafka-streams-1 をそれぞれ指定して 2 つの client を起動すると、stateful task の source topic となっている streams-wordcount-counts-store-repartition の partition の割当は次のようになります。

% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group streams-wordcount | grep repartition | sort | cut -c19-189
streams-wordcount-counts-store-repartition 0          -               0               -               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 1          -               0               -               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2
streams-wordcount-counts-store-repartition 2          -               0               -               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-2
streams-wordcount-counts-store-repartition 3          -               0               -               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 4          -               0               -               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 5          -               0               -               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2

上記の情報から stateful task の割り当ても把握することができます。

Task Client Thread
1_0 4b61083e-57d0-457d-951f-d06556c148ca 1
1_1 7156cef1-307e-49c0-a5bb-0f11d26a4933 2
1_2 4b61083e-57d0-457d-951f-d06556c148ca 2
1_3 7156cef1-307e-49c0-a5bb-0f11d26a4933 1
1_4 4b61083e-57d0-457d-951f-d06556c148ca 1
1_5 7156cef1-307e-49c0-a5bb-0f11d26a4933 2

次に、適当な内容を topic に produce します。

echo 'a b c d e f g h i j k l m n o p q r s t u v w x y z' | kafka-console-producer --bootstrap-server localhost:9092 --topic streams-plaintext-input

offset が commit されるまで待つと offset の情報は次のようになりました。

% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group streams-wordcount | grep repartition | sort | cut -c19-189
streams-wordcount-counts-store-repartition 0          2               2               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 1          2               2               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2
streams-wordcount-counts-store-repartition 2          8               8               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-2
streams-wordcount-counts-store-repartition 3          6               6               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 4          4               4               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 5          4               4               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2

ちなみにこの時の checkpoint ファイルの内容は次のようになっています。

% cat /tmp/kafka-streams-0/streams-wordcount/1_*/.checkpoint
0
1
streams-wordcount-counts-store-changelog 0 1
0
1
streams-wordcount-counts-store-changelog 2 7
0
1
streams-wordcount-counts-store-changelog 3 5
0
1
streams-wordcount-counts-store-changelog 4 3

checkpoint ファイルの内容は上から version(現状 0 のみ), expectedSize, topic partition offset です。
cf. OffsetCheckpoint

次に、環境変数 STATE_DIR=/tmp/kafka-streams-2 で別の client を起動します。この client ID は bbb83e43-b1af-4b2d-a60a-94a7091684d0 になりました。task は client ID をソートした上で round-robin 方式で割り当てるので、caught-up client を考慮しなければ次のような割り当てになります。

Task Client
1_0 bbb83e43-b1af-4b2d-a60a-94a7091684d0
1_1 4b61083e-57d0-457d-951f-d06556c148ca
1_2 7156cef1-307e-49c0-a5bb-0f11d26a4933
1_3 bbb83e43-b1af-4b2d-a60a-94a7091684d0
1_4 4b61083e-57d0-457d-951f-d06556c148ca
1_5 7156cef1-307e-49c0-a5bb-0f11d26a4933

ところが、1_0, 1_1, 1_2, 1_3 が割り当てられる client はそれらの state store を所有していない、つまり caught-up client じゃないので、1_0, 1_1, 1_2, 1_3 に関しては既に state store を所有している client に task を割り当てます。
最終的な割り当ては次のような leader のログから把握することができます。max.warmup.replicas が 2 なので、standby task は 1_0 と 1_2 だけです。

13:48:02.571 [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer] Assigned tasks [0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 0_5, 1_4, 1_5] including stateful [1_0, 1_1, 1_2, 1_3, 1_4, 1_5] to clients as: 
bbb83e43-b1af-4b2d-a60a-94a7091684d0=[activeTasks: ([0_0, 0_1, 0_2, 0_3]) standbyTasks: ([1_0])]
4b61083e-57d0-457d-951f-d06556c148ca=[activeTasks: ([0_4, 1_0, 1_2, 1_4]) standbyTasks: ([])]
7156cef1-307e-49c0-a5bb-0f11d26a4933=[activeTasks: ([0_5, 1_1, 1_3, 1_5]) standbyTasks: ([1_2])].
13:48:02.571 [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer] Requesting followup rebalance be scheduled by streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer-47d53591-da40-49ff-9c3c-b3054cf71902 for 1649652542571 ms to probe for caught-up replica tasks.

ログからも、probing.rebalance.interval.ms だけ経過した後(13:49:02)に rebalance するようスケジューリングされたことがわかります。この時点ではまだ割り当ては変わっていません。

% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group streams-wordcount | grep repartition | sort | cut -c19-189
streams-wordcount-counts-store-repartition 0          2               2               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 1          2               2               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2
streams-wordcount-counts-store-repartition 2          8               8               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-2
streams-wordcount-counts-store-repartition 3          6               6               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 4          4               4               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 5          4               4               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2

1 分後のログは次のようになっていました。次のログから、1_3, 1_1 が standby task になっていることがわかります。

13:49:02.654 [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer] Assigned tasks [0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 0_5, 1_4, 1_5] including stateful [1_0, 1_1, 1_2, 1_3, 1_4, 1_5] to clients as: 
bbb83e43-b1af-4b2d-a60a-94a7091684d0=[activeTasks: ([0_0, 0_2, 0_4, 1_0]) standbyTasks: ([1_3])]
4b61083e-57d0-457d-951f-d06556c148ca=[activeTasks: ([0_1, 0_3, 0_5, 1_4]) standbyTasks: ([1_1])]
7156cef1-307e-49c0-a5bb-0f11d26a4933=[activeTasks: ([1_1, 1_2, 1_3, 1_5]) standbyTasks: ([])].

ログからもわかるとおり、元々 bbb83e43-b1af-4b2d-a60a-94a7091684d0 が 1_0 を処理するはずだったので、state store が復元されたことでそのまま処理するようになりました。同様に、1_2 は 7156cef1-307e-49c0-a5bb-0f11d26a4933 が処理するようになりました。一方で、7156cef1-307e-49c0-a5bb-0f11d26a4933 に stateful task が集中してしまったことがわかります。state store を既に 3 つ所有していた 7156cef1-307e-49c0-a5bb-0f11d26a4933 に 1_2 standby task が割り当てられたことで、7156cef1-307e-49c0-a5bb-0f11d26a4933 が 4 つの state store を所有するようになったのが原因です。

% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group streams-wordcount | grep repartition | sort | cut -c19-189
streams-wordcount-counts-store-repartition 0          2               2               0               streams-wordcount-bbb83e43-b1af-4b2d-a60a-94a7091684d0-StreamThread-1
streams-wordcount-counts-store-repartition 1          2               2               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2
streams-wordcount-counts-store-repartition 2          8               8               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 3          6               6               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 4          4               4               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 5          4               4               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2

この後、probing.rebalance.interval.ms だけ経過した後、rebalance が起こり、round-robin 方式の割り当てどおりになったため、standby task もなくなり、この状態に収束しました。

13:50:05.739 [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer] Assigned tasks [0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 0_5, 1_4, 1_5] including stateful [1_0, 1_1, 1_2, 1_3, 1_4, 1_5] to clients as: 
bbb83e43-b1af-4b2d-a60a-94a7091684d0=[activeTasks: ([0_0, 0_3, 1_0, 1_3]) standbyTasks: ([])]
4b61083e-57d0-457d-951f-d06556c148ca=[activeTasks: ([0_1, 0_4, 1_1, 1_4]) standbyTasks: ([])]
7156cef1-307e-49c0-a5bb-0f11d26a4933=[activeTasks: ([0_2, 0_5, 1_2, 1_5]) standbyTasks: ([])].
-- snip --
13:50:05.800 [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required.

consumer の割り当ては次のとおりです。

% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group streams-wordcount | grep repartition | sort | cut -c19-189
streams-wordcount-counts-store-repartition 0          2               2               0               streams-wordcount-bbb83e43-b1af-4b2d-a60a-94a7091684d0-StreamThread-1
streams-wordcount-counts-store-repartition 1          2               2               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-2
streams-wordcount-counts-store-repartition 2          8               8               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-1
streams-wordcount-counts-store-repartition 3          6               6               0               streams-wordcount-bbb83e43-b1af-4b2d-a60a-94a7091684d0-StreamThread-2
streams-wordcount-counts-store-repartition 4          4               4               0               streams-wordcount-4b61083e-57d0-457d-951f-d06556c148ca-StreamThread-1
streams-wordcount-counts-store-repartition 5          4               4               0               streams-wordcount-7156cef1-307e-49c0-a5bb-0f11d26a4933-StreamThread-2

なお、別の consumer が処理していた task が割り当てられた場合、いきなり active task になるのではなく standby task として登録され、すぐに rebalance を行うようになっているようです。どうも [KAFKA-10078] Partition may skip assignment with static members and incremental rebalances - ASF JIRA が関係しているようですが、詳細は追っていません。

Stateful Kafka Streams アプリケーションの運用上の注意点

これまで見てきた内容から次のようなことが言えます。

  • client を増やしてもすぐには新しい client に stateful task が割り当てられない
  • 1 client だけ別 process ID の client に変えると stateful task の割り当てがコロコロ変わる可能性がある
  • Changelog topic の代わりに source topic を使っている場合はアプリケーションを止めている間に caught-up client が減る可能性がある

それぞれ解説していきます。

client を増やしてもすぐには新しい client に stateful task が割り当てられない

stateful Kafka Streams アプリケーションでは、client が増減しても、その直後は state store を所有している client(caught-up client)に stateful task を割り当てるようになっています。その際、最大 max.warmup.replicas だけ standby task が作られます。最終的には round-robin 方式の割り当てに収束していきますが、一度の rebalance で移る task は最大で max.warmup.replicas だけと言えます。また、実験で 7156cef1-307e-49c0-a5bb-0f11d26a4933 に 6 つ中 4 つの stateful task が集中したように、その過程で偏りが悪化することもあります。

できるだけ早く新しい clinet に stateful task を割り当てたい時にできることは次のどれかと言えます。

  • 起動時に KafkaStreams#cleanUp で state store のデータをクリアするようにして再起動する
  • max.warmup.replicas を増やして再起動する
    • 復元中の state store があると最初からやり直しになるので注意
  • probing.rebalance.interval.ms を短くして再起動する
    • デフォルトだと standby task によって caught-up client が増えても 10 分経たないと rebalance が起きない

state store を復元している間処理が止まることを許容できるなら state store のクリア、許容できないなら残りの選択肢の組み合わせになるでしょう。一方で、max.warmup.replicas を増やすと standby task が増え、Kafka broker や consumer の負荷が増えるので、良い塩梅にする必要があります。

1 client だけ別 process ID の client に変えると stateful task の割り当てがコロコロ変わる可能性がある

StreamsPartitionAssignor(というより HighAvailabilityTaskAssignor) は stateful task を round-robin 方式で process ID 順に client に割り当てることについて言及しました。例えば round-robin 方式による割り当て結果が次のようになっている状態で client C を client D に置き換えれば、process ID のソート順が変わらないので C に割り当てられていた stateful task が D に移るだけです。

Task Client
1_0 A
1_1 B
1_2 C
1_3 A
1_4 B
1_5 C

つまり、次のような割り当てに収束します。

Task Client
1_0 A
1_1 B
1_2 D
1_3 A
1_4 B
1_5 D

ところが、A を D に置き換えると、最終的には次の割り当てに収束します。

Task Client
1_0 B
1_1 C
1_2 D
1_3 B
1_4 C
1_5 D

全ての stateful task が別の client に移動することになるので、それまで standby task が動き続けることになります。

これを防ぐには、置き換え前の client の state store directory に存在する kafka-streams-process-metadata を置き換え後の client の state store directory に配置した状態で置き換えると良いはずです(未検証)

Changelog topic の代わりに source topic を使っている場合はアプリケーションを止めている間に caught-up client が減る可能性がある

Topology optimization を有効にすると、source topic を changelog topic の代わりに使うことができます。

Right now there are two possible optimizations, reusing the source topic as a changelog topic for a KTable created directly from an input topic.

通常、changelog topic は state store を使っている Kafka Streams アプリケーションからしか利用されないので、そのアプリケーションを動かしていない限りレコードも produce されず、どれだけ止めていても latest offset と state store offset の差は変わりません。一方で、source topic を state store に紐付けている場合、他のアプリケーションがその topic にレコードを produce する可能性があります。その結果、acceptable.recovery.lag を超えた lag が生じ、stateful task の割り当てに変化が生じる可能性があります。stateful task の割り当てが round-robin 方式の割り当てに収束していれば caught-up client が 0 になっても同じ割り当てになって、state store を一から復元することは避けられるはずですが(未検証)、もし収束していない状態で(standby task が動いている状態)で caught-up client が 0 になると、state store の有無を考慮せず round-robin 方式で task が割り当てられるため、復元が終わるまで処理が止まる task が出てきます。