Some questions about the Kafka indexing service

Hi all,

We are testing the Kafka indexing service in our production pipeline (segmentGranularity: fifteen_minute, taskDuration: PT1H). Data is indexed correctly, but we still have several questions:

  1. Can the Kafka indexing service append data to old segments created by the batch Hadoop indexing method? Our use case: our production pipeline currently runs batch Hadoop indexing and we want to migrate it to the Kafka indexing service, but newly arriving data may carry old timestamps, so Kafka indexing tasks would need to append to old segments (covering the same time periods) created by the batch indexing tasks. As far as I know, segments produced by batch Hadoop indexing do not have numbered shard specs, so if Kafka indexing tasks (using the same segmentGranularity as the batch indexing tasks) try to create new segments within the same time period, they trigger an error like: could not allocate segment for row with timestamp[xxxxxxx]. We tried to specify shard specs when creating the batch Hadoop indexing tasks, but found that numbered and linear shard specs are only used by real-time indexing tasks.

  2. Any suggestions on how to choose the segmentGranularity value? With fifteen_minute, we see a lot of small segments (about 300 KB each), which will definitely hurt query performance. So we are trying a larger segmentGranularity, e.g. one day, to bring each segment to around 500 MB. If that is the right approach, does it mean the segmentGranularity value depends on the actual data volume and should be tuned accordingly?

  3. Similar to segmentGranularity, any suggestions on taskDuration? With a larger segmentGranularity, it seems we also need a larger taskDuration. But a larger taskDuration means more data is kept in the real-time pipeline (in memory or on middleManager disk), and it takes longer to push that data to the historical nodes. Will such pushes affect real-time indexing tasks as well as query performance? If so, do we need to give middleManager nodes more resources when we set a larger taskDuration?

  4. We know we can update the Kafka indexing supervisor spec during the indexing service's lifetime. For example, say we set segmentGranularity to fifteen_minute and it creates the segment 201609090000_201609090015, and then we submit a new supervisor spec with segmentGranularity set to one hour. For late-arriving data with timestamp 201609090012, I think the Kafka indexing service will create a new segment 201609090000_201609090100, which overlaps with the old segment (201609090000_201609090015). How does Druid handle this case? Will queries still return correct data?

Thank you very much.

By Linbo

Copying Gian’s response from: https://groups.google.com/forum/#!topic/druid-user/9Gok9YcoVuE

Hey Linbo,

  1. Ah, this will depend on https://github.com/druid-io/druid/issues/3241. For a workaround you could modify the source to remove the NoneShardSpec branch in DetermineHashedPartitionsJob.java.

  2. Definitely, things work best if segmentGranularity is oriented around actual data size. For most people, HOUR or DAY is best. Another thing to keep in mind with Kafka indexing is that you get at least one segment for every Kafka partition, so if you have too many Kafka partitions, then you can get a lot of small segments.
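To illustrate the sizing logic Gian describes, here is a rough back-of-the-envelope sketch (the ingestion rate, row size, and partition count are made-up assumptions, not figures from this thread): since each Kafka partition produces at least one segment per segmentGranularity interval, the expected segment size is roughly the total data per interval divided by the partition count.

```python
# Rough estimate of Druid segment size under the Kafka indexing service.
# Assumption (from the answer above): each Kafka partition yields at least
# one segment per segmentGranularity interval, so per-segment size is
# roughly (rows/sec * bytes/row * interval_seconds) / partitions.

def estimated_segment_bytes(rows_per_sec, bytes_per_row,
                            granularity_secs, kafka_partitions):
    total_bytes_per_interval = rows_per_sec * bytes_per_row * granularity_secs
    return total_bytes_per_interval / kafka_partitions

# Hypothetical numbers: 1,000 rows/sec at 100 bytes/row across 8 partitions.
fifteen_min = estimated_segment_bytes(1000, 100, 15 * 60, 8)    # ~11 MB
one_day     = estimated_segment_bytes(1000, 100, 24 * 3600, 8)  # ~1.1 GB

print(f"fifteen_minute: {fifteen_min / 1e6:.1f} MB per segment")
print(f"day:            {one_day / 1e9:.2f} GB per segment")
```

Under these assumptions, moving from fifteen_minute to day granularity takes the segments from ~11 MB to ~1 GB, which also shows why too many Kafka partitions pushes segment sizes back down.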

  3. Your understanding is right. Data is pushed to deep storage (and handed off to historicals) at the end of each task, so taskDuration has a direct impact on how much data is kept in the realtime system. Usually a duration similar to, or larger than, your segmentGranularity works best.
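As a concrete illustration of the rule of thumb above (taskDuration similar to, or larger than, segmentGranularity), here is a minimal sketch that checks a hypothetical configuration. The duration parser only handles the simple PT…H / PT…M / P…D forms used here, not the full ISO-8601 period syntax that Druid accepts:

```python
import re

# Minimal ISO-8601 duration parser covering only simple forms such as
# PT1H, PT15M, and P1D (a simplification for illustration).
def duration_secs(iso):
    m = re.fullmatch(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?", iso)
    days, hours, mins = (int(g) if g else 0 for g in m.groups())
    return days * 86400 + hours * 3600 + mins * 60

GRANULARITY_SECS = {"FIFTEEN_MINUTE": 15 * 60, "HOUR": 3600, "DAY": 86400}

def task_duration_ok(task_duration, segment_granularity):
    # Rule of thumb from the answer above: taskDuration should be
    # similar to, or larger than, segmentGranularity.
    return duration_secs(task_duration) >= GRANULARITY_SECS[segment_granularity]

print(task_duration_ok("PT1H", "FIFTEEN_MINUTE"))  # True: PT1H covers 15-minute segments
print(task_duration_ok("PT1H", "DAY"))             # False: day segments need a longer task
```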

  4. In this case, Druid should use the fifteen_minute segment for time ranges where they already exist, and create larger granularity segments for new ranges. If this doesn’t seem to be working right then please let us know.
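Independent of how segment allocation resolves question 4, the general mechanism Druid uses to keep query results consistent when segment intervals overlap (for example, after reindexing hourly segments into a day segment) is version-based overshadowing: for any point in time, only the highest-version segment covering it is served. A simplified sketch of that selection rule, using hypothetical segment tuples rather than Druid's actual timeline data structures:

```python
# Simplified model of Druid's versioned timeline: for a query time t,
# among all segments whose interval covers t, serve the one with the
# highest version. (Real Druid uses a versioned interval timeline with
# partition chunks; this only illustrates the overshadowing rule.)
def segment_for(t, segments):
    covering = [s for s in segments if s["start"] <= t < s["end"]]
    return max(covering, key=lambda s: s["version"]) if covering else None

# Hypothetical example: two hourly segments later reindexed into a day
# segment with a newer version (times are minutes since midnight).
segments = [
    {"id": "hour_00", "start": 0,  "end": 60,   "version": "v1"},
    {"id": "hour_01", "start": 60, "end": 120,  "version": "v1"},
    {"id": "day",     "start": 0,  "end": 1440, "version": "v2"},
]

print(segment_for(30, segments)["id"])   # "day": the newer version overshadows hour_00
print(segment_for(200, segments)["id"])  # "day": only the day segment covers 03:20
```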

Gian