Confused about sharding

I’m having trouble understanding exactly how sharding/partitioning works in modern (Kafka indexing service) Druid. I’ve seen http://druid.io/docs/latest/design/segments.html and https://groups.google.com/forum/m/#!topic/druid-development/GFBM4ifA6ac but the shardSpec they talk about doesn’t seem to be part of KIS. What kinds of sharding/partitioning (other than by segment time interval of course) happen with KIS? Are there docs I’m missing?

–dave

Hi Dave,

I don’t think there’s documentation about it yet, so let me explain.

Before describing sharding in KIS, I’d like to clarify the relationship among the terms shard, partition, and segment.

In Druid, a dataSource is first partitioned by time, and we call each resulting partition a ‘timeChunk’. A timeChunk can be further partitioned into segments, so all segments in the same timeChunk belong to the same time interval. Each segment belongs to a partition, and in Druid ‘partition’ and ‘shard’ mean the same thing.

Partitioning in KIS is based only on the maximum segment size or a timeout for publishing segments. This is why KIS uses only NumberedShardSpec among the 5 types of shardSpec. NumberedShardSpec is mostly for identifying shards by their IDs (i.e., partitionNum in the code) and doesn’t imply anything about which data is in the shard.
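
For reference, this is roughly what a NumberedShardSpec looks like in a segment’s metadata (the partitionNum value here is just an example):

    {
      "type": "numbered",
      "partitionNum": 1,
      "partitions": 0
    }

The ‘partitions’ field is typically 0 for segments created by KIS, since the total number of partitions in a timeChunk isn’t known up front.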

Here is a more detailed description.

Before 0.12, the supervisor launches Kafka index tasks which run for taskDuration. Each task can create multiple segments if it reads events whose timestamps fall into different timeChunks, but a given task generates at most one segment per timeChunk. When the run time of the task reaches taskDuration, it starts publishing the generated segments. If the task generated the first segments for a timeChunk, they get partition ID ‘0’ in their NumberedShardSpec. If there were already segments in the timeChunk, the newly generated segments get ‘max partition ID of existing segments + 1’ as their partition ID.

Please note that taskDuration might not be the same as segmentGranularity. For example, segmentGranularity can be HOUR while taskDuration is 30 minutes (which is the default). In this case, 2 Kafka index tasks can be submitted per hour (one after the other), and they might create segments for the same timeChunk. In the end, they will create at most 2 segments per timeChunk, with partition IDs 0 and 1, respectively. This can be more complicated if your Kafka topic has multiple partitions.
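
To make that concrete, here is a sketch of what the metadata for those two segments might look like (the dataSource name, timestamps, and version are made up, and most fields are omitted). Note that they share the same interval and version and differ only in partitionNum:

    [
      {
        "dataSource": "my_datasource",
        "interval": "2018-01-01T00:00:00.000Z/2018-01-01T01:00:00.000Z",
        "version": "2018-01-01T00:00:12.345Z",
        "shardSpec": {"type": "numbered", "partitionNum": 0, "partitions": 0}
      },
      {
        "dataSource": "my_datasource",
        "interval": "2018-01-01T00:00:00.000Z/2018-01-01T01:00:00.000Z",
        "version": "2018-01-01T00:00:12.345Z",
        "shardSpec": {"type": "numbered", "partitionNum": 1, "partitions": 0}
      }
    ]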

After 0.12, things got a bit more complicated. Each Kafka task can publish segments in the middle of taskDuration, so a task can generate multiple segments per timeChunk. This is controlled by ‘maxRowsPerSegment’, ‘maxTotalRows’, and ‘handoffConditionTimeout’: the task publishes segments if it reaches ‘maxRowsPerSegment’ or ‘maxTotalRows’, or if it hasn’t published any segments for ‘handoffConditionTimeout’. Please check http://druid.io/docs/latest/development/extensions-core/kafka-ingestion for more details about these configurations. Finally, as before 0.12, Kafka tasks also publish their remaining segments once taskDuration elapses.
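
For illustration, these knobs live in the supervisor spec, roughly like this (a trimmed sketch with placeholder values rather than recommendations; dataSchema and the required consumerProperties are omitted):

    {
      "type": "kafka",
      "ioConfig": {
        "topic": "my_topic",
        "taskDuration": "PT30M"
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000,
        "maxTotalRows": 20000000,
        "handoffConditionTimeout": 900000
      }
    }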

Hope this helps. Please feel free to ask if you have more questions.

Jihoon

This is super helpful, Jihoon! I have a couple questions.

Just to be super explicit: when you say "timeChunk can be further
partitioned into segments" and that the segments "belongs" to the same
interval — when we look at the name of any of these segments, we will
see the same timeChunk represented, right? It might be that the
segments in the time chunk are (roughly) split up by time (because the
index task handed off the segment before the end of the time chunk)
but we won't observe that from looking at the segment's given
interval/version, right?

And the overall effect of having multiple segments per partition in
KIS is primarily about:

- letting the hand-off from index task to historical be more
incremental rather than in one fell swoop
- allowing late-arriving data to be added to a new segment in a timeChunk
- allowing Kafka partitions to be processed in parallel

... but not about minimizing the work done at query time by allowing
you to separate data that differs along
frequently-filtered-at-query-time dimensions?

I do see that there's something that sounds like the latter
(PartitionsSpec) but that seems to be specific to Hadoop batch
ingestion (not even native batch ingestion).

--dave

Dave,

Just to be super explicit: when you say “timeChunk can be further
partitioned into segments” and that the segments “belongs” to the same
interval — when we look at the name of any of these segments, we will
see the same timeChunk represented, right? It might be that the
segments in the time chunk are (roughly) split up by time (because the
index task handed off the segment before the end of the time chunk)
but we won’t observe that from looking at the segment’s given
interval/version, right?

If you mean they can be split up by time because taskDuration is smaller than segmentGranularity, then yes, they may be. And yes again, we can’t figure that out from a segment’s interval or version.

And the overall effect of having multiple segments per partition in
KIS is primarily about:

  • letting the hand-off from index task to historical be more
    incremental rather than in one fell swoop
  • allowing late-arriving data to be added to a new segment in a timeChunk
  • allowing Kafka partitions to be processed in parallel

… but not about minimizing the work done at query time by allowing
you to separate data that differs along
frequently-filtered-at-query-time dimensions?

I do see that there’s something that sounds like the latter
(PartitionsSpec) but that seems to be specific to Hadoop batch
ingestion (not even native batch ingestion).

This is mostly correct. The native indexTask supports hash partitioning, but parallelIndexTask doesn’t support it yet.
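
For reference, the Hadoop batch partitionsSpec you mentioned is the one that lets you separate data along a dimension. It looks roughly like this (the dimension name and size are made up, and the exact type and field names may vary by version, so please check the batch ingestion docs):

    "partitionsSpec": {
      "type": "dimension",
      "partitionDimension": "country",
      "targetPartitionSize": 5000000
    }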

This kind of pruning at query time based on partitioning would be super useful!

Jihoon