Physical memory limits and targetPartitionSize?

I’ve noticed that if my targetPartitionSize is sufficiently high then I start receiving errors that I am running beyond physical memory limits and my job (ultimately) fails. Is someone able to explain the correlation between these?

"targetPartitionSize": 400000

"mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

"mapreduce.reduce.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

mapreduce.map.memory.mb=2048

mapreduce.reduce.memory.mb=2048

yarn.scheduler.minimum-allocation-mb=2048

yarn.scheduler.maximum-allocation-mb=6144

yarn.nodemanager.resource.memory-mb=100GB

I am unable to set the targetPartitionSize high enough to achieve the 500MB-1GB segment size mentioned in the documentation. So far I have managed to get a segment size of around 50-60 MB max.

Sample of a full error message:

2017-09-27T22:07:16,224 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1506543849478_0010_r_000001_0, Status : FAILED

Container [pid=2391751,containerID=container_e20_1506543849478_0010_01_000033] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.4 GB of 4.2 GB virtual memory used. Killing container.

Dump of the process-tree for container_e20_1506543849478_0010_01_000033 :

  • PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  • 2391751 2391749 2391751 2391751 (bash) 3 3 108662784 301 /bin/bash -c /usr/java/jdk1.8.0_141/bin/java -server -XX:NewRatio=8 -Djava.net.preferIPv4Stack=true -Dhdp.version=2.6.2.0-205 -Xmx1638m -Duser.timezone=UTC -Djava.io.tmpdir=/disk-1/hadoop/yarn/local/usercache/druid/appcache/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/disk-8/hadoop/yarn/log/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.127.3.12 23239 attempt_1506543849478_0010_r_000001_0 21990232555553 1>/disk-8/hadoop/yarn/log/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033/stdout 2>/disk-8/hadoop/yarn/log/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033/stderr
  • 2392004 2391751 2391751 2391751 (java) 91296 2561 4653457408 525698 /usr/java/jdk1.8.0_141/bin/java -server -XX:NewRatio=8 -Djava.net.preferIPv4Stack=true -Dhdp.version=2.6.2.0-205 -Xmx1638m -Duser.timezone=UTC -Djava.io.tmpdir=/disk-1/hadoop/yarn/local/usercache/druid/appcache/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/disk-8/hadoop/yarn/log/application_1506543849478_0010/container_e20_1506543849478_0010_01_000033 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.127.3.12 23239 attempt_1506543849478_0010_r_000001_0 21990232555553

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

Is anyone able to help me understand what might be going on here? I’m happy to provide more info if needed.

There are several settings that you need to balance here to get the appropriate segment size. You’ve captured several of them, but there is at least one more that is important: rowFlushBoundary (recently renamed to maxRowsInMemory) in the tuningConfig. We have a pipeline that generates daily intervals, each composed of 60-70 segments for a total size of ~60g per day, so you can definitely get to where you want to be with the appropriate tuning.
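For reference, these knobs all live in the tuningConfig of the Hadoop ingestion spec. A rough sketch, assuming hashed partitioning and using the values from your question plus the default maxRowsInMemory (older Druid versions spell it rowFlushBoundary):

"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "type": "hashed",
    "targetPartitionSize": 400000
  },
  "maxRowsInMemory": 75000,
  "jobProperties": {
    "mapreduce.reduce.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.mb": "2048"
  }
}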

Indexing in Druid has historically been composed of a couple of pieces (I haven’t been paying much attention to the most recent releases, so there may be improvements here that I’m not aware of; apologies if I get something not exactly right). First is a job that determines partitions, followed by an optional job that groups the data, followed by the actual index generation. Typically it’s this last job that has the memory issues, as it is the one that actually needs to keep large amounts of data around.

For the index generation, the basic flow is to read a row of input, determine the bucket to place it in, and then aggregate the row into that bucket (stored in a hash table). This flow repeats until rowFlushBoundary (maxRowsInMemory) buckets are present, at which point the hash table is persisted to disk and cleared. That cycle continues until all the input rows have been consumed and many temporary files have been created. Historically, this aggregation has been done on-heap, meaning that the -Xmx in your mapreduce.reduce.java.opts setting must be large enough to hold maxRowsInMemory rows. For us, since some of our rows can be upwards of 1m per row (we have lots of sketches), that meant tuning maxRowsInMemory down significantly from its default value of 75000 AND raising the reducer java heap to 3.5g.
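As a rough back-of-the-envelope check (approximate, and assuming worst-case row sizes): at ~1m per aggregated row, the default of 75000 rows would mean on the order of 75000 × 1m ≈ 73g of aggregation state on-heap, which no reasonable -Xmx will hold. That’s why the row limit has to come down as row size goes up, and why we ended up with a much lower maxRowsInMemory plus a 3.5g heap.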

Once all the temporary files have been generated, the indexer goes through a process of reading them and merging them together, writing the result back out to disk. This process uses mmap to read the files, which means the file sizes will count against the Hadoop container size, specified by mapreduce.reduce.memory.mb. The system needs space for both the file it is reading in and the file it is writing back out, so take that into account when sizing the container.
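To make that concrete with the numbers we use below (a rough accounting, not an exact formula): mapreduce.reduce.memory.mb=8192 with -Xmx3584m leaves roughly 8g - 3.5g ≈ 4.5g of the container for JVM overhead plus the mmapped spill files and the segment being written out. With your 2048/1536 setup the heap takes up nearly the whole container, leaving the merge step almost no headroom, which looks a lot like the "running beyond physical memory limits" kill you're seeing.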

Depending on the version of Druid you are using, buildV9Directly may or may not be set by default. If it is NOT set, then Druid builds a V8 index file in the prior step and needs to run a second pass to read in the file and write it back out in V9 format. Again, mmap is used for this process, and you need to have space for both the file being read and the one being written.
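If your version doesn’t enable it by default (newer releases do, but check the docs for your exact version), it’s just another tuningConfig flag, e.g.:

"buildV9Directly": true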

Finally, Java doesn’t have a supported way of actually releasing the memory reserved by mmap’ed files until the garbage collector cleans them up. This means that by default, all the mmap’ed files I’ve mentioned up to now effectively get leaked, requiring an even larger container to be allocated. Recent versions of Druid have added code that uses an undocumented interface in Oracle’s JVM to release the mmap memory prior to GC cleanup, allowing lower amounts of container memory to be required and avoiding random OOM errors.

For our configuration (60+ segments @ 60g each), we use:

mapreduce.reduce.java.opts=-Xmx3584m
mapreduce.reduce.memory.mb=8192
targetPartitionSize: 1050000
rowFlushBoundary: 150000

Indexing the same data into a different datasource that drops a number of the dimensions, to produce smaller segments for more efficient queries when those dimensions aren’t needed, gives 2 segments @ 800m each. For this we use:

mapreduce.reduce.java.opts=-Xmx3584m
mapreduce.reduce.memory.mb=8192
targetPartitionSize: 812
rowFlushBoundary: 1000
The reduced rowFlushBoundary and targetPartitionSize are required because the smaller slice actually has much larger rows due to how sketches are aggregated together.
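As a very rough sanity check on that second config: 812 rows per segment producing an ~800m segment works out to roughly 1m per aggregated row, which lines up with why rowFlushBoundary has to drop to 1000 here; 1000 × ~1m ≈ 1g of in-memory aggregation state, which fits comfortably inside the 3.5g reducer heap.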

A couple other things:

  1. We don’t bother setting any of the yarn.* properties.
  2. Be very sure you are using a 64-bit Java in your Hadoop environment, as you will quickly run into overflow situations with large amounts of data and a 32-bit Java.

Will

One stupid correction to my previous comments: we are generating 60+ segments each of 800m, not 60g. 60g is the total size of all the segments together.

Will