partitionsSpec is producing very different segment sizes although the data is evenly distributed across the partition dimension

Hello,

I have a process that computes weekly aggregates on a Hadoop cluster and dumps them as a TSV on HDFS. I then ingest that data into Druid using Hadoop batch ingestion so it can be served to our different UIs with Druid's well-known snappiness.

The columns are already aggregated and of relatively low cardinality (the biggest dimension is around 300), except for the column I am partitioning on, which is higher (6785). I am using assumeGrouped = true, and the resulting segments vary from 75 MB to 1.5 GB!

Could you please point me in the right direction to get more evenly sized segments? Thanks!

Info follows:

Druid version 0.9.1.1

The ingestion runs on our cluster of 30 nodes.

Here is the ingestion JSON that I submit:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "wcml_audience_metrics",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "tsv",
          "timestampSpec": {
            "column": "period_start_date",
            "format": "yyyy-MM-dd HH:mm:ss"
          },
          "columns": [
            "publisher_id",
            "publisher_type",
            "period_type",
            "period_start_date",
            "period_end_date",
            "gender",
            "geo_area_code",
            "daypart_key",
            "age",
            "m1",
            "m2"
          ],
          "dimensionsSpec": {
            "dimensions": [
              "publisher_id",
              "publisher_type",
              "period_type",
              "period_start_date",
              "period_end_date",
              "gender",
              "geo_area_code",
              "daypart_key",
              "age"
            ]
          }
        }
      },
      "metricsSpec": [
        {
          "type": "doubleSum",
          "name": "m1",
          "fieldName": "m1"
        },
        {
          "type": "longSum",
          "name": "m2",
          "fieldName": "m2"
        },
        {
          "name": "count",
          "type": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "week",
        "queryGranularity": "week",
        "intervals": [
          "2016-08-11/2016-08-19"
        ]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "/user/cnaggar/datamart_2016-08-11_201612160005/part-*"
      }
    },
    "tuningConfig": {
      "partitionsSpec": {
        "type": "hashed",
        "numShards": 16,
        "partitionDimensions": ["publisher_id"],
        "assumeGrouped": true
      },
      "type": "hadoop",
      "ignoreInvalidRows": false,
      "buildV9Directly": true,
      "useCombiner": true,
      "jobProperties": {
        "mapreduce.job.reduces": 21,
        "mapreduce.job.jvm.numtasks": 20,
        "mapreduce.map.memory.mb": 2048,
        "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
        "mapreduce.reduce.memory.mb": 25500,
        "mapreduce.reduce.java.opts": "-server -Xmx10024m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
        "mapreduce.reduce.shuffle.parallelcopies": 50,
        "mapreduce.reduce.shuffle.input.buffer.percent": 0.5,
        "mapreduce.task.io.sort.mb": 256,
        "mapreduce.task.io.sort.factor": 100,
        "mapreduce.output.fileoutputformat.compress": false,
        "mapreduce.map.output.compress": true,
        "mapreduce.output.fileoutputformat.compress.type": "BLOCK",
        "mapreduce.map.output.compress.codec": "org.apache.hadoop.io.compress.Lz4Codec",
        "mapreduce.output.fileoutputformat.compress.codec": "org.apache.hadoop.io.compress.GzipCodec",
        "mapreduce.map.speculative": false,
        "mapreduce.reduce.speculative": false,
        "mapreduce.job.user.classpath.first": true,
        "mapreduce.job.queuename": "processing"
      }
    },
    "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.6.0"]
  }
}

What happens if you set "assumeGrouped": false? Maybe your data is not actually grouped in the way that Druid expects.
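For reference, that is just your existing partitionsSpec with the flag flipped, everything else unchanged:

"partitionsSpec": {
  "type": "hashed",
  "numShards": 16,
  "partitionDimensions": ["publisher_id"],
  "assumeGrouped": false
}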

Hi Gian,

Thanks a lot for your reply.

I tried again today with the flag set to false and I got the same results :(

Hi,
I guess the data may not be evenly distributed across publisher_id as the partition key, and that could be why the hashed partitioning strategy is not distributing the data evenly. Try specifying more dimensions in partitionDimensions, which should make the distribution more even (see the example below).
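For example, something along these lines; the extra dimensions here are just illustrative picks from the columns in your spec, so choose whichever combination actually spreads your data:

"partitionsSpec": {
  "type": "hashed",
  "numShards": 16,
  "partitionDimensions": ["publisher_id", "geo_area_code", "daypart_key"],
  "assumeGrouped": true
}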

Will try and let you know. Thanks!