Druid Batch Ingestion (type="index_hadoop") running in local mode - Increase memory settings?

I am currently running my Druid (0.8.3) Batch Ingestion in “local” mode due to library incompatibilities with the Hadoop distribution I am using (Hortonworks Ambari HDP 2.4.0, roughly Hadoop 2.7.1).

Unfortunately, I am getting the following OutOfMemoryError due to the large amount of data I am trying to index. How do I increase the memory settings for this task?

```
2016-06-08T01:42:07,991 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_datasource01_2016-06-07T23:18:03.684Z, type=index_hadoop, dataSource=datasource01}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:138) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:206) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:285) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:265) [druid-indexing-service-0.8.3.jar:0.8.3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_71]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_71]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_71]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_71]
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_71]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_71]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_71]
	at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_71]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:135) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	... 7 more
Caused by: java.lang.OutOfMemoryError: Java heap space
```

Note: In this case I am ingesting 2.7GB of CSV data, and the process uses a local working directory (/tmp/hadoop-) that grows to around 18GB at its peak.

Background

Remote Hadoop Cluster (Specifically Map/Reduce) Issues:

Could the answer be as simple as updating the Java -Xmx option for the following Druid property?

```
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC
```
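(If so, a minimal sketch might look like the following. This assumes the index_hadoop task is launched as a forked peon via druid.indexer.runner.javaOpts, and that in local mode the map/reduce phases run inside that same peon JVM, so its -Xmx is the effective ceiling; the 6g figure is an assumption and would need to be sized to the data being indexed.)

```
# Hedged sketch, not a recommendation: raise the heap of the forked peon JVM
# that runs the index_hadoop task. With the local (in-process) MapReduce
# runner, the map/reduce work happens inside this JVM, so -Xmx here is what
# matters. The 6g value is an assumption; size it to the data being indexed.
druid.indexer.runner.javaOpts=-server -Xmx6g -XX:+UseG1GC -Duser.timezone=UTC -Dfile.encoding=UTF-8
```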

Would setting the job properties of my Druid Batch Ingestion task be a way to achieve increased memory settings?

For example:

```
"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "targetPartitionSize": 0
  },
  "numBackgroundPersistThreads": 1,
  "jobProperties": {
    "mapreduce.map.memory.mb": 2048,
    "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.mb": 6144,
    "mapreduce.reduce.java.opts": "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8"
  }
}
```

A couple of links that suggest this could work:

This is what I have discovered.

I noticed that if I set the Druid logging level to INFO and viewed the running Druid task logs, a MapReduce property was logged ("mapreduce.task.io.sort.mb"). By setting the relevant configuration values and running a simple Druid Batch Ingestion task, this logging output made it easy to tell whether my Hadoop property override had taken effect.

Druid Batch Ingestion Task - Local MapReduce Default Value:

```
2016-06-10T20:16:52,308 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 100
```

Basically there are a couple of levels of precedence. If all three of the values given below are set, the "jobProperties" value is chosen if it exists, then the "druid.indexer.fork.property" value if it exists, then the "druid.indexer.runner.javaOpts" value if it exists; otherwise the default value is used.

Level 1 (Highest): In the Druid Batch Ingestion task:

```
"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "targetPartitionSize": 0
  },
  "numBackgroundPersistThreads": 1,
  "useCombiner": true,
  "jobProperties": {
    "mapreduce.task.io.sort.mb": 201
  }
}
```

Level 2: In the Druid configuration (druid/config/_common/common.runtime.properties file):

```
druid.indexer.fork.property.hadoop.mapreduce.task.io.sort.mb=202
```

Level 3: In the Druid configuration (druid/config/_common/common.runtime.properties file):

```
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -Dhadoop.mapreduce.task.io.sort.mb=203
```
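For example, if all three of the above were set at the same time, the INFO-level task log described earlier should report the Level 1 value, confirming the precedence order. (This is the expected output given that precedence, not a captured log.)

```
... INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 201
```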

Druid Hadoop-based ingestion simply submits a job to a Hadoop cluster, so the bottleneck is likely on the Hadoop cluster itself. Local Hadoop ingestion isn't really designed to handle ingesting a large volume of data; a remote Hadoop cluster is required for that. What is the error you are seeing with HDP?

For now, the "java.lang.OutOfMemoryError: Java heap space" exception I was receiving seems to have been addressed with the following configuration. I used http://druid.io/docs/latest/configuration/hadoop.html for inspiration.

```
"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "targetPartitionSize": 0
  },
  "numBackgroundPersistThreads": 1,
  "useCombiner": true,
  "jobProperties": {
    "mapreduce.map.memory.mb": 2048,
    "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.mb": 6144,
    "mapreduce.reduce.java.opts": "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.job.reduces": 21,
    "mapreduce.job.jvm.numtasks": 20,
    "mapreduce.reduce.shuffle.parallelcopies": 50,
    "mapreduce.reduce.shuffle.input.buffer.percent": 0.5,
    "mapreduce.task.io.sort.mb": 250,
    "mapreduce.task.io.sort.factor": 100,
    "mapreduce.jobtracker.handler.count": 64,
    "mapreduce.tasktracker.http.threads": 20,
    "mapreduce.output.fileoutputformat.compress": false,
    "mapreduce.output.fileoutputformat.compress.type": "BLOCK",
    "mapreduce.output.fileoutputformat.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
    "mapreduce.map.output.compress": true,
    "mapreduce.map.output.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
    "mapreduce.map.speculative": false,
    "mapreduce.reduce.speculative": false,
    "mapreduce.task.timeout": 1800000
  }
}
```
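One thing to keep in mind when adapting these numbers: mapreduce.map.memory.mb and mapreduce.reduce.memory.mb size the YARN containers, while the -Xmx inside the corresponding *.java.opts sizes the heap within each container, so each -Xmx needs to stay comfortably below its container size or YARN may kill the container for exceeding its memory limit. A trimmed sketch of just that memory-critical subset, reusing the same (illustrative, not recommended) values as above:

```
"jobProperties": {
  "mapreduce.map.memory.mb": 2048,
  "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
  "mapreduce.reduce.memory.mb": 6144,
  "mapreduce.reduce.java.opts": "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8"
}
```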

Answers duplicated with https://groups.google.com/forum/#!topic/druid-user/SFYlum_wu38.

Do not use local hadoop ingestion for anything beyond a small POC data set and expect good performance.