I’m running Druid 0.11 and I’m getting loads of warnings in the Kafka indexer peon task log for my datasource of the form:
WARN [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - Skipped to offset[16,521,675] after offset[16,521,674] in partition.
I'm using Kafka Streams in transactional mode to transform data before ingesting it into Druid.
I have set "skipOffsetGaps": true in my ioConfig, so ingestion is working correctly. But it seems that Druid doesn't understand that Kafka uses up offsets for commit markers when using transactions. It certainly doesn't seem like something that should be logged at WARN level, since Kafka is expected to skip offsets when transactions are in use.
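For context, the relevant part of my supervisor spec looks roughly like this (the topic name and broker list are placeholders; only "skipOffsetGaps" matters here):

```json
"ioConfig": {
  "topic": "my-transformed-topic",
  "consumerProperties": {
    "bootstrap.servers": "kafka-broker:9092"
  },
  "skipOffsetGaps": true
}
```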
I am worried that Druid may have other issues with Kafka Transactions. Is ingestion from topics that have been produced with Kafka transactions supported by Druid or should I expect other issues when doing so?
The ‘skipped offset’ check in Druid’s Kafka indexing consumer is a sanity check meant to make sure we process every message in order. It was originally put there just in case there were bugs in the Kafka broker, consumer, etc., that caused them to skip messages. This doesn’t hold for compacted topics of course, but that’s ok since it’s not expected that people will want to read from compacted topics into Druid.
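The check itself can be sketched like this (a simplification for illustration, not Druid’s actual code): the task tracks the next offset it expects per partition, and any record with a larger offset is treated as a gap.

```java
// Simplified sketch of the skipped-offset sanity check. The names and
// structure here are illustrative, not copied from Druid's source.
public class OffsetGapCheck {
    /**
     * Validates a record's offset against the expected next offset and
     * returns the new expected offset. If skipOffsetGaps is false, a gap
     * aborts the task instead of just logging a warning.
     */
    static long checkOffset(long expectedOffset, long recordOffset, boolean skipOffsetGaps) {
        if (recordOffset > expectedOffset) {
            if (!skipOffsetGaps) {
                throw new IllegalStateException(
                    "Got offset[" + recordOffset + "], expected offset[" + expectedOffset + "]");
            }
            // This corresponds to the WARN line quoted above.
            System.out.println("WARN - Skipped to offset[" + recordOffset
                + "] after offset[" + expectedOffset + "] in partition.");
        }
        return recordOffset + 1;
    }

    public static void main(String[] args) {
        // A transaction commit marker occupies one offset, so the next
        // visible record arrives one past the expected offset and trips
        // the warning.
        long next = checkOffset(16_521_674L, 16_521_675L, true);
        System.out.println("next expected offset: " + next);
    }
}
```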
I’m not familiar with how Kafka transactions work, but it sounds like they break this assumption too. That’s more interesting, since I do expect people to want to read from these topics into Druid. Do you have some pointers to how this works, especially on the consumer side? It sounds like we should change the Druid code a bit.
The following Confluent blog is a good overview:
In my application, I enable the use of transactions by turning on “exactly once” semantics in Kafka Streams:
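Concretely, I set the processing guarantee in the Streams configuration. A sketch (the application id and broker list are placeholders; the property key is the standard Kafka Streams "processing.guarantee" setting, equivalent to StreamsConfig.EXACTLY_ONCE):

```java
import java.util.Properties;

public class StreamsExactlyOnceConfig {
    /** Builds the Streams properties with exactly-once semantics enabled. */
    static Properties streamsConfig() {
        Properties props = new Properties();
        // Placeholder application id and broker list.
        props.put("application.id", "my-transform-app");
        props.put("bootstrap.servers", "kafka-broker:9092");
        // Turning on exactly-once semantics makes Kafka Streams produce
        // transactionally to its output topics.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("processing.guarantee"));
    }
}
```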
I do think there are additional issues, but I’m not sure yet whether they are inherent in the Kafka 0.10.2.0 client libraries or in the way that Druid uses them.
I’ve had the situation twice now in which the Kafka indexing consumer was stuck without any messages in the log files (not even the “offset skipped” warning that I normally get a lot). In both situations, the currentOffsets in the “/druid/indexer/v1/supervisor/[name]/status” endpoint pointed at a non-existent offset (a commit message).
With a small Java client (which initially had some classpath issues), I see that the problem is likely in the 0.10.2.0 client, which stops reading at a commit message: it keeps returning zero ConsumerRecords. The “skipOffsetGaps” feature normally manages to skip over it, but that likely doesn’t work if the KafkaIndexTask happens to start exactly at an offset that points to a commit record. I guess when Druid upgrades the Kafka client to 0.11 or higher, the problem will go away by itself.
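A toy model of that behavior (not the real Kafka client; offsets and the marker position are made up for illustration): a partition log with a commit marker at one offset, and a consumer that delivers everything past the marker when it starts before it, but never advances when it is positioned exactly on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of the hang described above. Offset 100 stands in for a
// transaction commit marker (a control record).
public class StuckAtMarkerDemo {
    static final long MARKER_OFFSET = 100L;

    /**
     * Returns the record offsets visible from 'position' up to the log end,
     * or an empty list forever when the consumer starts exactly on the
     * marker - mimicking the 0.10.2.0 client behavior observed above.
     */
    static List<Long> poll(long position, long logEndOffset) {
        if (position == MARKER_OFFSET) {
            return Collections.emptyList(); // stuck: zero ConsumerRecords
        }
        List<Long> records = new ArrayList<>();
        for (long o = position; o < logEndOffset; o++) {
            if (o != MARKER_OFFSET) {
                records.add(o); // markers are never delivered to the app
            }
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(poll(98, 103));  // marker skipped mid-stream
        System.out.println(poll(100, 103)); // empty: parked on the marker
    }
}
```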
I’ve had to turn off the “exactly once” guarantee in my Kafka Streams transformation application, which is not ideal, but hopefully that will make the hangs go away.
(Note: it would be nice to have a REST endpoint to explicitly set the offsets for each partition for the indexer, rather than only a general “reset” endpoint. That would be useful for debugging in other situations too)
Thanks for the research Erwin, I have filed this Druid issue: https://github.com/druid-io/druid/issues/5404
Reading the document you pointed out (https://www.confluent.io/blog/enabling-exactly-kafka-streams/), I see the following:
"First of all, in Apache Kafka we record offset commits by writing a message to an internal Kafka topic (called the offsets topic). "
Thus I am not sure where the gap is coming from. Are you sure it is a commit? Do you have a reference for how Kafka Streams writes commits to the output stream?
IMO that would be a pretty bad idea anyway.
Search for “transaction commit markers” in the first link https://www.confluent.io/blog/transactions-apache-kafka/
I found another resource, the Kafka KIP-98 for transactions:
Look at section “5.2 WriteTxnMarkerRequest” of that KIP.