Kafka indexing task duplicate data issue

Hi!
I found an issue with Kafka indexing in Druid 0.10.0. The issue is regarding rollup.
Here is an example to reproduce the issue.
I created a Kafka topic with 3 partitions and replication factor 3, then inserted the data below using a Druid indexing task spec in which rollup is true (the default):
Data:
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
{"CREATED_TIMESTAMP": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": "20", "m2": "20"}
Druid specification:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "TEST_DB1",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "CREATED_TIMESTAMP",
          "format": "ddMMyyyyHHmmss"
        },
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "CREATED_TIMESTAMP"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "m1",
        "type": "doubleMax",
        "fieldName": "m1"
      },
      {
        "name": "m2",
        "type": "doubleMax",
        "fieldName": "m2"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "fifteen_minute"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 10000000,
    "maxRowsInMemory": 100000
  },
  "ioConfig": {
    "topic": "FlinkTest1",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "startPartitions": {
      "topic": "FlinkTest1"
    }
  }
}
So when I queried the data, I got three rows for the time 11052017010000. The query and output are shown below:
{
  "query": "SELECT __time, d1, d2, m1, m2 FROM TEST_DB1"
}
Output:
[
  {
    "__time": "2017-05-11T01:00:00.000Z",
    "d1": "dim1",
    "d2": "dim2",
    "m1": 20.0,
    "m2": 20.0
  },
  {
    "__time": "2017-05-11T01:00:00.000Z",
    "d1": "dim1",
    "d2": "dim2",
    "m1": 20.0,
    "m2": 20.0
  },
  {
    "__time": "2017-05-11T01:00:00.000Z",
    "d1": "dim1",
    "d2": "dim2",
    "m1": 20.0,
    "m2": 20.0
  }
]
Now when I created a new topic with 1 partition and replication factor 1 and inserted the same data with the same specification, I got a rolled-up result: a single record for the data.
So as per my understanding, Druid creates consumers according to the number of partitions of the topic and performs rollup on each consumer separately, which is not right. It should roll up over the whole data it is receiving.
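A minimal Python sketch (hypothetical, not Druid's actual code) of the per-consumer behavior described above, assuming each indexing task rolls up only the rows it reads from its own Kafka partitions:

```python
from collections import defaultdict

# Eight identical rows, spread round-robin across 3 Kafka partitions
# (what an un-keyed producer typically does).
rows = [{"ts": "11052017010000", "d1": "dim1", "d2": "dim2", "m1": 20.0}] * 8
partitions = defaultdict(list)
for i, row in enumerate(rows):
    partitions[i % 3].append(row)

def rollup(partition_rows):
    # Roll up by (timestamp, dimensions), aggregating m1 with doubleMax,
    # as in the metricsSpec above.
    agg = {}
    for r in partition_rows:
        key = (r["ts"], r["d1"], r["d2"])
        agg[key] = max(agg.get(key, float("-inf")), r["m1"])
    return [{"ts": k[0], "d1": k[1], "d2": k[2], "m1": v} for k, v in agg.items()]

# Each consumer rolls up only its own partition's rows, so 3 partitions
# yield 3 output rows for the same timestamp/dimension combination.
segments = [rollup(p) for p in partitions.values()]
total_rows = sum(len(s) for s in segments)
print(total_rows)  # 3
```

With a single partition, the same simulation would produce one row, matching the single-partition result above.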

If I missed anything needed to achieve this with the given spec, please let me know; otherwise, please look into this issue.

With streaming ingestion Druid doesn't "guarantee" rollup. It might do partial rollup, like you saw. The idea is that you could do one or both of these if you want better rollup ratios:

  • Produce your messages to Kafka based on dimensions, so messages that should roll up end up in the same Kafka partition

  • Reindex later using a batch "index" or "index_hadoop" task in the background, which can guarantee rollup
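For the first suggestion, a sketch of the idea using a hypothetical simplified partitioner (Kafka's real default partitioner murmur2-hashes the key bytes and takes the result modulo the partition count, but the principle is the same: equal keys map to equal partitions):

```python
def pick_partition(key: str, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's default key-based partitioner:
    # a deterministic hash of the key, modulo the partition count.
    return sum(key.encode("utf-8")) % num_partitions

# Key messages by the rollup dimensions, so rows that should roll up
# together always land in the same partition (and thus the same task).
key = "dim1|dim2"
p1 = pick_partition(key, 3)
p2 = pick_partition(key, 3)
assert p1 == p2  # same dimensions -> same partition
```

In practice this means setting a message key when producing, e.g. with kafka-python: `producer.send(topic, key=key.encode(), value=payload)` (the key here being a hypothetical concatenation of the dimension values).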

Hi Gian,

Thanks for your response.

I applied your suggestion to push the data into the same partition. My producer is producing the records in the same partition, but I can still see the duplicate data in Druid.

Please guide me if I am doing something wrong.