Hadoop StreamingでHDFS上のレコード数を数える
ローカルファイルの行数を数える時は wc コマンドを使いますよね。
ふと、HDFS上のファイルの行数を数える時は何が良いのか考えてみました。
Pigだと次のような感じです。
$ pig -e "a = load 'inputdir'; b = group a all; c = foreach b generate COUNT_STAR(a); dump c"
これでも良さそうですが、レコードの数を数えるのが combiner と reducer なので、たいていの場合は mapper からの出力を一度ファイルに書き出す必要があり、膨大なファイルを扱う際に時間がかかります。
※グループ化する前に1つ目のフィールドだけを取り出すと若干速くなると思います
→ ファイルに書き出される前に combine 処理が行われるはずなのでファイルIOはほとんど問題じゃないみたいですがとりあえず時間がかかります
そこで、Hadoop Streaming を使って次のように実行してみると Pig に比べてかなり速くなります。
$ tempfile=$(mktemp -u)
$ hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 -mapper wc -reducer "awk '{sum += \$1} END {print sum}'" -input inputdir -output $tempfile
$ hadoop dfs -cat $tempfile/*
$ hadoop dfs -rmr $tempfile
まぁ MapReduce の思想に反したやり方な気はしますが・・・
そんなこんなで次のようなファイル作成して /usr/local/bin にでも置いておくと幸せになれるかもしれません。
wc_hdfs
#!/bin/bash
condition=""
fs="\t"
while getopts c:F: OPT; do
case $OPT in
c ) condition=$OPTARG;;
F ) fs=$OPTARG;;
esac
done
shift $(($OPTIND - 1))
if [ $# -ne 1 ]; then
echo "usage: wc_hdfs [-c condition [-F fs]] inputdir"
exit 1
fi
inputdir=$1
if [ -z "$condition" ]; then
mapper=wc
else
mapper="awk -F $fs '$condition {lines++; words += NF; chars += length(\$0) + 1} END {print lines, words, chars}'"
fi
tempfile=$(mktemp -u)
hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 \
-mapper "$mapper" \
-reducer "awk '{sum1 += \$1; sum2 += \$2; sum3 += \$3;} END {print sum1,sum2,sum3}'" \
-input $inputdir -output $tempfile >/dev/null &&
hadoop dfs -cat $tempfile/* &&
hadoop dfs -rmr $tempfile >/dev/null
設置して実際に使ってみましょう
$ chmod +x wc_hdfs
$ sudo mv wc_hdfs /usr/local/bin/
$ wc_hdfs inputdir
316262217 1265048448 9697306421
いい感じですね!!
2011/08/07 追記:
mapper に awk を指定してフィルタリングできるようにしました
$ wc_hdfs -c '$1 != ""' inputdir
316261899 1265047596 9697299107
※-Fを指定すればカンマ区切りのファイルにも適用できます
※単語数はnull(空文字列)も含まれてしまうのでwcの出力結果とは若干異なります
ちなみに Hadoop Streaming の公式ドキュメント には
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
という記述が見られますが、これだとおそらくファイルIOに加えてネットワーク転送まで発生するので不適切だと思います。
※これを実行するとエラーで落ちるんですが・・・