KIS: Queries and help with using Kafka-Indexing-service

Hey,

We are using tranquility in production and we intend to move to the kafka-indexing-service. Here is my understanding of how it works.

Given a supervisor spec, the supervisor spawns a task which listens for events from the specified Kafka topic, pulls the produced events onto the middle manager locally, and places them into partitions just like in a Kafka topic. After maxRowsPerSegment is reached or taskDuration elapses, the local partitions on the middle manager are combined into segments and handed over to deep storage, from which they are then pulled by historicals. The data can be queried throughout this entire flow. After the handoff is complete a new task is spawned, the old task is killed, and the new task takes over the Kafka offsets to read from and resumes the above flow.

Please correct me if I am wrong in understanding this. I have a couple of questions here.

  1. What would be the consumer group name with which kafka is approached?
  2. What happens if my kafka index task is in the pending queue and there are events streaming onto my kafka topic? Are the previously read offsets known to the pending task so that it can resume from where it needs to?
  3. What happens if older events are produced, i.e. events for a segment that has already been handed off and pulled by a historical? I see that the event gets appended to the segment, but is a new shard created every time?
  4. I have multiple topics onto which data is produced, and I intend to use the kafka-indexing-service to ingest data in real time from all of them. Do I have to have a supervisor for each datasource, and won’t this be a costly operation?

Thank you for the help.

–Yasasvee.

Hi. I’m just a relatively new user who happens to have tried to learn how the KIS works, not a major Druid developer, so here’s my best attempt at being helpful — hopefully a core contributor can confirm or deny :slight_smile:

  1. What would be the consumer group name with which kafka is approached?

KIS doesn’t really use Kafka consumer groups — it uses the part of the KafkaConsumer API that lets each consumer explicitly assign itself the partitions it reads, and it persists offsets in Druid-specific locations (the metadata database, probably SQL) rather than in Kafka or Zookeeper. (It looks like the “supervisor” running inside the overlord sets up a consumer group with a name like kafka-supervisor-RANDOMID, which it uses to watch the topic’s metadata — I’m not sure why that doesn’t show up when I list consumer groups with kafka-consumer-groups.sh.)
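For what it’s worth, the pattern looks roughly like the sketch below. This is not Druid’s actual code, just the manual-assignment style of the plain kafka-clients KafkaConsumer API; the broker address and topic name are made up, and it assumes a reasonably recent kafka-clients where poll(Duration) exists:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignedConsumerSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // made-up broker address
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    // No group coordination is used and nothing is ever committed back to Kafka,
    // so auto-commit is switched off and no meaningful group.id is needed.
    props.put("enable.auto.commit", "false");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // Explicitly pick the partition(s) this reader is responsible for, instead of
      // letting a consumer group balance them across members.
      TopicPartition partition = new TopicPartition("my-topic", 0);  // made-up topic
      consumer.assign(Collections.singletonList(partition));

      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
      }
      // Note: no commitSync()/commitAsync(); progress is tracked by the reader itself
      // (Druid records it in its own metadata store).
    }
  }
}
```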

  2. What happens if my kafka index task is in the pending queue and there are events streaming onto my kafka topic? Are the previously read offsets known to the pending task so that it can resume from where it needs to?

Because Druid tracks the offsets when a task succeeds, your new task (when it starts) will start at the right place.
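As a tiny illustration of what “starting at the right place” means in Kafka terms, here is a hedged sketch. The method and the storedOffsets map are hypothetical stand-ins for whatever offsets the supervisor hands a new task; Druid keeps the real values in its own metadata store, not in Kafka:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ResumeFromStoredOffsetsSketch
{
  /**
   * storedOffsets is a hypothetical stand-in for the offsets recorded when the
   * previous task succeeded.
   */
  static void resume(KafkaConsumer<byte[], byte[]> consumer, Map<TopicPartition, Long> storedOffsets)
  {
    // Take over exactly the partitions the previous task was reading...
    consumer.assign(storedOffsets.keySet());
    for (Map.Entry<TopicPartition, Long> entry : storedOffsets.entrySet()) {
      // ...and continue from the offset recorded when that task succeeded.
      consumer.seek(entry.getKey(), entry.getValue());
    }
  }
}
```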

  3. What happens if older events are produced, i.e. events for a segment that has already been handed off and pulled by a historical? I see that the event gets appended to the segment, but is a new shard created every time?

Yes, the new task will add more shards (not 100% sure if “shard” is the right word) to the older time interval. Note that you should really make sure that any events falling within your KIS’s earlyMessageRejectionPeriod/lateMessageRejectionPeriod window also fall within your historicals’ load rules, or else your KIS task will time out waiting for historicals to load the segments at the end. I think this may be improved in the next release. See https://github.com/apache/incubator-druid/issues/5868

  4. I have multiple topics onto which data is produced, and I intend to use the kafka-indexing-service to ingest data in real time from all of them. Do I have to have a supervisor for each datasource, and won’t this be a costly operation?

I believe KIS only supports specifying a single topic per ingestion supervisor, yes.

–dave

Hey David,

Thank you for the response.

1 & 2) Yes, I don’t see it either when I list the consumer groups, nor in kafka-manager; I was expecting something like
“kafka-supervisor-%s”, RandomIdUtils.getRandomId()

3 & 4) It would be interesting to know of any ways to approach this, e.g. if we could give a topic pattern, since we have n topics and the number is increasing by the day. I am also not sure how big a performance hit it is if the number of shards (or buckets/partitions) keeps increasing for events with older timestamps.

Thank you,

–Yash.

Hey Yash,

Druid’s Kafka integration doesn’t use consumer groups; the “fake” one in KafkaRecordSupplier is just to make the library’s validation of group.id work out. You wouldn’t expect it to show up in Kafka-side monitoring.

For (3) a new segment (shard) will get created if you get new events for an older time chunk after a while has passed. Automatic compaction (http://druid.io/docs/latest/design/coordinator.html#compacting-segments) helps with this.

For (4) you do need one supervisor per topic. The model currently is one topic = one supervisor = one datasource. It would potentially be interesting to loosen that up in the future (so multiple topics can go to one datasource, or one topic can go to multiple datasources) but it is not currently doable. Some people work around this by doing the necessary shuffling on the Kafka side. (Create new topics and mix data into them.)
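In case it helps, here is a hedged sketch of that Kafka-side workaround: a small relay that consumes from a set of source topics matching a pattern and republishes everything into a single merged topic, which one supervisor then reads. All of the names (broker address, group id, topic pattern, merged topic) are made up, and this is just the plain kafka-clients consumer/producer API, nothing Druid-specific:

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TopicMergeRelay
{
  public static void main(String[] args)
  {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");  // made-up broker address
    consumerProps.put("group.id", "topic-merge-relay");        // a normal consumer group is fine for this relay
    consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
    consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", ByteArraySerializer.class.getName());
    producerProps.put("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
      // Subscribe to every source topic matching a (made-up) naming scheme, so topics
      // created later are picked up automatically on the next metadata refresh.
      consumer.subscribe(Pattern.compile("raw-events-.*"));

      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // Republish into the single topic that the Druid supervisor reads.
          // Note: "merged-events" must NOT match the pattern above, or the relay feeds itself.
          producer.send(new ProducerRecord<>("merged-events", record.key(), record.value()));
        }
      }
    }
  }
}
```

This also partly addresses the topic-pattern wish above: the pattern subscription picks up newly created topics automatically, as long as the merged topic stays outside the pattern.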

Gian

Thank you for the help Gian and David, much needed and appreciated!

–Yash