Changing the number of reducers for Hadoop-based batch Ingestion

Hi,

I am trying to ingest data into Druid using the Hadoop ingestion method. The total size of the data to be ingested varies between 15 and 28 GB, and the segment size varies from 1.5 to 3 GB.

In the "jobProperties" field of the ingestion task template, we can configure the number of reducers (the "mapreduce.job.reduces" field). But when I run the ingestion job, I find (from the logs) that the number of reducers is not changing; it stays fixed at 1. Also, the job does not finish; it gets stuck at 90-95% after 8-10 minutes.

Is there some configuration that has to be changed or set?

And do I need to partition the segments using hash-based partitioning? If yes, how do I decide the target partition size?

Here is a sample ingestion template:

{
  "type": "index_hadoop",
  "id": "THIS_WILL_BE_REPLACED_RUNTIME",
  "spec": {
    "dataSchema": {
      "dataSource": "druid_ingest",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "jsonLowercase",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "dim_1",
              "dim_2",
              …
              "dim_n"
            ],
            "dimensionExclusions": [
              "timestamp"
            ]
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "records"
        },
        {
          "type": "doubleSum",
          "name": "metric_1",
          "fieldName": "m1"
        },
        …
        {
          "type": "doubleSum",
          "name": "metric_n",
          "fieldName": "m_n"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "intervals": [ "2016-09-24T22:00:00.000-07:00/2016-09-24T23:00:00.000-07:00" ]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths":
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "hdp.version": "2.2.9.0-3393",
        "mapreduce.job.user.classpath.first": "true",
        "mapreduce.map.memory.mb": 2048,
        "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
        "mapreduce.reduce.memory.mb": 6144,
        "mapreduce.reduce.java.opts": "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
        "mapreduce.job.reduces": 21,
        "mapreduce.job.jvm.numtasks": 20,
        "mapreduce.reduce.shuffle.parallelcopies": 50,
        "mapreduce.reduce.shuffle.input.buffer.percent": 0.5,
        "mapreduce.task.io.sort.mb": 250,
        "mapreduce.task.io.sort.factor": 100,
        "mapreduce.jobtracker.handler.count": 64,
        "mapreduce.tasktracker.http.threads": 20,
        "mapreduce.output.fileoutputformat.compress": false,
        "mapreduce.output.fileoutputformat.compress.type": "BLOCK",
        "mapreduce.output.fileoutputformat.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
        "mapreduce.map.output.compress": true,
        "mapreduce.map.output.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
        "mapreduce.map.speculative": false,
        "mapreduce.reduce.speculative": false,
        "mapreduce.task.timeout": 1800000
      }
    }
  },
  "hadoopDependencyCoordinates": [ "org.apache.hadoop:hadoop-client:2.6.0.2.2.9.9-2" ]
}

Hi,

First, note that the job property "mapreduce.job.reduces" will not change the number of reducers.

Druid sets the number of reducers based on the partitionsSpec strategy that the user provides.

You can provide a targetPartitionSize (the number of rows per physical segment), and Druid will compute the exact number of reducers by counting the number of rows and dividing by the provided partition size. For example, roughly 10 million rows in an interval with a targetPartitionSize of 5 million yields 2 shards, and hence 2 reducers.

Usually you want about 5 million rows per segment.

Otherwise, if you really want to set the number of partitions (and hence the number of reducers) yourself, you need to provide numShards instead.

So in your case, I can see that you are going with the default, which is targetPartitionSize = 5M. I would recommend changing that to a smaller value, which will increase the number of reducers.
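For illustration, a hashed partitionsSpec inside the tuningConfig controls this directly (the values below are placeholders to show the shape, not recommendations):

  "tuningConfig" : {
    "type" : "hadoop",
    "partitionsSpec" : {
      "type" : "hashed",
      "targetPartitionSize" : 2500000
    },
    "jobProperties" : { … }
  }

Or, if you prefer to pin the number of shards (and therefore reducers) yourself:

  "partitionsSpec" : {
    "type" : "hashed",
    "numShards" : 20
  }

Set either targetPartitionSize or numShards, not both.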

Also, I see that you are setting queryGranularity = NONE; please note that this can lead to a very slow ingestion process. In fact, Druid can achieve a significant boost by rolling up the data to the minute level, for instance.
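As a concrete sketch, rolling up to minute granularity only requires changing queryGranularity in the granularitySpec of the spec above:

  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "intervals" : [ "2016-09-24T22:00:00.000-07:00/2016-09-24T23:00:00.000-07:00" ]
  }

Whether that trade-off is acceptable depends on whether your queries actually need sub-minute timestamps.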

Hi,

Apologies for replying so late. Decreasing the targetPartitionSize did work, so thanks for that. But how do I find out the ideal targetPartitionSize for my scenario, such that the job takes the minimum time to run?

That's a very interesting question, and the best answer is to start with the recommended value, then benchmark based on your own platform, hardware configuration, data size, access patterns, etc.

B-Slim
///_///_///_///_///__

Okay… Is there a correlation between increasing the number of shards and query performance? Right now, with my segment size of approx. 9-10 million rows, the number of shards being created is between 19 and 21, and the running time for ingestion is 8-13 minutes. But I am not sure whether increasing the number of shards any further would be a good idea as far as query latency is concerned.