Avro でスキーマ管理されたデータを BigQuery で取り込めるデータに変換する
Kafka に入っている Avro フォーマット(スキーマは Schema Registry で管理されている)のデータを BigQuery に取り込みたいとします。その場合、おそらく Google BigQuery Sink Connector を使うのが良いでしょう。
ところが、諸事情により Ruby で Kafka からデータを逐次取得し、BigQuery にデータを取り込みたいとします。その場合、fluentd のある環境であれば、取り出したデータを fluentd に送り、fluent-plugin-bigquery で取り込むのが手軽でしょう。
さて、ここで問題になるのは次の 2 点です。
- Avro の map はどう表現するのか?
- Avro の union type はどう表現するのか?
ざっくりした仕様は Avro conversions に載っていて、単純なスキーマであれば変換するのは簡単なんですが、データがネストしている場合なども考慮し出すと変換処理が非常に複雑になります。というわけで、Avro のスキーマ情報を基に Hash
オブジェクトを BigQuery で取り込めるデータに変換する gem を作りました。
https://github.com/abicky/typed_data
使用例
README に書いてあるとおりですが、次のように Converter
にスキーマ情報を渡して、convert
メソッドに Hash
オブジェクトを渡せばよしなに変換してくれます。
require "typed_data"
schema = {
"name" => "Record",
"type" => "record",
"fields" => [
{
"name" => "int_field",
"type" => "int",
},
{
"name" => "int_or_string_field",
"type" => ["int", "string"],
},
{
"name" => "array_field",
"type" => {
"type" => "array",
"items" => "int",
},
},
{
"name" => "union_type_array_field",
"type" => {
"type" => "array",
"items" => ["int", "string"],
},
},
{
"name" => "nested_map_field",
"type" => {
"type" => "map",
"values" => {
"type" => "map",
"values" => ["int", "string"],
},
},
},
],
}
converter = TypedData::Converter.new(schema)
converter.convert({
"int_field" => 1,
"int_or_string_field" => "string",
"array_field" => [1, 2],
"union_type_array_field" => [1, "2"],
"nested_map_field" => {
"nested_map" => {
"key1" => 1,
"key2" => "2",
},
},
})
上記のコードだと、最終的に次のオブジェクトに変換されます。
{"int_field"=>1,
"int_or_string_field"=>{"int_value"=>nil, "string_value"=>"string"},
"array_field"=>[1, 2],
"union_type_array_field"=>
[{"int_value"=>"1", "string_value"=>nil},
{"int_value"=>nil, "string_value"=>"2"}],
"nested_map_field"=>
[{"key"=>"nested_map",
"value"=>
[{"key"=>"key1", "value"=>{"int_value"=>"1", "string_value"=>nil}},
{"key"=>"key2", "value"=>{"int_value"=>nil, "string_value"=>"2"}}]}]}
この形式は、実際に Avro フォーマットのデータを作成し、BigQuery に取り込んだ場合に作成されるテーブルのレコードと同じ形式です。
cf. spec/typed_data/converter_spec.rb#L15-L60
最終的に Google BigQuery Sink Connector でデータを取り込むように切り替える予定の場合、union type を表現する際のフォーマットを次のように変更すると良いです。
converter = TypedData::Converter.new(schema)
converter.union_type_key_formatter = ->(type) { type.split("_").first }
これによって、最初の例のデータは次のように変換されます。
{"int_field"=>1,
"int_or_string_field"=>{"int"=>nil, "string"=>"string"},
"array_field"=>[1, 2],
"union_type_array_field"=>
[{"int"=>"1", "string"=>nil}, {"int"=>nil, "string"=>"2"}],
"nested_map_field"=>
[{"key"=>"nested_map",
"value"=>
[{"key"=>"key1", "value"=>{"int"=>"1", "string"=>nil}},
{"key"=>"key2", "value"=>{"int"=>nil, "string"=>"2"}}]}]}
gem を開発した理由
BigQuery は Avro のデータを直接取り込むことができるので、以下のような選択肢もありました。
- fluent-plugin-avro を使う
- fluent-plugin-bigquery の
write
の時に Avro フォーマットに変換する
fluent-plugin-avro を使うと 1 レコードずつ Avro フォーマットに変換するので、1 レコードずつスキーマ情報を持つことになって微妙かなと思いやめました。あと、試してないですが、1 レコードずつ Avro フォーマットに変換すると、複数レコードをまとめて取り込むのが難しくなるんじゃないかと思います。
fluent-plugin-bigquery を改修するのに関しては、Avro のスキーマ情報から BigQuery のテーブルを作成したいニッチな人のためにわざわざ変更を入れるのは微妙だと思ってやめました。BigQuery に送るデータサイズが小さくなることぐらいメリットがないでしょうし。
また、そもそも、Avro フォーマットのデータを取り込むことで作成されたテーブルだと Google BigQuery Sink Connector が使えないはずなので、それも微妙だと思いました。
そんなわけで、今回 gem を作ることにしました。変換規則が自分の中で整理しきれてないこともあって、どういう結果になるのか理解するのが難しい箇所もありますが、テストを厚めに書いてあるので大丈夫でしょう。
そのうち次の機能も提供したいと思っています。
- Avro のスキーマから BigQuery のスキーマを生成するコマンドの提供
- 変換したオブジェクトを元に戻すコマンドの提供
bq query --format json
の出力結果を入力に JSON に変換できると読みやすいかなと
おまけ: Google BigQuery Sink Connector のフォーマットの確認方法
まず、Kafka に環境構築方法ですが、https://github.com/confluentinc/cp-docker-images の examples 以下に用途に応じた docker-compose.yml の例があるので、それを使うとサクッと環境構築できると思いますが、macOS の場合は次のような手順でも Google BigQuery Sink Connector を試せる環境を構築できます。
brew install confluent-platform
sudo install -d -m 0755 -o $USER -g admin $(brew --prefix)/logs
zookeeper-server-start -daemon $(brew --prefix)/etc/kafka/zookeeper.properties
kafka-server-start -daemon $(brew --prefix)/etc/kafka/server.properties
schema-registry-start -daemon $(brew --prefix)/etc/schema-registry/schema-registry.properties
cd $(brew --prefix)/bin/
confluent-hub install wepay/kafka-connect-bigquery:latest
Kafka の起動や connector のインストールが終わったら、$(brew --prefix)/share/confluent-hub-components/wepay-kafka-connect-bigquery/etc/connector.properties
の次の項目を変更すれば最低限動かすことができます。
- topics
- project
- datasets
- keyfile
設定を変更したら起動します。
connect-standalone \
$(brew --prefix)/etc/schema-registry/connect-avro-standalone.properties \
$(brew --prefix)/share/confluent-hub-components/wepay-kafka-connect-bigquery/etc/connector.properties
Kafka にデータを送るのは kafka-avro-console-producer
を使うのが手軽でしょうが、例えば Ruby だと次のようなコードで送ることができます。
require 'tmpdir'
require 'avro_turf/messaging'
require 'kafka'
CLIENT_ID = ENV["CLIENT_ID"]
TOPIC = ENV["TOPIC"]
SCHEMA_NAME = ENV["SCHEMA_NAME"]
BROKERS = ['localhost:9092']
SCHEMA_REGISTRY_URL = 'http://localhost:8081'
SCHEMA = <<-JSON
{
...
}
JSON
data = {
...
}
Dir.mktmpdir do |dir|
kafka = Kafka.new(BROKERS, client_id: CLIENT_ID, logger: logger)
avsc = File.join(dir, "#{SCHEMA_NAME}.avsc")
File.write(avsc, SCHEMA)
avro = AvroTurf::Messaging.new(registry_url: SCHEMA_REGISTRY_URL, schemas_path: dir)
# kafka-avro-console-consumer and Google BigQuery Sink Connecter require a subject in the format "<topic>-value"
msg = avro.encode(data, subject: "#{TOPIC}-value", schema_name: SCHEMA_NAME)
kafka.deliver_message(msg, topic: TOPIC)
end
Kafka トピックにデータが入ったら Google BigQuery Sink Connector が BigQuery のテーブルを作成します。
BigQuery が作成したテーブルと Google BigQuery Sink Connector が作成したテーブルを比較するには次のようにスキーマ情報の diff を取ればよいです。
diff -u \
<(bq show --schema --format prettyjson <created_by_bigquery>) \
<(bq show --schema --format prettyjson <created_by_connector>)
diff を取ると union type の各型を表すキーの命名規則が異なっていることがわかります。