Avro でスキーマ管理されたデータを BigQuery で取り込めるデータに変換する

Kafka に入っている Avro フォーマット(スキーマは Schema Registry で管理されている)のデータを BigQuery に取り込みたいとします。その場合、おそらく Google BigQuery Sink Connector を使うのが良いでしょう。
ところが、諸事情により Ruby で Kafka からデータを逐次取得し、BigQuery にデータを取り込みたいとします。その場合、fluentd のある環境であれば、取り出したデータを fluentd に送り、fluent-plugin-bigquery で取り込むのが手軽でしょう。

さて、ここで問題になるのは次の 2 点です。

  • Avro の map はどう表現するのか?
  • Avro の union type はどう表現するのか?

ざっくりした仕様は Avro conversions に載っていて、単純なスキーマであれば変換するのは簡単なんですが、データがネストしている場合なども考慮し出すと変換処理が非常に複雑になります。というわけで、Avro のスキーマ情報を基に Hash オブジェクトを BigQuery で取り込めるデータに変換する gem を作りました。

https://github.com/abicky/typed_data

使用例

README に書いてあるとおりですが、次のように Converter にスキーマ情報を渡して、convert メソッドに Hash オブジェクトを渡せばよしなに変換してくれます。

require "typed_data"

schema = {
  "name" => "Record",
  "type" => "record",
  "fields" => [
    {
      "name" => "int_field",
      "type" => "int",
    },
    {
      "name" => "int_or_string_field",
      "type" => ["int", "string"],
    },
    {
      "name" => "array_field",
      "type" => {
        "type" => "array",
        "items" => "int",
      },
    },
    {
      "name" => "union_type_array_field",
      "type" => {
        "type" => "array",
        "items" => ["int", "string"],
      },
    },
    {
      "name" => "nested_map_field",
      "type" => {
        "type" => "map",
        "values" => {
          "type" => "map",
          "values" => ["int", "string"],
        },
      },
    },
  ],
}

converter = TypedData::Converter.new(schema)
converter.convert({
  "int_field" => 1,
  "int_or_string_field" => "string",
  "array_field" => [1, 2],
  "union_type_array_field" => [1, "2"],
  "nested_map_field" => {
    "nested_map" => {
      "key1" => 1,
      "key2" => "2",
    },
  },
})

上記のコードだと、最終的に次のオブジェクトに変換されます。

{"int_field"=>1,
 "int_or_string_field"=>{"int_value"=>nil, "string_value"=>"string"},
 "array_field"=>[1, 2],
 "union_type_array_field"=>
  [{"int_value"=>"1", "string_value"=>nil},
   {"int_value"=>nil, "string_value"=>"2"}],
 "nested_map_field"=>
  [{"key"=>"nested_map",
    "value"=>
     [{"key"=>"key1", "value"=>{"int_value"=>"1", "string_value"=>nil}},
      {"key"=>"key2", "value"=>{"int_value"=>nil, "string_value"=>"2"}}]}]}

この形式は、実際に Avro フォーマットのデータを作成し、BigQuery に取り込んだ場合に作成されるテーブルのレコードと同じ形式です。
cf. spec/typed_data/converter_spec.rb#L15-L60

最終的に Google BigQuery Sink Connector でデータを取り込むように切り替える予定の場合、union type を表現する際のフォーマットを次のように変更すると良いです。

converter = TypedData::Converter.new(schema)
converter.union_type_key_formatter = ->(type) { type.split("_").first }

これによって、最初の例のデータは次のように変換されます。

{"int_field"=>1,
 "int_or_string_field"=>{"int"=>nil, "string"=>"string"},
 "array_field"=>[1, 2],
 "union_type_array_field"=>
  [{"int"=>"1", "string"=>nil}, {"int"=>nil, "string"=>"2"}],
 "nested_map_field"=>
  [{"key"=>"nested_map",
    "value"=>
     [{"key"=>"key1", "value"=>{"int"=>"1", "string"=>nil}},
      {"key"=>"key2", "value"=>{"int"=>nil, "string"=>"2"}}]}]}

gem を開発した理由

BigQuery は Avro のデータを直接取り込むことができるので、以下のような選択肢もありました。

fluent-plugin-avro を使うと 1 レコードずつ Avro フォーマットに変換するので、1 レコードずつスキーマ情報を持つことになって微妙かなと思いやめました。あと、試してないですが、1 レコードずつ Avro フォーマットに変換すると、複数レコードをまとめて取り込むのが難しくなるんじゃないかと思います。

