OOM in reduce, how to investigate?

Hi,

The second job run by the Hadoop indexer (not the partition determiner, but the indexing job itself) is hit by an out-of-memory error; I attached the log below.

Could you hint at why this may happen and how to investigate further?

Also, I had issues with the load average when multiple tasks were run on the same machine.

What are the recommended memory settings?

Currently I have in yarn-site.xml:

    yarn.nodemanager.resource.memory-mb = 6192
    yarn.scheduler.minimum-allocation-mb = 4096
    yarn.nodemanager.resource.cpu-vcores = 4
    nodemanager.resource.io-spindles = 1

I set these values only because allowing more tasks per node raised the load average to 20 or more (on a 4-core machine), and eventually the master "lost" nodes and the job failed (in a previous job run, not this one).

    2015-11-19 15:52:19,470 INFO [main] io.druid.indexer.HadoopDruidIndexerConfig: Running with config:
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "impression",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "format" : "json",
              "timestampSpec" : {
                "column" : "date_time",
                "format" : "yyyy-MM-dd HH:mm:ss",
                "missingValue" : null
              },
              "dimensionsSpec" : {
                "dimensions" : [ "OAGEO", "adData", "ad_id", "area_code", "browser", "campid", "cbs", "channel", "channel_ids", "cid", "city", "client", "continent", "country", "date_time", "device", "device_type", "dma", "domain", "f37", "f38", "f39", "geo_netspeed", "geo_organisation", "host_name", "https", "ip", "language", "lat", "listenerId", "lng", "loc", "max_https", "oaid ", "operating_system", "path", "pname", "postal_code", "referer", "region", "search_term", "sessionId", "session_id", "spot_time", "subregion", "user_agent", "view_key", "viewer_id", "zero", "zone_id" ],
                "dimensionExclusions" : [ ],
                "spatialDimensions" : [ ]
              }
            }
          },
          "metricsSpec" : [ {
            "type" : "count",
            "name" : "count"
          } ],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : {
              "type" : "none"
            },
            "intervals" : [ "2015-01-01T00:00:00.000Z/2015-01-15T00:00:00.000Z" ]
          }
        },
        "ioConfig" : {
          "type" : "hadoop",
          "inputSpec" : {
            "type" : "static",
            "paths" : "hdfs://hadoop-master:54310/data/impressions-out*/_temporary/0/task*/part-0000*.gz"
          },
          "metadataUpdateSpec" : null,
          "segmentOutputPath" : "hdfs://hadoop-master:54310/druid/impression"
        },
        "tuningConfig" : {
          "type" : "hadoop",
          "workingPath" : "/tmp/druid-indexing",
          "version" : "2015-11-19T15:47:36.436Z",
          "partitionsSpec" : {
            "type" : "hashed",
            "targetPartitionSize" : 5000000,
            "maxPartitionSize" : 7500000,
            "assumeGrouped" : false,
            "numShards" : -1
          },
          "shardSpecs" : {
            "2015-01-01T00:00:00.000Z" : [ {
              "actualSpec" : {
                "type" : "none"
              },
              "shardNum" : 0
            } ],
            "2015-01-02T00:00:00.000Z" : [ ],
            "2015-01-03T00:00:00.000Z" : [ ],
            "2015-01-04T00:00:00.000Z" : [ ],
            "2015-01-05T00:00:00.000Z" : [ ],
            "2015-01-06T00:00:00.000Z" : [ ],
            "2015-01-07T00:00:00.000Z" : [ ],
            "2015-01-08T00:00:00.000Z" : [ ],
            "2015-01-09T00:00:00.000Z" : [ ],
            "2015-01-10T00:00:00.000Z" : [ ],
            "2015-01-11T00:00:00.000Z" : [ ],
            "2015-01-12T00:00:00.000Z" : [ ],
            "2015-01-13T00:00:00.000Z" : [ ],
            "2015-01-14T00:00:00.000Z" : [ ]
          },
          "indexSpec" : {
            "bitmap" : {
              "type" : "concise"
            },
            "dimensionCompression" : null,
            "metricCompression" : null
          },
          "leaveIntermediate" : false,
          "cleanupOnFailure" : true,
          "overwriteFiles" : false,
          "ignoreInvalidRows" : false,
          "jobProperties" : { },
          "combineText" : false,
          "persistInHeap" : false,
          "ingestOffheap" : false,
          "bufferSize" : 134217728,
          "aggregationBufferRatio" : 0.5,
          "useCombiner" : false,
          "rowFlushBoundary" : 300000
        }
      }
    }

    2015-11-19 15:52:56,950 ERROR [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.hadoop.io.Text.setCapacity(Text.java:268)
        at org.apache.hadoop.io.Text.readWithKnownLength(Text.java:318)
        at org.apache.hadoop.io.Text.readFields(Text.java:291)
        at org.apache.hadoop.io.ArrayWritable.readFields(ArrayWritable.java:96)
        at io.druid.indexer.InputRowSerde.fromBytes(InputRowSerde.java:154)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:557)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:462)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

To fix the OOM error on the reducer side, you can try these approaches:

  1. Decrease the value of "rowFlushBoundary" (see the combined sketch after the example below).

  2. Increase the heap of the reducer, which can be done by setting "jobProperties" in the "tuningConfig" section of the task spec.

e.g.:

    "jobProperties" : { "mapreduce.reduce.memory.mb" : 6144, "mapreduce.reduce.java.opts" : "-Xmx4096m" }
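Putting both suggestions together, a minimal sketch of the relevant part of the "tuningConfig" might look like the following (the numbers are illustrative assumptions, not recommendations; size them to your cluster and data):

    "tuningConfig" : {
      "type" : "hadoop",
      "rowFlushBoundary" : 75000,
      "jobProperties" : {
        "mapreduce.reduce.memory.mb" : 6144,
        "mapreduce.reduce.java.opts" : "-Xmx4096m"
      }
    }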

Thanks

Rohit

Hi,

In addition to Rohit's suggestions, it may be the case that your reducer ends up getting a lot of data. You should decrease the values for

"targetPartitionSize" : 5000000,
      "maxPartitionSize" : 7500000,

so that more partitions are created, reducing the amount of data going to each individual reducer.
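As a rough illustration (the numbers are made up; tune them to your actual row counts), the "partitionsSpec" inside "tuningConfig" could be lowered to something like:

    "partitionsSpec" : {
      "type" : "hashed",
      "targetPartitionSize" : 1000000,
      "maxPartitionSize" : 1500000
    }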

see [http://druid.io/docs/0.8.2/ingestion/batch-ingestion.html](http://druid.io/docs/0.8.2/ingestion/batch-ingestion.html)

-- Himanshu

Thanks, works like a breeze!
The only thing is that it is slow: with one task per server (so that the load average does not exceed 2x the number of cores during I/O-intensive moments and nodes are no longer "lost" from the cluster), importing 3-6 days of data takes 2-3 days. I have not yet compared this with a Pig solution or Spark/HBase/Impala, but neither Druid nor Pinot seems to be among the cheap-ingestion solutions, which does make sense considering the indexing that happens. I import raw events, with no metric other than count. Importing 130 MB took 25 minutes (in single-task mode).

Hi,

If your events are merged by Druid, that is, if the combination of your dimension values and truncated timestamps is not unique for each event, then setting "useCombiner" to true would make things better (given that you are using druid-0.8.1 or above).
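A minimal sketch of where that flag would go, in the same "tuningConfig" section as above (everything else unchanged):

    "tuningConfig" : {
      "type" : "hadoop",
      "useCombiner" : true
    }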

-- Himanshu