Real time ingestion and late elements


I have multiple questions on real-time ingestion and on how to process late elements from a message queue in case of outages. We have message queues in different geographical zones with the same schema, and we’d like to potentially, but not necessarily, insert the data from them into the same datasource (note: one of the zones dominates all the others in terms of traffic). Now let’s suppose that the elements in each message queue have a monotonically increasing timestamp, but in case of broken network links we can end up with large deltas between the timestamps of the last elements in different queues while catching up.

If I understood correctly (please confirm), these are the ways of loading old data into Druid:

1, with batch ingestion,

2, using the new druid-kafka-indexing-service,

3, tranquility (by setting windowPeriod to a huge number),

4, real time node (by setting rejectionPolicy to messageTime),

5, tranquility (with a custom Timekeeper to use message time for client-side rejection instead of system time:!topic/druid-development/4-pSFaSDzEM).

If possible, we’d like to use the same method for catching up as for real-time insertion (thus avoiding implementing 1,). I understand the problems with 3, and 4, so that leaves us with 2, and 5.
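To make the difference between 3, 4, and 5 concrete, here is a rough sketch of the two rejection styles. It is a simplified model written for this discussion, not Druid or tranquility code: the function and class names are hypothetical, and the real policies may differ in detail.

```python
from datetime import datetime, timedelta


def accepts_by_server_time(event_time, now, window):
    """Wall-clock style: accept only events within `window` of the current time."""
    return abs(now - event_time) <= window


class MessageTimeWindow:
    """Message-time style: the 'clock' is the newest event timestamp seen so far."""

    def __init__(self, window):
        self.window = window
        self.max_seen = None

    def accepts(self, event_time):
        # Advance the logical clock whenever a newer event arrives.
        if self.max_seen is None or event_time > self.max_seen:
            self.max_seen = event_time
        # Accept events that are not older than the logical clock minus the window.
        return event_time >= self.max_seen - self.window


# Hypothetical scenario: we resume one day after the outage with a 10 minute window.
window = timedelta(minutes=10)
now = datetime(2016, 6, 2, 14, 30)
backlog_event = datetime(2016, 6, 1, 14, 30)

policy = MessageTimeWindow(window)
print(accepts_by_server_time(backlog_event, now, window))
print(policy.accepts(backlog_event))
```

With a wall-clock window the day-old backlog is rejected unless the window is made huge (method 3), while a message-time clock lets the backlog through as long as each queue stays roughly ordered.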

To have a better understanding of these methods, I’d have the following questions:

1, In the above linked discussion Gian Merlino mentioned that in the long term you are thinking about eliminating the rejection policies. Where are you in that process?

2, Let’s imagine that we are using solution 5, a failure happens at 14h30, and we are unable to consume for a day. A real-time indexing task was created for the 14h-15h interval, and it was closed at 15h10 (because of the windowPeriod of 10 minutes). The next day we restart our storm topology, and the beam bolt creates a new real-time indexing task for the 14h-15h period, because of the unconsumed messages starting from 14h30. Is the segment created before the outage going to be merged with or overwritten by the segment created by the new task?

3, I have the same question for the kafka-indexing-service: in case of an outage longer than lateMessageRejectionPeriod, are messages going to be dropped or merged with the already indexed messages?

4, If lateMessageRejectionPeriod is none, but we are not manipulating the same segment from two different pipelines, then are we safe from the concurrency issues mentioned in the docs?

5, Is it possible to use sharding with kafka-indexing-service? Or is it automatically sharded based on partitions?

6, For consuming from Kafka, is there still a point in using tranquility over the new indexing service (besides stability)?

7, Does using multiple datasources for different zones with a “union” datasource in queries sound like a good idea?
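For reference, the per-zone layout in the last question would be queried roughly as sketched below. The datasource names, the interval, and the `count` metric are hypothetical placeholders; only the shape of the "union" dataSource object is the point here.

```python
import json

# Hypothetical per-zone datasources, one per message queue.
zone_datasources = ["events_eu", "events_us", "events_asia"]

# A native timeseries query that reads all zones through a union dataSource.
query = {
    "queryType": "timeseries",
    "dataSource": {"type": "union", "dataSources": zone_datasources},
    "granularity": "hour",
    "intervals": ["2016-06-01T00:00:00Z/2016-06-02T00:00:00Z"],
    "aggregations": [
        # Assumes each datasource was ingested with a rollup metric named "count".
        {"type": "longSum", "name": "events", "fieldName": "count"}
    ],
}

print(json.dumps(query, indent=2))
```

The resulting JSON would be posted to the broker like any other native query. All unioned datasources are expected to share the same schema, which matches the setup described above.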

Thanks in advance for your answers,

– Balazs

Hey Balazs,

Given your requirements I would take a look at the new Kafka indexing service. Options #3, #4, #5 are all kind of hacky – it’s better to consider those approaches as ways to get historical data loaded for some testing, but not something you’d actually want to do in production. Other than the Kafka indexing service, the expected use case for realtime ingestion is recent data only.

To answer some of your questions about the Kafka indexing service:

  3. Older events are written into new segments that become part of an existing segment set. It never drops events by default. It will only drop events if you set lateMessageRejectionPeriod to something other than the default (null, i.e. disabled).

  4. I’m not really sure what you mean by this?

  5. Sharding will happen based on Kafka partitions; you can control how many shards get created by changing the number of Kafka partitions.

  6. For loading data from Kafka, the main advantage of Tranquility Kafka at this time is that it is more battle tested. I expect that in the long run the Kafka indexing service will be the preferred method though.

  7. Yes, the “union” dataSource makes sense here.
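As an illustration of the lateMessageRejectionPeriod setting mentioned above, here is a rough sketch of where it would sit in the ioConfig part of a Kafka supervisor spec. This is only a fragment, not a complete spec; the topic name, broker address, and the one-day period are hypothetical, and the field names should be checked against the docs for your Druid version.

```python
import json

# Hypothetical values: topic, broker address, and task settings are placeholders.
base_io_config = {
    "topic": "events",
    "consumerProperties": {"bootstrap.servers": "kafka01:9092"},
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
}


def with_late_rejection(io_config, period=None):
    """Return a copy of io_config, optionally with a late-message cutoff.

    period is an ISO 8601 period string such as "P1D". Leaving it as None
    keeps the key out of the config, i.e. the default where nothing is dropped.
    """
    updated = dict(io_config)
    if period is not None:
        updated["lateMessageRejectionPeriod"] = period
    return updated


# Default: no rejection, so a long backlog is still ingested after an outage.
catch_up_config = with_late_rejection(base_io_config)

# Opt-in: drop anything older than one day (assumed value for illustration).
strict_config = with_late_rejection(base_io_config, "P1D")

print(json.dumps(strict_config, indent=2))
```

For the catch-up scenario in the question, leaving the setting out is the relevant case, since that is the default under which no events are dropped.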

If you’re thinking of trying it out, we have a tutorial here that you can check out to get started: