Realtime node loses data after "recovery"

Hello,
I am testing a tiny Druid cluster that has only one realtime node; the realtime node consumes a single topic from Kafka.

I pushed 100,000 records to the Kafka server. Each record looks like this:

```
{
  "id": "123",
  "count": 1,          // always 1
  "timestamp": 1432869878400
}
```
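
For illustration, a script along these lines could generate and push such records. This is only a sketch: it assumes the kafka-python client and a Kafka broker reachable at localhost:9092, neither of which is specified in this post.

```python
import json
import time

from kafka import KafkaProducer  # assumes the kafka-python package is installed

# Hypothetical producer targeting the "ssp1" topic used by the realtime spec below.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",          # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

now_ms = int(time.time() * 1000)
for i in range(100000):
    producer.send("ssp1", {"id": str(i), "count": 1, "timestamp": now_ms})

producer.flush()  # make sure everything reaches the broker before exiting
```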

I then fire a query at the broker:

```
{
  "queryType": "timeseries",
  "dataSource": "ssp1",
  "granularity": "all",
  "aggregations": [
    { "type": "longSum", "name": "counts", "fieldName": "count" }
  ],
  "intervals": [
    "2000-01-01T00:00:00.000Z/2016-01-01T00:00:00.000Z"
  ]
}
```
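
For reference, a query like this can be POSTed to the broker's /druid/v2 endpoint. The sketch below assumes the Python requests library and a broker listening at localhost:8080; the actual broker host and port are not given in this post.

```python
import requests

# Hypothetical broker URL; adjust host/port to your broker's configuration.
BROKER_URL = "http://localhost:8080/druid/v2/"

query = {
    "queryType": "timeseries",
    "dataSource": "ssp1",
    "granularity": "all",
    "aggregations": [
        {"type": "longSum", "name": "counts", "fieldName": "count"},
    ],
    "intervals": ["2000-01-01T00:00:00.000Z/2016-01-01T00:00:00.000Z"],
}

resp = requests.post(BROKER_URL, json=query)
resp.raise_for_status()
print(resp.json())  # e.g. [{"timestamp": "...", "result": {"counts": ...}}]
```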

The broker returned:

```
[
  {
    "result": {
      "counts": 100000
    },
    "timestamp": "2015-05-29T09:41:32.000Z"
  }
]
```

At this point the topic offset saved in ZooKeeper was 1887. I then killed the realtime node (with `kill <pid>`), fired the query again, and got an empty result.
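
As an aside, the committed consumer offset can be read directly from ZooKeeper. This is only a sketch: it assumes the kazoo client and the standard Kafka 0.8 high-level consumer path for group druid_realtime and topic ssp1, with partition 0 as a guess.

```python
from kazoo.client import KazooClient

# Standard Kafka 0.8 high-level consumer offset path:
#   /consumers/<group.id>/offsets/<topic>/<partition>
# group.id and topic come from the spec below; partition 0 is assumed.
OFFSET_PATH = "/consumers/druid_realtime/offsets/ssp1/0"

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
try:
    data, _stat = zk.get(OFFSET_PATH)
    print("committed offset:", int(data.decode("utf-8")))
finally:
    zk.stop()
```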

After restarting the realtime node and querying again, I got:

```
[
  {
    "result": {
      "counts": 98114
    },
    "timestamp": "2015-05-29T09:41:32.000Z"
  }
]
```

That means Druid lost some records.

Is this a bug, or is it normal for Druid? Or is there something I am missing?

My realtime node configuration is shown below.

runtime.properties:

```
druid.port=3035
druid.service=realtime
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=10
druid.realtime.specFile=/opt/druid/config/realtime_ssp1/ssp1.spec
```

ssp1.spec:

```
[
  {
    "dataSchema" : {
      "dataSource" : "ssp1",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["id"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type": "longSum",
        "name": "counts",
        "fieldName": "count"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "fifteen_minute",
        "queryGranularity" : "second"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "127.0.0.1:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid_realtime",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "smallest",
          "auto.commit.enable": "false"
        },
        "feed": "ssp1"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 50000,
      "intermediatePersistPeriod": "PT10s",
      "windowPeriod": "PT10M",
      "basePersistDirectory": "/data1/druid/realtime/basePersist",
      "rejectionPolicy": {
        "type": "none"
      },
      "shardSpec": {
        "type": "linear",
        "partitionNum": 0
      }
    }
  }
]
```

Yes, that is currently possible. But there is work in progress on the proposal (https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/druid-development/9HB9hCcqvuI) that will fix this behavior.

In the meantime, you can play with `maxRowsInMemory` and `intermediatePersistPeriod`.

– Himanshu

Druid's realtime ingestion is currently best effort, and this type of situation is possible. As Himanshu mentioned, the code for the proposal linked above is in code review and should make it in soon.

Got it.
Himanshu and Fangjin, thank you for the information!