Same records ingested multiple times by Real-time node

I use the Kafka firehose with a Realtime node to ingest data.
Suppose I kill the Realtime node and restart it later (possibly with a different window period).

  1. How does Druid recognise which data has already been read from Kafka and which has not?

  2. Does it re-ingest the same data?

  3. Can duplicate data be recognised by some key or id and removed from the system later (via the indexing service, perhaps)?

Hey Saksham, Druid’s current approach to Kafka ingestion is to use the high-level consumer and to commit offsets when it persists data to local disk. It can index the same data multiple times when the consumer rebalances, which will occur whenever a realtime node is stopped or started. We’re working on a different approach with the goal of transactional ingestion from Kafka, but it isn’t available yet.
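For concreteness, here’s a rough sketch of the firehose section of a realtime spec using the kafka-0.8 firehose (the ZooKeeper address, group id, and feed name below are just placeholders):

```json
"firehose": {
  "type": "kafka-0.8",
  "consumerProps": {
    "zookeeper.connect": "zk.example.com:2181",
    "group.id": "druid-example",
    "auto.offset.reset": "largest"
  },
  "feed": "events"
}
```

The group.id is what ties a restarted node back to the same high-level consumer group, and the rebalance that happens on restart is where the duplicate indexing can creep in.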

For the time being, the best way to do transactional ingestion into Druid is with the Hadoop indexer. A reasonable pipeline would be Kafka -> S3/HDFS -> Druid. Many people run a lambda architecture where the same events are streamed into Druid in realtime immediately, and then indexed in batch with Hadoop after some time has passed.
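For the batch leg, an abridged Hadoop indexing task spec might look roughly like this (dataSource, paths, and interval are placeholders; the parser, metricsSpec, and tuningConfig are omitted for brevity):

```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "events",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "intervals": ["2014-01-01T00:00:00Z/2014-01-02T00:00:00Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "s3n://example-bucket/events/2014-01-01/"
      }
    }
  }
}
```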

Hi Gian,
Can you explain a little more about how this would work?

Would I run realtime nodes, run the indexing service (overlord, middle managers, etc.) on some periodic schedule, and somehow configure the realtime nodes not to update the historicals?

Hey Saksham, Druid doesn’t make any assumptions about data ordering in Kafka. It simply does a linear scan of each Kafka partition until it encounters indexable events. It avoids ordering assumptions because you may have produced messages to Kafka out of time order.
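Whether an event counts as “indexable” is governed by the windowPeriod and rejection policy in the realtime tuning config. A sketch, assuming the serverTime rejection policy (the PT10M value is just an example):

```json
"tuningConfig": {
  "type": "realtime",
  "windowPeriod": "PT10M",
  "rejectionPolicy": { "type": "serverTime" }
}
```

With serverTime, events whose timestamps fall outside the current server time plus or minus the windowPeriod are dropped rather than indexed.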

Hey Kevin,

The Druid Hadoop indexer actually does a replace-by-interval of data. So what you’d do for a lambda architecture is just have a realtime pipeline operating normally, and then add a batch pipeline that runs on some time delay greater than your segmentGranularity + your windowPeriod (to prevent it from clobbering actively-pending realtime data). Data indexed by the batch pipeline will completely replace data indexed in realtime for the same interval.
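As a concrete (made-up) example of that timing: with segmentGranularity HOUR and windowPeriod PT10M, the 09:00–10:00 hour is safe to batch-reindex any time after about 10:10. The batch task pins the interval it replaces in its granularitySpec:

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "intervals": ["2014-01-01T09:00:00Z/2014-01-01T10:00:00Z"]
}
```

When that task publishes, its segments supersede whatever was indexed in realtime for the same interval.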