Hadoop StreamingでHDFS上のレコード数を数える

ローカルファイルの行数を数える時は wc コマンドを使いますよね。
ふと、HDFS上のファイルの行数を数える時は何が良いのか考えてみました。

Pigだと次のような感じです。

$ pig -e "a = load 'inputdir'; b = group a all; c = foreach b generate COUNT_STAR(a); dump c"

これでも良さそうですが、レコードの数を数えるのが combiner と reducer なので、たいていの場合は mapper からの出力を一度ファイルに書き出す必要があり、膨大なファイルを扱う際に時間がかかります。
※グループ化する前に1つ目のフィールドだけを取り出すと若干速くなると思います
→ ファイルに書き出される前に combine 処理が行われるはずなのでファイルIOはほとんど問題じゃないみたいですがとりあえず時間がかかります

そこで、Hadoop Streaming を使って次のように実行してみると Pig に比べてかなり速くなります。

$ tempfile=$(mktemp -u)
$ hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 -mapper wc -reducer "awk '{sum += \$1} END {print sum}'" -input inputdir -output $tempfile
$ hadoop dfs -cat $tempfile/*
$ hadoop dfs -rmr $tempfile

まぁ MapReduce の思想に反したやり方な気はしますが・・・

そんなこんなで次のようなファイル作成して /usr/local/bin にでも置いておくと幸せになれるかもしれません。

wc_hdfs

#!/bin/bash

condition=""
fs="\t"

while getopts c:F: OPT; do
    case $OPT in
        c ) condition=$OPTARG;;
        F ) fs=$OPTARG;;
    esac
done
shift $(($OPTIND - 1))

if [ $# -ne 1 ]; then
    echo "usage: wc_hdfs [-c condition [-F fs]] inputdir"
    exit 1
fi
inputdir=$1

if [ -z "$condition" ]; then
    mapper=wc
else
    mapper="awk -F $fs '$condition {lines++; words += NF; chars += length(\$0) + 1} END {print lines, words, chars}'"
 fi

tempfile=$(mktemp -u)
hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 \
    -mapper "$mapper" \
    -reducer "awk '{sum1 += \$1; sum2 += \$2; sum3 += \$3;} END {print sum1,sum2,sum3}'" \
    -input $inputdir -output $tempfile >/dev/null &&
hadoop dfs -cat $tempfile/* &&
hadoop dfs -rmr $tempfile >/dev/null

設置して実際に使ってみましょう

$ chmod +x wc_hdfs
$ sudo mv wc_hdfs /usr/local/bin/
$ wc_hdfs inputdir
316262217 1265048448 9697306421

いい感じですね!!

2011/08/07 追記:
mapper に awk を指定してフィルタリングできるようにしました

$ wc_hdfs -c '$1 != ""' inputdir
316261899 1265047596 9697299107

※-Fを指定すればカンマ区切りのファイルにも適用できます
※単語数はnull(空文字列)も含まれてしまうのでwcの出力結果とは若干異なります

ちなみに Hadoop Streaming の公式ドキュメント には

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc

という記述が見られますが、これだとおそらくファイルIOに加えてネットワーク転送まで発生するので不適切だと思います。
※これを実行するとエラーで落ちるんですが・・・