Realtime Node - Kafka Ingestion - Commit/Persist Issue

As per your KafkaFirehoseFactory sample, there is a comment indicating this:

/*
This is actually not going to do exactly what we want, cause it will be called asynchronously
after the persist is complete. So, it’s going to commit that it’s processed more than was actually
persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
of our Kafka version.
*/

  1. Is this still an issue?

  2. The reason for this question is that I ran a realtime node configured to write data to disk after every 500 rows.

In every single run, Druid was consistently able to complete the segment write and then commit to Kafka:

2015-09-25T00:31:01,715 INFO [shift-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/shift/2015-09-18T18:20:00.000Z_2015-09-18T18:25:00.000Z/1/v8-tmp] completed walk through of 500 rows in 50 millis.

2015-09-25T00:31:01,772 INFO [shift-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/shift/2015-09-18T18:20:00.000Z_2015-09-18T18:25:00.000Z/1/v8-tmp] completed inverted.drd in 57 millis.

2015-09-25T00:31:01,803 INFO [shift-incremental-persist] com.zscaler.zpn.io.druid.firehose.kafka.ZPNKafkaEightFirehoseFactory - committing offsets

Hey Subbu,

This is still an issue. We are working on a new Kafka ingestion mechanism that won’t have this problem. Fwiw, the specific issue is that firehose.commit() is called after persisting to disk is complete, but by that time, more events will have been read into memory. Those events fall under the same commit, even though we didn’t really want to commit them.
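To make the race concrete, here is a small simulation (plain Python, not Druid code; FakeFirehose and its methods are invented for illustration) of the behavior described above: the consumer keeps reading while the persist is in flight, so the commit covers offsets that were never persisted.

```python
class FakeFirehose:
    """Toy stand-in for the Kafka firehose; tracks offsets only, no real Kafka."""
    def __init__(self):
        self.consumed_offset = 0  # last offset read from "Kafka" into memory

    def next_row(self):
        self.consumed_offset += 1

    def commit(self):
        # Commits everything *consumed* so far, not everything *persisted*.
        return self.consumed_offset

firehose = FakeFirehose()

# Read 500 rows, then snapshot them for an asynchronous persist.
for _ in range(500):
    firehose.next_row()
persisted_through = firehose.consumed_offset  # what the persist will cover

# While the persist runs, ingestion keeps going (say 25 more rows arrive).
for _ in range(25):
    firehose.next_row()

# The commit runnable fires only after the persist completes...
committed_offset = firehose.commit()

# ...so it covers 25 rows that were read into memory but never persisted.
assert persisted_through == 500 and committed_offset == 525
```

If the JVM dies between the persist and the next persist, those 25 rows are marked committed in Kafka but exist nowhere on disk, which is the data-loss window the code comment warns about.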

Inline.

As per your KafkaFirehoseFactory sample, there is a comment indicating this:

/*
This is actually not going to do exactly what we want, cause it will be called asynchronously
after the persist is complete. So, it’s going to commit that it’s processed more than was actually
persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
of our Kafka version.
*/

  1. Is this still an issue?

The short answer is yes, although recent work, including the switch to the low-level Kafka consumer (https://github.com/druid-io/druid/pull/1609), should have improved this behavior. I believe what you really want is the completion of https://github.com/druid-io/druid/issues/1642, which is currently in progress and should be ready in the next few months; it will enable exactly-once ingestion from Kafka.

Thanks a lot.

Is there any way to listen for indexing-complete events and then issue a commit through the FirehoseFactory? Some sort of observer model, perhaps.

Hey Subbu,

I’m not sure what you mean by that, can you please elaborate a bit?

Sure Gian.

  • The use case is to avoid losing data because of the loose coupling between the Kafka read/commit cycle and the indexing cycle.

  • A FirehoseFactory implementation can register itself with the IndexManager (or an equivalent of it).

  • Before index manager starts its cycle, it will fire an event to all registered FirehoseFactory instances.

  • The FirehoseFactory instance stops consuming from Kafka and waits for another notification from the IndexManager.

Once it receives notification that indexing is complete, it issues a commit and then resumes reading from Kafka.

Of course, there is still a window in which the Kafka commit itself can fail, but this is better than what we have now.

I am worried about data loss when, for example, the JVM is killed.
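The steps above could be sketched roughly as follows (a conceptual Python sketch, not Druid code; IndexManager, register, on_persist_start, and on_persist_complete are hypothetical names). The key property is that consumption pauses during the persist, so the committed offset never runs ahead of what is on disk:

```python
class PausableFirehose:
    """Registers with the index manager and pauses consumption during persists."""
    def __init__(self):
        self.paused = False
        self.consumed = 0   # rows read from "Kafka"
        self.committed = 0  # offset committed back to "Kafka"

    def on_persist_start(self):
        self.paused = True  # stop pulling from Kafka before the persist begins

    def on_persist_complete(self):
        # Safe: nothing was consumed during the persist, so everything
        # consumed so far is now on disk.
        self.committed = self.consumed
        self.paused = False  # resume consuming

    def poll(self):
        if not self.paused:
            self.consumed += 1

class IndexManager:
    """Fires events to registered firehoses around each persist cycle."""
    def __init__(self):
        self.listeners = []

    def register(self, listener):
        self.listeners.append(listener)

    def persist(self):
        for l in self.listeners:
            l.on_persist_start()
        # ... write the in-memory index to disk here ...
        for l in self.listeners:
            l.on_persist_complete()

fh = PausableFirehose()
mgr = IndexManager()
mgr.register(fh)

for _ in range(500):
    fh.poll()
mgr.persist()
assert fh.committed == fh.consumed == 500  # commit matches what was persisted
```

The trade-off, as noted in the reply below, is that ingestion stalls for the duration of every persist.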

Ah okay, I see. Things are not currently done this way because we want ingestion to continue during a persist. There are also some other issues that the comment doesn’t get into. I think the most notable one is that during a Kafka consumer rebalance, you can get some duplicate data (the new consumer picks up from the last checkpoint, but the old consumer still has some data past that).

The new Kafka ingestion scheme we’re working on will address these shortcomings by associating partition offsets with the actual Druid segments themselves. This way, the ingestion will become transactional.
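Roughly, the idea is that a segment publish and its partition offsets succeed or fail together. A minimal sketch of that (illustrative Python, with an invented metadata_store and publish function, not actual Druid internals): publishing compares the stored offsets against the offsets the segment started from, so a replayed publish after a crash becomes a no-op instead of a duplicate.

```python
# Single "metadata store" holding both published segments and Kafka offsets
# (partition -> next offset to read). In Druid this would be the metadata DB.
metadata_store = {"segments": [], "offsets": {0: 0}}

def publish(segment, start_offsets, end_offsets):
    """Atomically publish a segment together with the offsets it covers.

    Succeeds only if the stored offsets still match the offsets this
    segment started from (a compare-and-swap); otherwise it is rejected.
    """
    if metadata_store["offsets"] != start_offsets:
        return False  # someone already advanced past this range
    metadata_store["segments"] = metadata_store["segments"] + [segment]
    metadata_store["offsets"] = end_offsets
    return True

# First publish of offsets [0, 500) succeeds.
assert publish("seg_v1", {0: 0}, {0: 500}) is True

# Replaying the same publish after a crash is rejected, not duplicated,
# because the stored offsets no longer match the expected start.
assert publish("seg_v1", {0: 0}, {0: 500}) is False

assert metadata_store["offsets"] == {0: 500}
```

On restart, ingestion resumes from the offsets in the metadata store rather than from Kafka's own committed offsets, which is what makes the pipeline transactional end to end.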

Some more info (maybe too much info!) is at our umbrella issue for improving realtime ingestion: https://github.com/druid-io/druid/issues/1642