Tranquility & Clipped Firehose

Why is the clipped firehose wrapper necessary for Tranquility?

For example, if I have a segment granularity of 15 minutes, at the top of the hour, tranquility will create a firehose where the segment is clipped to 00:00 - 00:15.

I presume this is to drop data that is timestamped before or after the segment (say from 23:50 or 00:20) but why is that necessary?

Thank you

It’s because the Druid indexing APIs that Tranquility uses do not have the ability to append data to a time range that already has data. The clipping is there to prevent that from happening. If the clipping weren’t there, you would see “weird” behavior when Tranquility tries to index data on top of time ranges that already have published segments.

We redesigned most of this for the Kafka indexing service, which uses different Druid APIs and can append to time ranges that already have published data.

Thanks Gian.

Can you share some details on how the Kafka indexing service can append to time ranges that already have published data? For instance, if I create two “realtime” tasks with tranquility that cover the same time period, the second one to hand off will overwrite the first. This seems to be inherent in how the segments are stored in the metadata db.

How does this work in the Kafka indexing service?

Thanks.

The Kafka indexing service uses a new Druid “allocate segment” API which allocates a new, unused segment number for a particular time range. So new segments it writes will not interfere with any previously published segments.
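Conceptually, the allocation works like handing out the next unused partition number for a given (dataSource, interval), while preserving the interval’s existing version. This is only a toy sketch of those semantics, not Druid’s actual implementation (the real API lives in the Overlord and persists its state in the metadata store):

```python
# Toy sketch of "allocate segment" semantics -- NOT Druid's real code.
# A dict stands in for the metadata store's per-interval state.

class SegmentAllocator:
    def __init__(self):
        # (dataSource, interval) -> (version, next unused partitionNum)
        self._state = {}

    def allocate(self, data_source, interval, version_if_new):
        key = (data_source, interval)
        if key not in self._state:
            # First allocation for this interval: pin its version.
            self._state[key] = (version_if_new, 0)
        version, next_num = self._state[key]
        self._state[key] = (version, next_num + 1)
        # The returned identifier has never been handed out before,
        # so it cannot collide with a previously published segment.
        return (data_source, interval, version, next_num)

allocator = SegmentAllocator()
first = allocator.allocate("testing-kafka", "2017-09-02T16:45/16:46", "v1")
second = allocator.allocate("testing-kafka", "2017-09-02T16:45/16:46", "v2")
# Both share the interval's original version but get distinct partitionNums.
print(first)   # ('testing-kafka', '2017-09-02T16:45/16:46', 'v1', 0)
print(second)  # ('testing-kafka', '2017-09-02T16:45/16:46', 'v1', 1)
```

Note how the second caller’s proposed version is ignored: once an interval has data, later allocations reuse its version and only the partition number advances.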

Interesting

Is there a rest api for “allocate segment” or is it only in-process for now?

There is a web api but it is really only meant for use by Druid tasks.

If you want to use it, the best way is to write a custom Druid task in an extension: implement the Task interface and use SegmentAllocateAction. Check out the KafkaIndexTask for an example.

It seems like a similar effect could be achieved with the regular realtime tasks by using a custom versioning policy and setting all tasks for a given interval to have the same version. With all segments having the same version, they would not overshadow each other and would instead coexist.

Are there problems with that approach?

Tranquility already uses the same version for all tasks in a given interval – the issue is more with the partition numbers (in the shardSpecs). Once a shard is handed off, you would want to avoid re-using its partition number, even if another shard needs to be started later for some late data. That’s the problem that the SegmentAllocate API is solving in the Kafka indexing service flow.

Will you please clarify what the effects of re-using the partition number are? For example, I ran a simple test of the Kafka indexing task and observed the segments that it created (for a topic with 5 partitions). I set the segment interval to one minute and sent a few rows of data a few minutes apart so that multiple segments would be created for the same interval.

The versions of all of the segments were the same (so that they don’t overshadow one another). What’s interesting is that although there are only 5 partitions on the topic, the second set of segments did not reuse the existing partition numbers; it instead continued the numbering from where the first set left off (partitionNums 5 through 9).

Below is the data.

select id, created_date, start, "end", version, used, encode(payload, 'escape') from druid_segments where datasource = 'testing-kafka' order by created_date asc;

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_1 | 2017-09-05T15:08:49.982Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/1/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":1,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_1"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_2 | 2017-09-05T15:08:49.984Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/2/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":2,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_2"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_3 | 2017-09-05T15:08:49.985Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/3/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":3,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_3"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_4 | 2017-09-05T15:08:49.987Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/4/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":4,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_4"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z | 2017-09-05T15:08:49.988Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/0/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":0,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_5 | 2017-09-05T15:13:11.314Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/5/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":5,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_5"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_6 | 2017-09-05T15:13:11.315Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/6/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":6,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_6"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_7 | 2017-09-05T15:13:11.316Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/7/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":7,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_7"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_8 | 2017-09-05T15:13:11.317Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/8/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":8,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_8"}

testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_9 | 2017-09-05T15:13:11.318Z | 2017-09-02T16:45:00.000Z | 2017-09-02T16:46:00.000Z | 2017-09-05T15:08:21.230Z | t | {"dataSource":"testing-kafka","interval":"2017-09-02T16:45:00.000Z/2017-09-02T16:46:00.000Z","version":"2017-09-05T15:08:21.230Z","loadSpec":{"type":"local","path":"var/druid/segments/testing-kafka/2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z/2017-09-05T15:08:21.230Z/9/index.zip"},"dimensions":"upc,country_code,item_no,store_no,department","metrics":"quantitySum,priceSum","shardSpec":{"type":"numbered","partitionNum":9,"partitions":0},"binaryVersion":9,"size":4546,"identifier":"testing-kafka_2017-09-02T16:45:00.000Z_2017-09-02T16:46:00.000Z_2017-09-05T15:08:21.230Z_9"}

(10 rows)

Druid’s segment identifier is a 4-tuple: (dataSource, interval, version, partitionNum). Druid assumes that any two segments with the same identifier are “the same” and have totally equivalent data. So re-using a partitionNum for the same interval and version will make Druid think that the old and new segments are identical, and in practice will mean that one or the other will be ignored for a given query.
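For illustration, here is a small hypothetical helper (not part of Druid's API) that builds identifier strings following the convention visible in the metadata rows above, where partitionNum 0 gets no numeric suffix. It shows why the 4-tuple must be unique: the same tuple always yields the same identifier.

```python
def segment_identifier(data_source, start, end, version, partition_num):
    """Build a Druid-style segment identifier from its 4-tuple.

    Hypothetical helper that mirrors the identifiers in the query output
    above: partitionNum 0 is omitted from the identifier string.
    """
    base = f"{data_source}_{start}_{end}_{version}"
    return base if partition_num == 0 else f"{base}_{partition_num}"

a = segment_identifier("testing-kafka",
                       "2017-09-02T16:45:00.000Z",
                       "2017-09-02T16:46:00.000Z",
                       "2017-09-05T15:08:21.230Z", 1)
b = segment_identifier("testing-kafka",
                       "2017-09-02T16:45:00.000Z",
                       "2017-09-02T16:46:00.000Z",
                       "2017-09-05T15:08:21.230Z", 1)
# Re-using the same (dataSource, interval, version, partitionNum) yields
# the same identifier, so Druid would treat the two segments as
# interchangeable even if their data differed.
print(a == b)  # True
```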

The new Kafka ingestion uses the indexing service SegmentAllocate API to allocate new identifiers, which are guaranteed to be not-yet-used.