Questions about kafka indexing service

Hi, all,

Having read the related documents about the Kafka indexing service, I still have some points of confusion, as follows:

(1) How does a Kafka index task decide which part of the data to read and index?

According to http://druid.io/docs/0.9.1.1/development/extensions-core/kafka-ingestion.html, it seems that during the taskDuration the task will continue to consume data from Kafka.

taskDuration definition: The length of time before tasks stop reading and begin publishing their segments.

But according to https://imply.io/post/2016/07/05/exactly-once-streaming-ingestion.html, the data seems to be pre-assigned to the task by something (maybe the supervisor):

Each task is assigned a set of partitions with corresponding start and end offsets and will begin reading messages from Kafka sequentially until all assigned offsets have been read

(2) What’s the relation between the configuration parameters taskDuration and segmentGranularity? How do they take effect when generating segments? Assume I set both of them to PT1H.

(3) Do the Kafka index tasks also handle answering realtime queries?

(4) According to https://imply.io/post/2016/07/05/exactly-once-streaming-ingestion.html:

*Crucially for exactly-once ingestion, the task will also atomically record the final Kafka offsets in the same metadata transaction as the segment entry.*

So does that mean that even if I set the task replicas to 1, the data assigned to a task that ultimately failed will still be processed by subsequent tasks?

If that is true, what is gained by having more than 1 task replica?

Hey Bill,

(1) How does a Kafka index task decide which part of the data to read and index?

The description of the offset assignment in the blog post is somewhat simplified to keep things easy to follow. The actual mechanism is:

  • the supervisor will create a task and specify a starting offset (either based on where the previous task left off or, if there was no previous task, based on the earliest/latest offset in Kafka) but will not specify an ending offset (it sets it to Long.MAX_VALUE, i.e. run until you’re told to stop)
  • when taskDuration elapses, the supervisor will then tell all of the tasks to pause reading and report the last read offsets for each partition
  • the supervisor will then determine the highest read offset by any of the tasks for each partition, and will send this map of end offsets to each of the tasks
  • the tasks will then read to the end offsets and then stop reading and begin publishing

The mechanism is designed this way to guarantee that replica tasks (tasks which are assigned to read the same partitions) will have read the exact same data and will generate the same segments.
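
If it helps to see those steps end to end, here’s a toy Python simulation of the handshake. It’s purely illustrative - none of these class or method names exist in Druid - but it mirrors the pause / report offsets / set end offsets flow described above:

```python
import random

LONG_MAX = 2**63 - 1

class ToyTask:
    """Stands in for one replica task reading a set of Kafka partitions (illustrative only)."""
    def __init__(self, start_offsets):
        self.offsets = dict(start_offsets)                       # next offset to read, per partition
        self.end_offsets = {p: LONG_MAX for p in start_offsets}  # "run until you're told to stop"

    def read_some(self, max_messages):
        # Pretend each partition advanced by some number of messages;
        # replicas may drift apart slightly before they are paused.
        for p in self.offsets:
            self.offsets[p] += random.randint(0, max_messages)

    def pause_and_report_offsets(self):
        return dict(self.offsets)

    def set_end_offsets(self, end_offsets):
        # From here the task reads up to exactly these offsets, then publishes.
        self.end_offsets = dict(end_offsets)

# Start offsets come from where the previous task group left off (or earliest/latest).
start = {"partition-0": 100, "partition-1": 250}
replicas = [ToyTask(start), ToyTask(start)]

# ...tasks read freely until taskDuration elapses...
for task in replicas:
    task.read_some(50)

# The supervisor pauses everyone and asks where they got to.
reported = [t.pause_and_report_offsets() for t in replicas]

# The highest offset read for each partition becomes the common end offset.
final = {p: max(r[p] for r in reported) for p in start}

# Every replica reads up to the same end offsets, so they publish identical segments.
for task in replicas:
    task.set_end_offsets(final)

print("common end offsets:", final)
```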

(2) What’s the relation between the configuration parameters taskDuration and segmentGranularity? How do they take effect when generating segments? Assume I set both of them to PT1H.

The two parameters are (unfortunately?) completely independent. Segment granularity will generate intervals aligned with time boundaries (i.e. hourly will create segments that begin and end on the hour) but taskDuration starts counting from when the create task request has been issued. The result is that setting PT1H for both will mean that you will get a minimum of two segments per partition per task. This is somewhat annoying in that it will result in the creation of more segments, but doesn’t affect correctness. The blog post has a link to documentation on how to merge segments together if they become problematic.
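
For reference, here is roughly where the two settings live in a Kafka supervisor spec. The datasource, topic, and host names are placeholders and the parser/metrics sections are elided; the point is just that segmentGranularity sits in the dataSchema’s granularitySpec while taskDuration sits in the ioConfig, and they are set independently:

```python
import requests  # assumes the 'requests' package is available

spec = {
    "type": "kafka",
    "dataSchema": {
        "dataSource": "my-datasource",            # placeholder
        # "parser": {...}, "metricsSpec": [...],  # elided for brevity
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",   # segment intervals aligned to the hour
            "queryGranularity": "NONE"
        }
    },
    "tuningConfig": {"type": "kafka"},
    "ioConfig": {
        "topic": "my-topic",                # placeholder
        "consumerProperties": {"bootstrap.servers": "kafka-broker:9092"},
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"              # counted from task creation, not from the hour
    }
}

# Submit to the overlord's supervisor endpoint (host is a placeholder).
requests.post("http://overlord:8090/druid/indexer/v1/supervisor", json=spec)
```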

(3) Do the Kafka index tasks also handle answering realtime queries?

Yes.

(4) So does that mean that even if I set the task replicas to 1, the data assigned to a task that ultimately failed will still be processed by subsequent tasks?
If that is true, what is gained by having more than 1 task replica?

Yes, that’s correct - offsets read by failed tasks will be read again by subsequent tasks if there is no replication. Replication is definitely more important for other ingestion mechanisms (like Tranquility) where the loss of a task could mean the loss of data. For Kafka indexing tasks, replicas might be important to you if task failures are problematic, i.e. if temporarily losing the last hour of data for your realtime queries isn’t acceptable and you want to minimize that possibility. In some cases, if you have a heavy realtime load, replication might also help scale out realtime queries, since multiple nodes can then respond to queries for the same data (although there are a lot of other levers you’ll want to play with to optimize realtime query performance).
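
For what it’s worth, turning replication on is just the replicas field in the supervisor’s ioConfig. Here’s a trimmed fragment (field names as in the Kafka ingestion docs; the topic and counts are placeholders, and the rest of the spec is elided):

```python
# ioConfig fragment from a Kafka supervisor spec (rest of the spec elided).
io_config = {
    "topic": "my-topic",       # placeholder
    "taskCount": 2,            # number of distinct reading task groups
    "replicas": 2,             # each group is duplicated; replicas read the same
                               # partitions/offsets and build identical segments
    "taskDuration": "PT1H",
}
# Total simultaneously reading tasks = taskCount * replicas (4 in this example),
# so make sure there is enough worker capacity before raising either value.
```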

Hey David,

I see that the answer to the question:

(3) Do the Kafka index tasks also handle answering realtime queries? is YES,

but I am not entirely sure I understand the full meaning of that (I was not able to find anything in the docs).

Does that mean the data (segments) that is still being read by the Kafka indexer (meaning the taskDuration is not over yet and the segments have not been pushed to deep storage) is available for immediate queries?

Or would the segments be available for queries only after being handed off to the historical nodes?

Also, I would like to verify my understanding of the relationship between taskDuration and segmentGranularity: if, for example, segmentGranularity is 1H and taskDuration is 15 minutes, does that mean the Kafka indexer would create 4 segments for that 1H time interval?

Regarding re-indexing: if for particular reasons I would like to re-index (once a day) the segments generated by the Kafka indexer with a batch Hadoop index task, would Druid delete those 4 segment files (for the 1H interval) and use the new single segment file generated by the batch Hadoop index task?

Thanks,

Hey Igor,

Yes that’s correct - the events ingested by the Kafka indexing task are available for queries shortly after being read from Kafka and you don’t need to wait for them to be handed off to the historical nodes.

Regarding the relationship between taskDuration and segmentGranularity: 4 segments is the theoretical minimum for your example, but in practice you’ll always generate more segments than this. The main reason is that taskDuration and segmentGranularity are not aligned on the same time boundaries, i.e. a segmentGranularity of 1H will generate segments for, say, 12:00:00-1:00:00 and 1:00:00-2:00:00, but a taskDuration of 1H will run for one hour starting from when the task is created, so for example 12:01:00-1:01:00. In your example, a 15-minute task running from, say, 12:50 to 1:05 straddles the hour boundary and will produce segments in both the 12:00-1:00 and 1:00-2:00 intervals, so you end up with more than 4.
There’s some discussion on this here: http://druid.io/docs/0.10.0/development/extensions-core/kafka-ingestion.html#on-the-subject-of-segments

Typically, the Kafka indexing service generates a large number of segments, so I’d recommend setting taskDuration to something higher than 15 minutes (somewhere between 1 and 4 hours is probably a good starting point). As mentioned in the above link, having a daily re-indexing job is also a good idea, which leads to your third question…

The segments generated by the batch indexing job run later on will have a higher version number than those generated by the Kafka indexing tasks and will overshadow them, meaning that historicals will load this newer version and will use it to serve queries. The previous segments generated by the Kafka indexing tasks will still remain in deep storage until they are deleted using a coordinator kill command, or alternatively you can enable automatic killing of unused segments. See druid.coordinator.kill.on and related properties here: http://druid.io/docs/0.10.0/configuration/coordinator.html
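
As a concrete illustration, one manual way to clean up the overshadowed Kafka segments once the batch job has replaced them is to submit a kill task to the overlord for the relevant interval. The datasource name, interval, and host below are placeholders:

```python
# Submit a kill task to the overlord to permanently delete unused (overshadowed)
# segments for an interval from metadata and deep storage.
import requests

kill_task = {
    "type": "kill",
    "dataSource": "my-datasource",                               # placeholder
    "interval": "2017-06-01T00:00:00.000Z/2017-06-02T00:00:00.000Z"
}

resp = requests.post("http://overlord:8090/druid/indexer/v1/task", json=kill_task)
print(resp.json())   # returns the task id, which you can track in the overlord console
```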

One thing to keep in mind - to keep things moving as smoothly as possible, it’s best to have batch jobs and realtime jobs working on different sections of the timeline; in other words, the daily batch job should operate on intervals that the Kafka indexing tasks are no longer seeing events for. Otherwise, the batch jobs and the realtime jobs will be trying to acquire locks for the same time ranges and will be blocking one another.

Hope this helps,
David

Thanks David, that helps indeed.

So I guess the broker, via ZooKeeper, would also be aware of segments created by the Kafka indexer that are still located on the middle managers until they are handed off to the historical nodes? If so, then in the Kafka indexer case, are the middle managers not only responsible for executing index tasks, but also responsible for serving queries against those segments?

I would like also to ask regarding the following use case (with kafka indexer):

I am thinking of building, with Druid’s help, an “urgent feature” for products.

I would like to show, for each product, how many unique users are currently viewing that product (where “currently” could mean, for example, the last 10 minutes).

My idea is to have segments with 1 hour segment granularity, 1 minute query granularity, and a HyperLogLog aggregator (at index time) on the user id field, for each product id. Then I could query that data source to ask how many unique visitors were looking at a specific product in the last 10 minutes.
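
To make it concrete, the query I have in mind would look roughly like this (the datasource, column names, and broker host are placeholders, and I’m assuming a hyperUnique metric named unique_users built from the user id at ingestion time):

```python
# Sketch of the "unique viewers of product X in the last 10 minutes" query.
from datetime import datetime, timedelta, timezone
import requests

now = datetime.now(timezone.utc)
interval = "{}/{}".format((now - timedelta(minutes=10)).isoformat(), now.isoformat())

query = {
    "queryType": "timeseries",
    "dataSource": "product-views",              # placeholder
    "granularity": "all",                       # one number for the whole 10 minutes
    "intervals": [interval],
    "filter": {"type": "selector", "dimension": "product_id", "value": "12345"},
    "aggregations": [
        {"type": "hyperUnique", "name": "unique_viewers", "fieldName": "unique_users"}
    ]
}

resp = requests.post("http://broker:8082/druid/v2/", json=query)
print(resp.json())   # approximate distinct user count for that product
```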

I guess these 1H segments could be discarded quite quickly, and I don’t really need to keep them for a long time.

I would be happy to hear your thoughts regarding such a use case.

  • In such a case I am not sure it is even worth handing those segments off to the historical nodes. Is there some way to tell the Kafka indexer not to push the segments to deep storage? Is there a way to specify a discard strategy for segments that have not yet been handed off?

Thanks,

Hey Igor,

Yes, you are mostly correct, but it’s the indexing tasks themselves that announce their presence and segments to Zookeeper and respond to query requests from the broker. The middle manager is just responsible for spawning indexing tasks (we refer to the processes as ‘peons’) and isn’t involved in the query or ingestion paths directly.

Your proposal sounds good to me and is something that Druid is well suited for. In terms of immediately discarding segments, Druid isn’t really set up for that use case and you can’t prevent the indexing tasks from pushing segments to deep storage in a clean way. If you don’t need the segments, I recommend letting them be handed off to historicals and then setting an aggressive drop rule (http://druid.io/docs/latest/operations/rule-configuration.html) to drop the segments within a few hours. You can combine this with druid.coordinator.kill.on and related properties to have the segments removed from deep storage as well; see http://druid.io/docs/latest/configuration/coordinator.html
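
To illustrate the aggressive drop rule idea, here is a hedged example of a rule set you could POST to the coordinator for that datasource: keep only the last few hours loaded on historicals and drop everything older. The datasource name, retention period, tier/replicant counts, and host are placeholders:

```python
# Rule set: keep the last 3 hours of segments loaded on historicals, drop the rest.
# Rules are evaluated top-down; the first rule that matches a segment wins.
import requests

rules = [
    {"type": "loadByPeriod", "period": "PT3H",
     "tieredReplicants": {"_default_tier": 2}},
    {"type": "dropForever"}
]

requests.post("http://coordinator:8081/druid/coordinator/v1/rules/product-views",
              json=rules)
```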

Hope this helps,
David