Kafka topic partition assignment and Realtime replication/sharding

Hi - sorry if this is a duplicate post, I posted this last Friday but it didn’t seem to go through email…

We’re trying to get a better understanding of using Druid Realtime nodes with the Kafka firehose. There are several mailing list threads discussing potential issues with this setup and we just want to double-check some things.

So it sounds like if you have 1 Realtime node consuming 1 Kafka topic (i.e. no replication and no sharding of Realtime nodes) then everything is OK. Also, if you have 2 Realtime nodes in the same group.id but with different shard partitionNums then this is also OK.

Now let’s say there are 4 Realtime nodes with 2 per group.id (for replication) and using 2 shard partitions. Here’s a diagram: http://imgur.com/IesG0Wt.

[1] & [2] say that Kafka partition assignment may assign different topic partitions to the same Druid shard partition in the different consumer groups. It seems like a workaround for this problem would be to set the consumer.id on each Realtime node to something containing the Druid shard partition number. The Kafka docs for the consumer property partition.assignment.strategy (default "range") [3] seem to imply that as long as a) the group.ids have the same number of consumers and b) the consumer.ids in the separate group.ids sort the same way lexicographically, then the topic partition assignment should be the same across group.ids. The diagram [4] shows an example of this, and there's a rough sketch of the assignment logic below.
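To make the reasoning concrete, here is a minimal sketch of the "range" behavior as the docs describe it: sort the consumers in a group lexicographically and hand each one a contiguous block of the topic's partitions. The class name, consumer.id values, and partition count are made up for illustration; this is not Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of documented "range" assignment, not Kafka's real code.
public class RangeAssignmentSketch
{
  public static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions)
  {
    final List<String> sorted = new ArrayList<>(consumerIds);
    Collections.sort(sorted); // lexicographic order decides which range each consumer gets

    final Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    final int perConsumer = numPartitions / sorted.size();
    final int extra = numPartitions % sorted.size();
    int next = 0;
    for (int i = 0; i < sorted.size(); i++) {
      final int count = perConsumer + (i < extra ? 1 : 0);
      final List<Integer> partitions = new ArrayList<>();
      for (int j = 0; j < count; j++) {
        partitions.add(next++);
      }
      assignment.put(sorted.get(i), partitions);
    }
    return assignment;
  }

  public static void main(String[] args)
  {
    // Two replica consumer groups, each with consumer.ids that encode the Druid
    // shard partitionNum. Both groups sort identically, so both get identical
    // ranges, and shard 0 in each group covers the same Kafka partitions.
    final List<String> groupA = Arrays.asList("realtime-shard-0", "realtime-shard-1");
    final List<String> groupB = Arrays.asList("realtime-shard-0", "realtime-shard-1");
    System.out.println("group A: " + assign(groupA, 4)); // {realtime-shard-0=[0, 1], realtime-shard-1=[2, 3]}
    System.out.println("group B: " + assign(groupB, 4)); // identical assignment
  }
}
```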

Does this seem accurate? Aside from the operational pain of maintaining all of these group.id & consumer.id settings, this seems like a way to do Realtime sharding & replication with the Kafka firehose such that shards with the same partitionNum contain data from the same Kafka topic partitions.

Or are we missing some details?

Thanks,

Zach

[1] https://groups.google.com/d/msg/druid-development/aRMmNHQGdhI/muBGl0Xi_wgJ

[2] https://groups.google.com/d/msg/druid-development/_y_lb3t1IZg/PXDa-NTWm8oJ

[3] http://kafka.apache.org/documentation.html#consumerconfigs

[4] http://imgur.com/IesG0Wt

Hmm, really interesting idea!

I’m not sure, but I think this strategy would have problems when one of the replicas goes down: then one consumer group will have N Kafka consumer threads and the other will have N - 1, so I think they won’t distribute data the same way (sketched below).
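For example, reusing the illustrative assign() sketch from earlier in the thread (again, just a sketch of the documented behavior, with made-up names): if the shard-1 replica in group B stays down long enough to trigger a rebalance, the two groups no longer line up.

```java
// Group A still has both replicas; group B lost the consumer for shard 1.
List<String> groupA = Arrays.asList("realtime-shard-0", "realtime-shard-1");
List<String> groupB = Arrays.asList("realtime-shard-0"); // shard-1 replica is down

RangeAssignmentSketch.assign(groupA, 4); // {realtime-shard-0=[0, 1], realtime-shard-1=[2, 3]}
RangeAssignmentSketch.assign(groupB, 4); // {realtime-shard-0=[0, 1, 2, 3]} -- no longer matches group A
```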

I think the behavior you’d want is that if a replica goes down temporarily, that consumer.id is still “reserved” somehow and its partitions would not get picked up by other consumers. I’m not sure if this is possible though.

I totally agree - if a Realtime node stops and will be restarted “soon” (e.g. crash, upgrade) you want to prevent a Kafka consumer rebalance. The docs for the consumer config zookeeper.session.timeout.ms [1] state:

“ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.”

We haven’t experimented with this config, but it seems like whatever you set it to is how long a Realtime node can be down before its Kafka topic partitions get rebalanced to other Realtime nodes in the same consumer group.id. If the Realtime node starts again before that timeout, it seems like it should retain its Kafka topic partitions and resume consuming from the last checkpointed offsets, and everything would be OK. Needs testing though.
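For reference, here is a minimal sketch of the consumer settings being discussed, with assumed values; in practice these would be supplied to the Kafka firehose as consumer properties in the realtime spec rather than built in Java like this.

```java
import java.util.Properties;

// Sketch of the relevant high-level consumer settings; all values are assumptions.
public class ConsumerPropsSketch
{
  public static Properties buildProps()
  {
    final Properties props = new Properties();
    props.put("zookeeper.connect", "zk1:2181");          // assumed ZooKeeper address
    props.put("group.id", "druid-realtime-replica-a");   // one group.id per replica set
    props.put("consumer.id", "realtime-shard-0");        // encodes the Druid shard partitionNum
    props.put("zookeeper.session.timeout.ms", "60000");  // how long a node can be gone before
                                                          // its partitions get rebalanced away
    return props;
  }
}
```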

[1] http://kafka.apache.org/documentation.html#consumerconfigs

I’d be interested in knowing how that turns out, if you have a chance to try it.

Fwiw, we are intending to revamp Kafka ingestion in the future to not use the high level consumer. But the behavior of the high level consumer is still interesting, since that’s what we’re using right now :)