fluent-plugin-bigquery を改修するのに関しては、Avro のスキーマ情報から BigQuery のテーブルを作成したいニッチな人のためにわざわざ変更を入れるのは微妙だと思ってやめました。BigQuery に送るデータサイズが小さくなることぐらいメリットがないでしょうし。

また、そもそも、Avro フォーマットのデータを取り込むことで作成されたテーブルだと Google BigQuery Sink Connector が使えないはずなので、それも微妙だと思いました。

そんなわけで、今回 gem を作ることにしました。変換規則が自分の中で整理しきれてないこともあって、どういう結果になるのか理解するのが難しい箇所もありますが、テストを厚めに書いてあるので大丈夫でしょう。

そのうち次の機能も提供したいと思っています。

  • Avro のスキーマから BigQuery のスキーマを生成するコマンドの提供
  • 変換したオブジェクトを元に戻すコマンドの提供
    • bq query --format json の出力結果を入力に JSON に変換できると読みやすいかなと

おまけ: Google BigQuery Sink Connector のフォーマットの確認方法

まず、Kafka に環境構築方法ですが、https://github.com/confluentinc/cp-docker-images の examples 以下に用途に応じた docker-compose.yml の例があるので、それを使うとサクッと環境構築できると思いますが、macOS の場合は次のような手順でも Google BigQuery Sink Connector を試せる環境を構築できます。

brew install confluent-platform
sudo install -d -m 0755 -o $USER -g admin $(brew --prefix)/logs
zookeeper-server-start -daemon $(brew --prefix)/etc/kafka/zookeeper.properties
kafka-server-start -daemon $(brew --prefix)/etc/kafka/server.properties
schema-registry-start -daemon $(brew --prefix)/etc/schema-registry/schema-registry.properties
cd $(brew --prefix)/bin/
confluent-hub install wepay/kafka-connect-bigquery:latest

Kafka の起動や connector のインストールが終わったら、$(brew --prefix)/share/confluent-hub-components/wepay-kafka-connect-bigquery/etc/connector.properties の次の項目を変更すれば最低限動かすことができます。

  • topics
  • project
  • datasets
  • keyfile

設定を変更したら起動します。

connect-standalone \
  $(brew --prefix)/etc/schema-registry/connect-avro-standalone.properties \
  $(brew --prefix)/share/confluent-hub-components/wepay-kafka-connect-bigquery/etc/connector.properties

Kafka にデータを送るのは kafka-avro-console-producer を使うのが手軽でしょうが、例えば Ruby だと次のようなコードで送ることができます。

require 'tmpdir'

require 'avro_turf/messaging'
require 'kafka'

CLIENT_ID = ENV["CLIENT_ID"]
TOPIC = ENV["TOPIC"]
SCHEMA_NAME = ENV["SCHEMA_NAME"]
BROKERS = ['localhost:9092']
SCHEMA_REGISTRY_URL = 'http://localhost:8081'

SCHEMA = <<-JSON
{
   ...
}
JSON

data = {
  ...
}

Dir.mktmpdir do |dir|
  kafka = Kafka.new(BROKERS, client_id: CLIENT_ID, logger: logger)

  avsc = File.join(dir, "#{SCHEMA_NAME}.avsc")
  File.write(avsc, SCHEMA)

  avro = AvroTurf::Messaging.new(registry_url: SCHEMA_REGISTRY_URL, schemas_path: dir)

  # kafka-avro-console-consumer and Google BigQuery Sink Connecter require a subject in the format "<topic>-value"
  msg = avro.encode(data, subject: "#{TOPIC}-value", schema_name: SCHEMA_NAME)
  kafka.deliver_message(msg, topic: TOPIC)
end

Kafka トピックにデータが入ったら Google BigQuery Sink Connector が BigQuery のテーブルを作成します。

BigQuery が作成したテーブルと Google BigQuery Sink Connector が作成したテーブルを比較するには次のようにスキーマ情報の diff を取ればよいです。

diff -u \
  <(bq show --schema --format prettyjson <created_by_bigquery>) \
  <(bq show --schema --format prettyjson <created_by_connector>)

diff を取ると union type の各型を表すキーの命名規則が異なっていることがわかります。