How to find the Kafka topic offset if the Kafka indexing service fails

We are trying to use the Kafka indexing service to ingest streaming data into Druid (v0.13). In the configuration file we set:
    "ioConfig": {
      "topic": "###",
      "replicas": 1,
      "useEarliestOffset": true,
      "taskDuration": "PT5M",
      "completionTimeout": "PT10M",
      "consumerProperties": {
        "bootstrap.servers": "****:9092"
      }
    }

For some reason the Overlord node failed and we need to restart it. I am wondering how the Kafka indexing service will resume and pick up the correct offset. Currently I don't see how we can find out how much data has been ingested from the topics.

Many thanks,

Christine

Hi Christine,

I think you should be able to find the Kafka offsets in use on the coordinator console, under the supervisor status.

Thanks

Is there a detailed design document for the Kafka indexing service, for example one explaining how it implements exactly-once ingestion?

On Wednesday, March 6, 2019 at 11:00:40 PM UTC+8, naveen kavuri wrote:

Hi Naveen,

Do you mean the Overlord console? It shows the task failure, but I am wondering how it detects how many messages have been ingested and where the next task should pick up.

As asked above: how is exactly-once implemented?

Thanks,

How about checking your Druid metadata store DB, specifically the "druid_dataSource" table? It should contain the latest offsets ingested by Druid.
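A minimal sketch of reading those offsets, assuming the default table name `druid_dataSource` with its `dataSource` and `commit_metadata_payload` columns, and assuming the payload is JSON shaped like `{"type": "kafka", "partitions": {"topic": ..., "partitionOffsetMap": {...}}}` (later Druid versions may use `partitionSequenceNumberMap` instead, so the sketch falls back to that key). An in-memory SQLite table stands in for the real metadata store (Derby, MySQL, or PostgreSQL); the datasource name `events_ds` and topic `events` are hypothetical.

```python
import json
import sqlite3


def read_committed_offsets(conn, datasource, table="druid_dataSource"):
    """Return {partition: offset} committed for `datasource`, or None if no row exists."""
    cur = conn.cursor()
    # The table name cannot be a bound parameter; only pass a trusted name here.
    # "?" is the sqlite3 placeholder; MySQL/PostgreSQL drivers typically use "%s".
    cur.execute(
        f"SELECT commit_metadata_payload FROM {table} WHERE dataSource = ?",
        (datasource,),
    )
    row = cur.fetchone()
    if row is None:
        return None
    payload = row[0]
    if isinstance(payload, (bytes, bytearray, memoryview)):
        payload = bytes(payload).decode("utf-8")
    partitions = json.loads(payload)["partitions"]
    # Assumed layout: "partitionOffsetMap", falling back to the
    # "partitionSequenceNumberMap" key that newer versions may write.
    offset_map = partitions.get("partitionOffsetMap") or partitions.get(
        "partitionSequenceNumberMap", {}
    )
    return {int(p): int(o) for p, o in offset_map.items()}


# Demo against an in-memory SQLite stand-in for the metadata store.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE druid_dataSource (dataSource TEXT PRIMARY KEY, commit_metadata_payload BLOB)"
)
sample = {
    "type": "kafka",
    "partitions": {"topic": "events", "partitionOffsetMap": {"0": 1500, "1": 1320}},
}
conn.execute(
    "INSERT INTO druid_dataSource VALUES (?, ?)",
    ("events_ds", json.dumps(sample).encode("utf-8")),
)
offsets = read_committed_offsets(conn, "events_ds")
print(offsets)  # {0: 1500, 1: 1320}
```

On a real cluster you would pass a connection from your metadata store's driver instead, adjusting the placeholder style if needed.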

So if you open the console (which, for some reason, is called the coordinator console even though it runs on the Overlord node), the status tab gives you the following information:

  • "latest offsets": the latest offset for each partition of the Kafka topic, as seen by the supervisor

  • "starting offsets": the offsets at which the supervisor started reading messages from the topic

  • "Lag": the number of messages by which the supervisor is behind the topic

  • "Aggregate Lag": the sum of the lag across all partitions

  • "Offsets Last Updated": the time at which the supervisor last updated these offsets

These fields should help you figure out which messages the Druid supervisor is reading from your Kafka topic.
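The same numbers can also be pulled from the Overlord's HTTP API (`GET /druid/indexer/v1/supervisor/<supervisorId>/status`). Below is a rough sketch that recomputes per-partition and aggregate lag from such a report, to show how the fields relate. The payload field names `latestOffsets`, `activeTasks`, and `currentOffsets` are assumptions about the report shape and may differ between Druid versions; the sample payload, host, and supervisor id are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_supervisor_status(overlord_url, supervisor_id):
    """Fetch the supervisor status report from the Overlord's HTTP API."""
    url = f"{overlord_url}/druid/indexer/v1/supervisor/{supervisor_id}/status"
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def summarize_lag(payload):
    """Return ({partition: lag}, aggregate_lag) from a status report payload.

    Assumed fields: payload["latestOffsets"] holds the newest offset per partition
    in Kafka, and each entry of payload["activeTasks"] has "currentOffsets".
    """
    latest = {int(p): int(o) for p, o in payload.get("latestOffsets", {}).items()}
    current = {}
    for task in payload.get("activeTasks", []):
        for p, o in (task.get("currentOffsets") or {}).items():
            pid, off = int(p), int(o)
            # With replicas, keep the furthest position any task has reached.
            current[pid] = max(off, current.get(pid, off))
    lag = {pid: latest[pid] - current[pid] for pid in latest if pid in current}
    return lag, sum(lag.values())


# Hypothetical payload shaped like the fields described above.
sample_payload = {
    "latestOffsets": {"0": 1520, "1": 1320},
    "activeTasks": [
        {"startingOffsets": {"0": 1400, "1": 1300}, "currentOffsets": {"0": 1500, "1": 1320}},
    ],
}
lag, total = summarize_lag(sample_payload)
print(lag, total)  # {0: 20, 1: 0} 20

# Against a live cluster (host and supervisor id are placeholders):
# report = fetch_supervisor_status("http://overlord-host:8090", "my_supervisor")
# print(summarize_lag(report["payload"]))
```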

Again, I am by no means a Druid expert, and this is just my understanding, but this is how I check the offset values.

Thanks!

I don't know if there are docs, but I learned it by reading the source. The operation the indexing service uses to publish new segments is the SegmentTransactionalInsertAction, which eventually translates into a SQL transaction that:

  • Reads the data source's record in druid_dataSource and confirms that the offsets stored there are the same as the ones the task had when it started

  • Writes the new offsets to druid_dataSource

  • Inserts new segments into druid_segments

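So, essentially, exactly-once ingestion comes down to the ability to run a single SQL transaction over the cluster metadata: a task whose starting offsets no longer match the stored ones cannot publish its segments.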
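The three steps above amount to a compare-and-swap on the stored offsets, done in the same transaction as the segment insert. Here is a simplified model of that idea using SQLite, not Druid's actual code: the tables `datasource_meta` and `segments` are hypothetical stand-ins for druid_dataSource and druid_segments, and treating a missing row as a fresh start is a simplification.

```python
import json
import sqlite3


class OffsetMismatch(Exception):
    """Raised when the stored offsets differ from the ones the task started from."""


def _norm(offsets):
    # JSON object keys are strings, so normalize partition ids to str for comparison.
    return {str(p): int(o) for p, o in offsets.items()}


def publish_segments(conn, datasource, start_offsets, end_offsets, segment_ids):
    """Check start offsets, store end offsets, and insert segments atomically.

    `conn` must be opened with isolation_level=None so BEGIN/COMMIT are explicit.
    """
    cur = conn.cursor()
    cur.execute("BEGIN IMMEDIATE")  # take SQLite's write lock before reading
    try:
        row = cur.execute(
            "SELECT offsets FROM datasource_meta WHERE datasource = ?", (datasource,)
        ).fetchone()
        # 1. Confirm the stored offsets are the ones this task started from.
        if row is not None and json.loads(row[0]) != _norm(start_offsets):
            raise OffsetMismatch(f"stored {row[0]} != task start {_norm(start_offsets)}")
        # 2. Write the new offsets.
        cur.execute(
            "INSERT OR REPLACE INTO datasource_meta (datasource, offsets) VALUES (?, ?)",
            (datasource, json.dumps(_norm(end_offsets))),
        )
        # 3. Insert the new segments.
        cur.executemany(
            "INSERT INTO segments (id, datasource) VALUES (?, ?)",
            [(seg_id, datasource) for seg_id in segment_ids],
        )
        cur.execute("COMMIT")
    except Exception:
        cur.execute("ROLLBACK")  # nothing from this task becomes visible
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE datasource_meta (datasource TEXT PRIMARY KEY, offsets TEXT)")
conn.execute("CREATE TABLE segments (id TEXT PRIMARY KEY, datasource TEXT)")

# A task reads partition 0 from offset 0 to 100 and publishes.
publish_segments(conn, "events_ds", {0: 0}, {0: 100}, ["seg-a"])

# A stale task that also started at offset 0 is rejected, so no duplicate data lands.
rejected = False
try:
    publish_segments(conn, "events_ds", {0: 0}, {0: 100}, ["seg-a-dup"])
except OffsetMismatch as err:
    rejected = True
    print("rejected:", err)
```

Because the offset check and the segment insert commit or roll back together, a restarted task can safely resume from the stored offsets: either its predecessor's segments and offsets were both committed, or neither was.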