Pigで累積和を算出する

Hadoop 上の膨大なデータに対して累積和を算出したいことってあると思います。
例えば、ある日からの経過日数ごとの累積売上額とか。
Pig では SUM と違って累積和を求める UDF は組み込み関数レベルでサポートされていません。
というわけで、Pig で累積和を求める方法を検討してみました。

事前準備

今回はこんな感じのデータを扱います。

data

a	a	1	1000000000
b	a	2	8000000000
a	b	2	4000000000
a	b	1	3000000000
b	b	2	10000000000
a	a	2	2000000000
a	c	1	5000000000
b	a	1	7000000000
b	b	1	9000000000
a	c	2	6000000000

1列目と2列目がキーになっていて、3列目が順番、4列目が値です。
キーが同じデータに関して、3列目が2の場合は1の値と2の値を足す、ということがしたいです。

Pig のみで頑張ってみる

今までは無理矢理こんな感じでやっていました。
cumsum.pig

A = LOAD 'data' AS (key1: chararray, key2: chararray, num: int, value: long);

SPLIT A INTO
        B1 IF num == 1,
        B2 IF num <= 2;

C = UNION (FOREACH B1 GENERATE *, 1 AS key3), (FOREACH B2 GENERATE *, 2);

D = FOREACH (GROUP C BY (key1, key2, key3)) GENERATE
        FLATTEN(group),
        SUM(C.value);

DUMP D;

実行してみます。

$ pig -x local cumsum.pig
(a,a,1,1000000000)
(a,a,2,3000000000)
(a,b,1,3000000000)
(a,b,2,7000000000)
(a,c,1,5000000000)
(a,c,2,11000000000)
(b,a,1,7000000000)
(b,a,2,15000000000)
(b,b,1,9000000000)
(b,b,2,19000000000)

うん、問題なく算出できてますね。

ただ、このやり方には2つの問題点があります。

  1. num(3列目)の取りうる値が増える度に修正しないといけない
  2. num の取りうる値の数に比例して扱うデータサイズが増える

これはひどいですよね!!

STREAM ステートメントを使ってみる

STREAM ステートメントは Hadoop Streaming みたいに外部プログラムにデータを渡すことのできる仕組みです。
つまり、いちいち Java でコーディングしなくてもいいので、手軽に自由度の高い処理を実装することが可能です。

Perl で累積和を算出するコードを書くとこんな感じでしょうか。
cumsum.pl

例えば次のように実行すればインストールできます。

$ wget https://raw.github.com/gist/1611406 -O cumsum  # ダウンロード
$ chmod +x cumsum  # 実行権限付与
$ sudo mv cumsum /usr/local/bin  # パスの通っているところに移動

使い方はこんな感じです。

$ sort data | cumsum --keys 0 1 --values 3  # sort は厳密には sort data -k1,2 -k3n
a	a	1	1000000000
a	a	2	3000000000
a	b	1	3000000000
a	b	2	7000000000
a	c	1	5000000000
a	c	2	11000000000
b	a	1	7000000000
b	a	2	15000000000
b	b	1	9000000000
b	b	2	19000000000

ポイントはキーと順番でソートすることです。Hadoop を利用する上での基本ですね。

Pig に適用しようと思うと次のようになります。
cumsum_using_stream.pig

A = LOAD 'data' AS (key1: chararray, key2: chararray, num: int, value: long);

B = FOREACH (GROUP A BY (key1, key2)) {
        A1 = ORDER A BY num;
        GENERATE
                FLATTEN(A1);
}

DEFINE cumsum `cumsum --keys 0 1 --values 3` ship('/usr/local/bin/cumsum');
C = STREAM B THROUGH cumsum AS (key1: chararray, key2: chararray, num: int, value: long);

DUMP C;

実行してみます。

$ pig -x local cumsum_using_stream.pig
(a,a,1,1000000000)
(a,a,2,3000000000)
(a,b,1,3000000000)
(a,b,2,7000000000)
(a,c,1,5000000000)
(a,c,2,11000000000)
(b,a,1,7000000000)
(b,a,2,15000000000)
(b,b,1,9000000000)
(b,b,2,19000000000)

Pig のみで無理矢理処理した場合と同じ結果になっていますね!!
※もっと複雑で 10GB 程度あるデータでも所望の値が得られたので大丈夫だと思います

UDFないの?

ここまで書いていて思ったんですが、公式でサポートされていなくても誰かがUDFを書いてそうですよね。

・・・ありました・・・・・
UDF for cumulative statistics - ASF JIRA

ただ、詳細はこちらってかいてあるとこのリンクを辿ろうとしてもアクセスできなかったので、今はもう手に入らないのかもしれません。
発想は Perl を使ったものと同じだと思います。

何はともあれ、Pig で累積和を算出するためのまとめでした!