前方互換性・後方互換性とは何なのか? 〜Schema Registry における例〜
Schema evolution において前方互換性・後方互換性を意識することは重要ですが、どっちがどっちか混乱することがあるのでメモとして残しておきます。
一般的な説明
Wikipedia の互換性の説明がとても簡潔に表現されていると思うので引用します。
前方互換性の例として、よく白黒テレビがカラーテレビ放送を受信して白黒で表示できることが挙げられますが、ポイントとしては、前方互換性を考慮して白黒テレビが作られているわけではなく、前方互換性を考慮してカラーテレビ放送規格 (NTSC) が定められていることです。初めて「前方互換性」という言葉を知った時は「将来のカラーテレビ放送規格のことも見据えて白黒テレビを作るのは無理では?」と思ったものです。
一方、後方互換性に関してもテレビを例にすると、カラーテレビが白黒テレビ放送を受信して白黒で表示できることが挙げられ、この場合は後方互換性を考慮してカラーテレビを作ることになるのかなと思います。
このように、前方互換性を保つために工夫しなければいけないのはデータで、後方互換性を保つために工夫しなければいけないのはシステムであることが個人的に混乱する理由でした。
Schema Registry における例
Kafka のデータの schema を管理するのに Schema Registry というミドルウェアがあり、schema のバージョン管理ができます。
新しい schema バージョンを登録する際の制約として compatibility を設定することができ、それぞれの compatibility に対してどのような schema の変更が許容され、どのようなオペレーションが必要かが定められています。例えば BACKWARD(1 つ前のバージョンとの後方互換性)、FORWARD(1 つ前のバージョンとの前方互換性)は次のようになっています。
Compatibility Type | Changes allowed | Check against which schemas | Upgrade first |
---|---|---|---|
BACKWARD |
|
Last version | Consumers |
FORWARD |
|
Last version | Producers |
“Delete fields” は optional fields も required fields も削除でき、”Add fields” は optional fields も required fields も削除できます。
BACKWARD
後方互換性とは「古いシステム向けのデータなどが新しいシステムでも全て使用できること」でした。Kafka の文脈において、古いシステム向けのデータとは、古い schema で生成されたデータで、データを処理する「新しいシステム」とは consumer です。全く新しい consumer の追加かもしれませんし、consumer のバージョンアップかもしれません。
つまり、新しい実装の cosumer が 1 つ前の schema バージョンで生成されたデータも最新バージョンで生成されたデータも処理できる必要があるのがこの設定です。
最新の schema バージョンで追加されたフィールドが必ず存在する前提で consumer が実装されていた場合、1 つ前の schema バージョンで生成されたデータは処理できません。よって、フィールドの追加に関してはオプショナルなフィールドしか追加できません。
一方で、最新バージョンで削除されるフィールドがあるなら、それに依存しないように consumer を実装すれば良いので、フィールドの削除に関する条件は存在しません。ただし、現行の consumer が削除されるフィールドに依存していると問題になるので、新しい schema のデータが生成される前に consumer を更新しなければいけません。Upgrade first の項目が Consumers になっているのはこれが理由です。オプショナルなフィールドの追加であれば consumer から更新しても producer から更新しても大丈夫です。
なお、Avro においてオプショナルなフィールドというのは default 属性を持ったフィールドであり、値が nullable なフィールドではありません。例えば、int のオプショナルなフィールドを default 0 で追加した場合、consumer は古い schema バージョンで生成されたデータを読み込む際に 0 という値がセットされているものとして処理します。
FORWARD
前方互換性とは「新しいシステム向けのデータなどが古いシステムでも使用できること」でした。Kafka の文脈において、新しいシステム向けのデータとは、新しい schema で生成されたデータで、データを処理する「古いシステム」とは既に存在する consumer であり、consumer に変更を加えないことを意味します。
つまり、新しい schema バージョンで生成されたデータであっても既存の consumer が意図した処理を行える必要があるのがこの設定です。
最新のバージョンで何かしらのフィールドが追加されたとしても、古い consumer はそれを無視すれば良いだけなので、フィールドは自由に追加できます。一方で、古い consumer が利用しているフィールドが削除されると問題が発生するので、オプショナルなフィールドしか削除できません。なお、オプショナルなフィールドだとしても実は特定のオプション機能を提供するには必要なフィールドである可能性があるため、オプショナルなフィールドの削除は「consumer が正しく実装されていれば削除してもエラーが起きないことが期待される」ぐらいの意味になるかと思います。
新しいフィールドを追加するということは、そのフィールドに依存した consumer を追加・修正するケースでしょうから、そのフィールドが追加されたデータを処理することを保証するため Upgrade first の項目は Producers になっています。
ただし、producer を先に更新しても、consumer の commit offset が古過ぎると古い schema のデータを処理してしまうかもしれないので注意が必要です。
型の変更はどう解釈されるか?
後述する WikipediaFeedAvroLambdaExample で利用されるデータの schema を見てみます。
$ curl -s localhost:8081/subjects/WikipediaFeed-value/versions/latest/schema | jq .
{
"type": "record",
"name": "WikiFeed",
"namespace": "io.confluent.examples.streams.avro",
"fields": [
{
"name": "user",
"type": {
"type": "string",
"avro.java.string": "String"
}
},
{
"name": "is_new",
"type": "boolean"
},
{
"name": "content",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
}
]
}
compatibility は BACKWARD です。
$ curl -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/config/WikipediaFeed-value -w "\n"
{"error_code":40401,"message":"Subject 'WikipediaFeed-value' not found."}
$ curl -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/config -w "\n"
{"compatibilityLevel":"BACKWARD"}
例えば user を string または int の union type の場合に compatibility を満たすか確認してみます。
$ cat <<JSON | curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/compatibility/subjects/WikipediaFeed-value/versions/latest -d @- -w "\n"
{
"schema": "{
\"namespace\": \"io.confluent.examples.streams.avro\",
\"type\": \"record\",
\"name\": \"WikiFeed\",
\"fields\":[
{\"name\": \"user\", \"type\": [\"string\", \"int\"]},
{\"name\": \"is_new\", \"type\": \"boolean\"},
{\"name\": \"content\", \"type\": [\"string\", \"null\"]}
]
}"
}
JSON
{"is_compatible":true}
backward compatible みたいです。古い schema のデータであれば user は string であり、新しい schema のデータであれば string か int なので、新しい consumer は string と int 両方の user を処理することが想定されていると考えられるので新旧両方の schema のデータに対応していると言えます。よって後方互換性の条件を満たしていると言えるでしょう。
compatibility を forward にした上で再度確認します。
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/config/WikipediaFeed-value -d '{"compatibility":"FORWARD"}' -w "\n"
{"compatibility":"FORWARD"}
$ cat <<JSON | curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/compatibility/subjects/WikipediaFeed-value/versions/latest -d @- -w "\n"
{
"schema": "{
\"namespace\": \"io.confluent.examples.streams.avro\",
\"type\": \"record\",
\"name\": \"WikiFeed\",
\"fields\":[
{\"name\": \"user\", \"type\": [\"string\", \"int\"]},
{\"name\": \"is_new\", \"type\": \"boolean\"},
{\"name\": \"content\", \"type\": [\"string\", \"null\"]}
]
}"
}
JSON
{"is_compatible":false}
forward compatible ではないと判定されました。古い consumer では int の user を処理できないので当然でしょう。
では、user を int に変更するとどうなるか確認してみます。
$ curl -X DELETE -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/config/WikipediaFeed-value -w "\n"
{"compatibilityLevel":"FORWARD"}
$ cat <<JSON | curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" localhost:8081/compatibility/subjects/WikipediaFeed-value/versions/latest -d @- -w "\n"
{
"schema": "{
\"namespace\": \"io.confluent.examples.streams.avro\",
\"type\": \"record\",
\"name\": \"WikiFeed\",
\"fields\":[
{\"name\": \"user\", \"type\": \"int\"},
{\"name\": \"is_new\", \"type\": \"boolean\"},
{\"name\": \"content\", \"type\": [\"string\", \"null\"]}
]
}"
}
JSON
backward compatible ではないと判定されました。string から string, int の union type への変更だと backward compatible でしたが、int への変更だと backward compatible ではないようです。これは新しい consumer が string の user ではなく int の user に依存しているとみなされるからでしょう。古い schema のデータは string の user なので処理できません。よって後方互換性の定義を満たしません。
このように、type の変更はどのような type に変更するかで結果が変わるのが compatibility を理解する上で難しいところです。
おまけ 〜Kafka Streams アプリケーションでデータの Schema が変わった時の挙動を確認する〜
Kafka Streams アプリケーションでデータの schema が変わった時の挙動をサクッと確認したい場合、kafka-streams-examples の WikipediaFeedAvroLambdaExample を動かしてみるのが手軽です。
最初に次のようなコマンドで必要なトピックを作成します。
kafka-topics --create --topic WikipediaFeed --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic WikipediaStats --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
次に、jar ファイルを作成します。
./mvnw -Dmaven.test.skip=true package
IDE で気になる処理にブレークポイントを設定した上で、次コマンドで WikipediaFeed にデータを投入すればデバッグし放題です。
java -cp target/kafka-streams-examples-7.5.3-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver
WikipediaFeedAvroExampleDriver は schema registry でエラーになっていると stuck するので、WikipediaFeedAvroLambdaExample を動かしているのに何も出力されない場合は schema registry のログを確認してみると良いです。
Java のような静的型付き言語でどのようにデータから特定のクラスのインスタンスを生成しているかというと、SpecificData#newRecord で Avro schema の情報からクラスを特定してインスタンスを生成し、SpecificRecordBase#put でフィールドをセットするみたいですね。
WikipediaFeed
だと #put
は次のような定義になっています。
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: user = value$ != null ? value$.toString() : null; break;
case 1: is_new = (java.lang.Boolean)value$; break;
case 2: content = value$ != null ? value$.toString() : null; break;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
}
}
上記の実装を見ると、フィールドの順番が変わるとまずいように見えますが、SpecificDatumReader で consumer の保持している schema 情報を使ってよしなに処理するようにしているみたいです。