Hive on Tez で Parquet を Snappy で圧縮すると起きる Container is running beyond physical memory limits. を解消する

Hive で次のようなエラーに遭遇することがあるかと思います。

Container [pid=10828,containerID=container_1548924716595_1044_01_000024] is running beyond physical memory limits. Current usage: 3.8 GB of 3 GB physical memory used; 6.4 GB of 15 GB virtual memory used. Killing container.

このエラーは意外に厄介で、ちょっとググっただけだと「設定ミスが原因です」的なものばかりで自分の求めている回答に辿り着けませんでした。
結論から言うと、次のように Hive のパラメータをよしなにセットしてやれば解消します。

-- yarn.nodemanager.resource.memory-mb / 論理コア数
SET hive.tez.container.size=3072;
-- parquet.block.size の 2 倍以上あれば良さそう
SET hive.tez.java.opts=-XX:MaxDirectMemorySize=256m;

これは何のエラー?

YARN では NodeManager がコンテナの状態を監視しているんですが、タスクに割り当てたメモリを超過した場合に出るエラーです。
cf. ContainersMonitorImpl.java#L510-L522

上記のコードを見ればわかるとおり、yarn.nodemanager.pmem-check-enabled が true の時しか起きないエラーなので、yarn.nodemanager.pmem-check-enabled を false にすれば解消すると説明している人もいます。まぁ間違ってはいない。

どうしてこのエラーが起きるか?

Hive で Tez を利用する場合、map タスクのコンテナも reduce タスクのコンテナもデフォルトでは mapreduce.map.memory.mb に指定されたメモリを割り当てます。
cf. DagUtils.java#L435-L443

そして、デフォルトでは Java のオプションとして -Xmx にコンテナのメモリの 0.8 倍の値が指定されます。
cf. DagUtils.java#L1301-L1312

つまり、ヒープ領域だけを使っていればプロセスのメモリ使用量がコンテナに割り当てたメモリ量を超えることはまずないわけです。
というわけで、ヒープ外のメモリを使っている箇所が怪しいわけですが、このエラーが起きる場合はたいていコンテナのログは次のような内容で終わっていることと、Hadoop, Hive, Tez, Parquet のリポジトリを ByteBuffer.allocateDirect で検索すると SnappyCompressor で direct buffer が使われていることから、SnappyCompressor がメモリを使いまくっていると考えられます。

2019-02-24 13:51:27,290 [INFO] [TezChild] |compress.CodecPool|: Got brand-new compressor [.snappy]
2019-02-24 13:51:27,291 [INFO] [TezChild] |write.ParquetRecordWriterWrapper|: real writer: org.apache.parquet.hadoop.ParquetRecordWriter@7b6ea8c3
2019-02-24 13:51:27,323 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 1
2019-02-24 13:51:27,324 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 10
2019-02-24 13:51:27,325 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 100
2019-02-24 13:51:27,336 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 1000
2019-02-24 13:51:27,429 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 10000
2019-02-24 13:51:27,925 [INFO] [TezChild] |exec.FileSinkOperator|: FS[7]: records written - 100000

実際、Spark ではありますが、同じような悩みを抱えている人がいました。

[SPARK-4073] Parquet+Snappy can cause significant off-heap memory usage - ASF JIRA

解決策

前述の Spark の issue では -XX:MaxDirectMemorySize を指定すれば良いというアドバイスが出ていたのでそれを指定してみます。SnappyCompressor のコードと主な利用箇所をざっくり読んだ感じだと、parquet.block.size の 2 倍以上の direct buffer が確保できれば問題にはならないんじゃないかと思います。
cf. SnappyCompressor.java, ColumnChunkPageWriteStore.java#L89

そんなわけで、冒頭で言及したパラメータを指定することで解決するはずです。

-- yarn.nodemanager.resource.memory-mb / 論理コア数
SET hive.tez.container.size=3072;
-- parquet.block.size の 2 倍以上あれば良さそう
SET hive.tez.java.opts=-XX:MaxDirectMemorySize=256m;

hive.tez.java.opts だけを指定すれば良さそうなものですが、hive.tez.container.size を指定しないと無視する実装になっているので、両方指定しなければなりません。
cf. DagUtils.java#L471-L484

なお、hive.tez.container.size を指定しなければ mapreduce.map.memory.mb がコンテナのメモリ量を決めるのに使われるわけですが、Amazon Elastic MapReduce (EMR) を使っている場合、mapreduce.map.memory.mbyarn.nodemanager.resource.memory-mb / 論理コア数 にセットされます。

$ # 論理コア数
$ grep processor /proc/cpuinfo | wc -l
8
$ # yarn.nodemanager.resource.memory-mb
$ grep -A1 'yarn.nodemanager.resource.memory-mb' /etc/hadoop/conf/yarn-site.xml
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>24576</value>
$ # mapreduce.map.memory.mb
$ grep -A1 'mapreduce.map.memory.mb' /etc/hadoop/conf/mapred-site.xml
    <name>mapreduce.map.memory.mb</name>
    <value>3072</value>

yarn.nodemanager.resource.memory-mb / 論理コア数 より大きな値を指定するとコアが余ってしまうので、必要でない限りこれより大きな値は指定しない方が良いです。

参考