Kafka index task replicas and taskCount

Hi,

I am trying to better understand the meaning of the replicas and taskCount parameters for the new Kafka indexer (Kafka supervisor).

If I have, for example, a Kafka topic with 10 partitions and I set taskCount = 1, does that mean that a single Kafka index task would consume all 10 partitions?

If I set taskCount = 10, does that mean that Druid would create a separate Kafka index task for each partition in the topic?

If my worker capacity is 3 tasks and I have 5 workers (nodes), does that mean that those 10 tasks would be distributed across the different workers?

What about the replicas parameter? What exactly does it control, and how does it help?

Does the replicas parameter create the same Kafka index task on another worker? If so, which replica would eventually be published to deep storage?

Thanks,

Hi Igor,

“If I have, for example, a Kafka topic with 10 partitions and I set taskCount = 1, does that mean that a single Kafka index task would consume all 10 partitions?”

[Ben] Yes

“If I set taskCount = 10, does that mean that Druid would create a separate Kafka index task for each partition in the topic?”

[Ben] Yes

“If my worker capacity is 3 tasks and I have 5 workers (nodes), does that mean that those 10 tasks would be distributed across the different workers?”

[Ben] Yes
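
For the arithmetic, assuming the numbers above: 5 workers × 3 slots each = 15 task slots in total, so the 10 tasks all get assigned and are spread across workers by the overlord's worker select strategy. Tasks beyond the available capacity would sit in the pending queue until a slot frees up.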

"What about the replicas parameter, what exactly its controls and how does it help ?

does the replicas parameter creates same kafka index task on another worker ?"

[Ben] Yes, it creates a duplicate of all Kafka indexing tasks for that datasource. This is useful because these tasks serve queries, so (a) you can serve more concurrent queries by having multiple replicas, and (b) if you lose a node or nodes, you have a higher chance of avoiding any moment in time where the data processed so far is unavailable to query.
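
For reference, a minimal sketch of the ioConfig section of a Kafka supervisor spec (topic name and broker address are illustrative; dataSchema and tuningConfig omitted):

    {
      "type": "kafka",
      "ioConfig": {
        "topic": "metrics",
        "consumerProperties": { "bootstrap.servers": "kafkabroker:9092" },
        "taskCount": 10,
        "replicas": 2,
        "taskDuration": "PT1H"
      }
    }

With these values the supervisor keeps taskCount × replicas = 20 reading tasks running at a time, rolling each one over after taskDuration.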

If so, which replica would eventually be published to deep storage?"

[Ben] Whichever one wins. :smiley: The handoff is thread-safe, so what you will see is that some tasks fail because the lock was already acquired and the segment already published by another task.

Best,

–Ben

Thanks for the answers.

Is it somehow possible to control on which nodes the replicas start?

In our case we have cloud infrastructure with 3 zones, and the Druid cluster would have (for example) 3 nodes in each zone, to make sure that if 1 zone goes completely down, the other 2 can still serve data.

Can I somehow configure that, when using the Kafka indexer with replicas = 3, each zone should have at least 1 running Kafka indexing task?

Same question for loading segments onto historical nodes: how can I ensure that the same segment is loaded in at least 2 out of 3 zones?

Thanks,

Hi Igal,

Not out of the box, but if you are interested in contributing something, try looking at WorkerSelectStrategy and its subclasses.
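
For what it's worth, the shipped strategies can already be chosen through the overlord dynamic config; a sketch using fillCapacityWithAffinity (datasource name and worker hosts are illustrative) pins a datasource's tasks to specific workers, which only approximates zone awareness:

    {
      "selectStrategy": {
        "type": "fillCapacityWithAffinity",
        "affinityConfig": {
          "affinity": {
            "my_datasource": ["mm-zone-a:8091", "mm-zone-b:8091", "mm-zone-c:8091"]
          }
        }
      }
    }

The historical side of the question is supported out of the box through tiers: if each zone's historicals set druid.server.tier to a zone-specific tier name, a load rule like this (tier names illustrative) keeps one replica of every segment in each zone:

    {
      "type": "loadForever",
      "tieredReplicants": {
        "zone-a": 1,
        "zone-b": 1,
        "zone-c": 1
      }
    }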

Hi Gian,

thanks for the answer.

How does Druid actually manage failed tasks, whether Kafka indexing or Hadoop batch, especially in the case when the executing node (MiddleManager) goes down?

Would Druid start a new instance of the failed task on another MiddleManager?

Thanks,

Hi Igal,

If the ingestion is “supervised” (like Kafka indexing), then Druid will start a new task to replace failed ones. If it is “unsupervised” (like Hadoop batch), then it’s up to the user (you) to resubmit failed tasks if you want to retry them.
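
Concretely, a supervised spec is submitted once to the overlord (POST /druid/indexer/v1/supervisor) and the supervisor itself respawns failed tasks, while an unsupervised task is a one-shot POST to /druid/indexer/v1/task, so retrying means re-submitting the same task JSON yourself.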