Spike in Kinesis stream iterator age every time the Druid supervisor creates a new segment

Hi,

We have a real-time application that pushes data into a Kinesis stream, and Druid then ingests the data from that stream. The segment granularity of our Kinesis streaming Druid supervisor is “HOUR”.

However, we observed that every hour, when a new segment is created, the iterator age of our Kinesis stream spikes sharply. We suspect this happens because, when Druid starts ingesting data for the new segment, it reads from the beginning of the stream (the earliest record in the stream) instead of continuing from the last Kinesis record it processed.

Our Druid spec for this case is as follows:

{
  "type": "kinesis",
  "dataSchema": {
    "dataSource": "test_revenue",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "iso"
        },
        "dimensionsSpec": {
          "dimensions": [
            "game_name",
            "platform",
            "environment",
            "channel",
            "iap_pack",
            "country"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      },
      {
        "type": "doubleSum",
        "name": "amount",
        "fieldName": "amount"
      },
      {
        "lgK": 16,
        "fieldName": "userId",
        "name": "spenders",
        "tgtHllType": "HLL_8",
        "type": "HLLSketchBuild"
      }
    ],
    "granularitySpec": {
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "duration": 300000,
        "type": "duration"
      },
      "rollup": true
    },
    "transformSpec": null
  },
  "tuningConfig": {
    "type": "kinesis",
    "reportParseExceptions": false,
    "resetOffsetAutomatically": true,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "stream": "test_druid_revenue",
    "endpoint": "kinesis.us-east-1.amazonaws.com",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "recordsPerFetch": 2000,
    "fetchDelayMillis": 1000,
    "useEarliestSequenceNumber": false,
    "deaggregate": true
  }
}

When we changed the spec to “skipSequenceNumberAvailabilityCheck”: true, there was no spike in the iterator age of the Kinesis stream when Druid created the next hourly segment. But this means that “resetOffsetAutomatically” has to stay at its default of false (Druid does not allow “resetOffsetAutomatically”: true together with “skipSequenceNumberAvailabilityCheck”: true).
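
As we understand Druid's validation, a Kinesis tuningConfig that sets resetOffsetAutomatically to true while skipSequenceNumberAvailabilityCheck is also true is rejected. The following is a minimal sketch of that rule for pre-checking a spec before submitting it; tuning_is_valid is a hypothetical helper, not part of Druid, and it assumes both flags default to false when omitted.

```python
def tuning_is_valid(tuning: dict) -> bool:
    """Return False for the flag combination Druid rejects.

    Hypothetical helper: both flags are assumed to default to False
    when they are omitted from the tuningConfig.
    """
    reset = tuning.get("resetOffsetAutomatically", False)
    skip = tuning.get("skipSequenceNumberAvailabilityCheck", False)
    # The automatic reset is triggered by the availability check, so
    # enabling the reset while skipping the check is contradictory.
    return not (reset and skip)


# The tuningConfig from the spec above is accepted.
print(tuning_is_valid({"type": "kinesis",
                       "resetOffsetAutomatically": True,
                       "skipSequenceNumberAvailabilityCheck": False}))  # True
```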

Ideally, we do not want to skip the sequence number check; we want the supervisor to reset to LATEST only when the checkpointed sequence number is older than what is still available in the Kinesis stream (because of the Kinesis retention policy). But at the same time, we also do not want to see a spike in the iterator age of the Kinesis stream every time Druid ingests data from the stream for the next hourly segment.

I have attached a screenshot of the Kinesis stream iterator age from the AWS dashboard for reference. The spikes in the iterator age occur when the Druid spec has “resetOffsetAutomatically”: true and “skipSequenceNumberAvailabilityCheck”: false. The iterator age line was flat after I changed “skipSequenceNumberAvailabilityCheck” to true.

Can you let us know whether something is wrong in the Kinesis/Druid configuration of the ingestion spec above, or whether this is expected behavior or a known issue?

Thanks,

Amrita

> But at the same time, we also do not want to see the spike in the iterator age of the Kinesis stream every time Druid ingests data from the stream for the next hourly segment.

When you see the iterator age increase at the end of the hour, is the Kinesis task for the next hour already running? How many MiddleManagers and available task slots do you have? It sounds like there is some delay between when the first task starts publishing and when the next task starts reading from the previous task's checkpoint.
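
To check whether task slots could be the cause, one rule of thumb from the capacity-planning section of Druid's streaming ingestion docs (as we recall it) is that reading and publishing tasks overlap, so roughly 2 × replicas × taskCount worker slots are needed. The sketch below encodes that rule; both functions are hypothetical helpers, and other_running stands for slots already taken by unrelated tasks.

```python
def required_task_slots(task_count: int, replicas: int) -> int:
    """Slots needed so the next reading tasks can start while the
    previous tasks are still publishing (2 x replicas x taskCount)."""
    return 2 * replicas * task_count


def has_spare_capacity(total_slots: int, task_count: int, replicas: int,
                       other_running: int = 0) -> bool:
    """True if the free slots cover the reading + publishing overlap."""
    return total_slots - other_running >= required_task_slots(task_count, replicas)


# With the spec above (taskCount=1, replicas=1) at least 2 slots are needed.
print(required_task_slots(task_count=1, replicas=1))  # 2
```

If the cluster has fewer free slots than this, the next hour's task may wait for the publishing task to finish, which would delay reading from the checkpoint.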

> the iterator age of our Kinesis stream has a very big spike which we think could be because when Druid ingests data from the Kinesis stream for the new segment it is trying to read from the beginning of the stream (the earliest record in the stream), instead of reading from the last Kinesis records it processed.

Looking into this further: the sequence number check controlled by skipSequenceNumberAvailabilityCheck works by requesting an iterator that starts at the earliest available sequence number and comparing that sequence number against the previously checkpointed one. The task then continues reading from the checkpointed sequence number (assuming it is still available), not from the earliest one. However, requesting an iterator at the earliest available sequence number drives up the iterator age metric, even though the task is not actually reading records from that point.
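
The check described above can be sketched in Python with boto3-style Kinesis calls. This is an illustration of the idea, not Druid's (Java) implementation. The client is duck-typed, so a real boto3.client("kinesis") or a test fake both work; the function names are hypothetical. The sketch also shows why the metric moves: the GetRecords call on a TRIM_HORIZON iterator returns the oldest record in the shard, and CloudWatch's GetRecords.IteratorAgeMilliseconds reports the age of records returned by GetRecords.

```python
from typing import Any, Optional


def earliest_sequence_number(client: Any, stream: str, shard_id: str,
                             max_polls: int = 10) -> Optional[str]:
    """Return the sequence number of the oldest record still in the shard.

    `client` is anything with boto3-style get_shard_iterator/get_records
    methods. Returns None if no record comes back within `max_polls`
    GetRecords calls.
    """
    # TRIM_HORIZON points at the oldest untrimmed record in the shard.
    iterator = client.get_shard_iterator(
        StreamName=stream, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
    )["ShardIterator"]
    for _ in range(max_polls):
        # This call returns the oldest record, which is what pushes up
        # GetRecords.IteratorAgeMilliseconds for the stream.
        resp = client.get_records(ShardIterator=iterator, Limit=1)
        if resp["Records"]:
            return resp["Records"][0]["SequenceNumber"]
        iterator = resp.get("NextShardIterator")
        if iterator is None:  # closed shard with nothing left to read
            break
    return None


def checkpoint_available(client: Any, stream: str, shard_id: str,
                         checkpoint: str) -> bool:
    """True if the checkpointed sequence number has not been trimmed yet."""
    earliest = earliest_sequence_number(client, stream, shard_id)
    if earliest is None:
        # Assumption: with no record to compare against, availability
        # cannot be confirmed.
        return False
    # Sequence numbers are decimal strings; compare them numerically.
    return int(checkpoint) >= int(earliest)
```

Actual reading would then resume at the checkpoint (for example with an AT_SEQUENCE_NUMBER iterator), so only the metric, not the ingested data, reflects the trip to the start of the shard.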

So the increase in the metric is expected behavior, and setting skipSequenceNumberAvailabilityCheck to true lets you avoid it. The tradeoff is that tasks may fail later when they try to read from a sequence number that is no longer available, and you would then need to reset the supervisor manually, instead of the reset happening automatically with the check enabled.
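
The tradeoff can be summarized as a simplified decision model. This is a hypothetical function, not Druid code. It also folds in how, per the Kinesis ingestion docs, useEarliestSequenceNumber picks the reset target (earliest if true, latest if false). Sequence numbers are compared as integers, and None means the task fails and a manual reset is needed.

```python
from typing import Optional


def start_position(checkpoint: str, earliest: str, latest: str, *,
                   skip_check: bool, reset_auto: bool,
                   use_earliest: bool) -> Optional[str]:
    """Where a new task ends up reading, or None if a manual reset is needed."""
    if int(checkpoint) >= int(earliest):
        # Checkpoint still retained: continue where the last task stopped.
        return checkpoint
    if skip_check or not reset_auto:
        # With the check skipped, the failure only shows up when the task
        # tries to read; either way someone has to reset the supervisor.
        return None
    # Check enabled and resetOffsetAutomatically=true: reset automatically.
    return earliest if use_earliest else latest
```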

Hi Jonathan,

Thanks for explaining why the iterator age of a Kinesis stream spikes when skipSequenceNumberAvailabilityCheck is set to false in the Druid streaming spec and a new task is created every hour.

I have another question, regarding the useEarliestOffset property in the Druid Kinesis streaming spec.

The following Druid documentation states that, when resetOffsetAutomatically is set to true, you can set “useEarliestOffset” to true or false depending on whether you want Druid to read from the earliest available record in the Kinesis stream or from the latest record.

https://druid.apache.org/docs/latest/development/extensions-core/kinesis-ingestion.html

But when I added “useEarliestOffset”: false to the spec, then terminated and re-submitted my Kinesis streaming ingestion spec, the property seemed to have no effect, and there was still a temporary spike in the iterator age. Also, when I checked the spec in the Payload tab of that supervisor in the Druid UI, the “useEarliestOffset” property was not present, even though I had included it when I submitted the spec.

Is the “useEarliestOffset” property deprecated in Druid 0.15.1?

Thanks,

Amrita

Hey Amrita,

In Kinesis, it’s called “useEarliestSequenceNumber”. See https://druid.apache.org/docs/latest/development/extensions-core/kinesis-ingestion.html for more info.
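
Since, as Amrita observed, an unrecognized property can disappear from the stored spec without an error, a small pre-submit check can catch Kafka-style names in a Kinesis ioConfig. This is a hypothetical helper with an intentionally short mapping (Kafka's “useEarliestOffset” and “topic” versus Kinesis's “useEarliestSequenceNumber” and “stream”); extend it as needed.

```python
# Kafka ioConfig names and their Kinesis counterparts (not exhaustive).
KAFKA_TO_KINESIS = {
    "useEarliestOffset": "useEarliestSequenceNumber",
    "topic": "stream",
}


def kafka_style_keys(io_config: dict) -> dict:
    """Map each Kafka-style key found in a Kinesis ioConfig to its Kinesis name."""
    return {key: KAFKA_TO_KINESIS[key] for key in io_config if key in KAFKA_TO_KINESIS}


# Flags the property from the question above.
print(kafka_style_keys({"stream": "test_druid_revenue", "useEarliestOffset": False}))
```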