Metric values doubled (tripled, or multiplied by some factor) after a realtime node restart (kill then start)

This may be a duplicate topic, but I was not able to find this exact symptom in earlier posts.
Setup
8 realtime nodes consuming from a Kafka topic with 8 partitions. We are using the Kafka firehose documented here: http://druid.io/docs/latest/ingestion/realtime-ingestion.html#realtime-specfile. The ioConfig is shown below:
"ioConfig": {
  "type": "realtime",
  "firehose": {
    "type": "kafka-0.8",
    "consumerProps": {
      "zookeeper.connect": "localhost:2181/kafka",
      "zookeeper.connection.timeout.ms": "15000",
      "zookeeper.session.timeout.ms": "15000",
      "zookeeper.sync.time.ms": "5000",
      "group.id": "druid-example",
      "fetch.message.max.bytes": "1048586",
      "queued.max.message.chunks": "100000",
      "auto.offset.reset": "smallest",
      "auto.commit.enable": "false"
    },
    "feed": "ds_1"
  },
  "plumber": {
    "type": "realtime"
  }
},
"tuningConfig": {
  "shardSpec": {
    "type": "numbered",
    "partitionNum": 0,
    "partitions": 8
  },
  "type": "realtime",
  "maxRowsInMemory": 5000000,
  "intermediatePersistPeriod": "PT3M",
  "windowPeriod": "PT10M",
  "basePersistDirectory": "/local/data/druid/real_time/base_persist",
  "rejectionPolicy": {
    "type": "messageTime"
  }
}
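For context, since there are 8 realtime nodes and 8 partitions, I assume each node runs the same spec except for a distinct partitionNum in the shardSpec (this node is 0; the others presumably use 1 through 7). For example, a sketch of the fourth node's shardSpec would be:

"shardSpec": {
  "type": "numbered",
  "partitionNum": 3,
  "partitions": 8
}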
Issue:
During a restart of one of the 8 realtime nodes, we intermittently saw incorrect metric values, which in almost all cases were some multiple of the normal value. For example, comparing the 04:37 minute below (originally highlighted) with the surrounding minutes for the same metrics, the values appear to be roughly 4 times what they should be:
{
  "timestamp": "2016-01-04T04:35:00.000Z",
  "result": {
    "count": 1,
    "metrics_0": 587.8671875,
    "metrics_1": 35775.0
  }
}, {
  "timestamp": "2016-01-04T04:36:00.000Z",
  "result": {
    "count": 1,
    "metrics_0": 581.7578125,
    "metrics_1": 36492.0
  }
}, {
  "timestamp": "2016-01-04T04:37:00.000Z",
  "result": {
    "count": 1,
    "metrics_0": 2402.953125,
    "metrics_1": 144404.0
  }
}, {
  "timestamp": "2016-01-04T04:38:00.000Z",
  "result": {
    "count": 1,
    "metrics_0": 580.48046875,
    "metrics_1": 36808.0
  }
}, {
  "timestamp": "2016-01-04T04:39:00.000Z",
  "result": {
    "count": 1,
    "metrics_0": 602.73046875,
    "metrics_1": 38375.0
  }
}
Query:
{
  "queryType": "timeseries",
  "dataSource": "ds_1",
  "filter": {
    "type": "selector",
    "dimension": "some_id",
    "value": "1234567"
  },
  "intervals": [
    "2016-01-04T04:35:00.000Z/2016-01-04T05:21:00.000Z"
  ],
  "granularity": "minute",
  "aggregations": [
    { "type": "count", "name": "count" },
    { "type": "doubleSum", "name": "metric_0", "fieldName": "metric_0" },
    { "type": "doubleSum", "name": "metric_1", "fieldName": "metric_1" }
  ]
}
Guesses:
This looks almost certainly like a message duplication issue. Some messages were probably processed multiple times because the offset was not committed by the realtime node that was killed: the other realtime nodes picked up and processed those messages, and after the killed node came back it read from its last committed offset and processed the same messages again? Not sure if this is possible.

The weird thing here is the "count": it seems there is only one message, since count = 1, so message duplication alone would not explain this. It looks as if the same metrics were somehow summed together into a new row where all the dimension values are correct but the metric values are multiplied.

I would appreciate help understanding the issue and a potential fix. Thanks in advance,

Shuai

Hi Shuai, what version of Druid is this? Each time there is a persist, an offset is committed back to Kafka. The idea is that if you restart the node, the in-memory portion is lost, and Druid rereads from the last offset it committed.
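One related knob worth noting (not a fix, just to narrow the replay window) is the intermediatePersistPeriod in the tuningConfig posted above: a shorter period means more frequent persists, and therefore less in-memory data to reread after a restart. A rough sketch, with PT1M as an arbitrary example value and all other tuningConfig fields unchanged:

"tuningConfig": {
  "intermediatePersistPeriod": "PT1M"
}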

I noticed that you set "auto.offset.reset": "smallest", can you set it to "largest" and see if you still have this issue?
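For reference, only the relevant consumerProps line would change (everything else stays the same), e.g.:

"consumerProps": {
  "group.id": "druid-example",
  "auto.offset.reset": "largest",
  "auto.commit.enable": "false"
}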

Thanks Fangjin for the response. We are using druid-0.8.0. We've tried "largest" but are still seeing the issue during restarts.

Our current mitigation is to stop all the realtime nodes, remove the offsets in ZooKeeper, then restart all realtime nodes (with "auto.offset.reset": "largest"), so that we end up with missing data (preferred, for now) instead of duplicated messages.

The thing that still bugs me: how come the metric values are multiplied, but the message count is still 1?

Hey Shuai,

The reason you're seeing a message count of 1 is that the duplicate events are being rolled up at ingestion time and stored as a single row, so when you query with a count aggregator you get back 1 (the number of stored rows, not raw events). To get the correct event count, change your aggregator to a longSum over the ingestion-time count metric instead, e.g.:

"aggregations": [
  { "type": "longSum", "name": "count", "fieldName": "count" },
  { "type": "doubleSum", "name": "metric_0", "fieldName": "metric_0" },
  { "type": "doubleSum", "name": "metric_1", "fieldName": "metric_1" }
]
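This assumes the ingestion spec defines a count aggregator named "count" in its metricsSpec, so that rollup records how many raw events were merged into each stored row; the longSum above then adds those per-row counts back up at query time. For reference, a minimal sketch of such a metricsSpec (metric names taken from this thread) might look like:

"metricsSpec": [
  { "type": "count", "name": "count" },
  { "type": "doubleSum", "name": "metric_0", "fieldName": "metric_0" },
  { "type": "doubleSum", "name": "metric_1", "fieldName": "metric_1" }
]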

That indeed was the problem, thanks Lim!