Tranquility Kafka - mapping Kafka partitions to Tranquility partitions

Hi,

I’ve been trying out Tranquility to ingest events from Kafka into Druid and have a few questions about it:

(i) For a given topic, how are Kafka partitions mapped to Tranquility partitions (set using “task.partitions”)?

(ii) What happens if Tranquility is configured to have more partitions than the Kafka topic?

For example, if the Kafka topic has X partitions and Tranquility is configured to ingest this topic with X + Y partitions, would the data from the Kafka topic be

(a) distributed across the X + Y tasks that would be created by Tranquility OR

(b) distributed across X tasks, leaving the other Y tasks idle?

(iii) Do all the tasks created by Tranquility share the same Kafka Consumer Group Id?

Please let me know if I’ve missed some documentation that explains these details.

Thanks in advance,

Jithin

Responses inline.

(i) For a given topic, how are Kafka partitions mapped to Tranquility partitions (set using “task.partitions”)?

Kafka partitions and Tranquility partitions are independent and have no direct correlation. Tranquility-kafka reads from all Kafka partitions (or, if you run multiple instances, each instance is assigned a subset of those partitions) and spreads the rows across the set of Tranquility partitions using, by default, a hash of the timestamp + dimensions.
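
Here is a minimal Python sketch of that idea. It is an illustration, not Tranquility's partitioner: `tranquility_partition`, the SHA-1 hash, and the one-minute timestamp truncation are assumptions chosen for the example. What it shows is that the target partition depends only on the row's timestamp and dimensions, never on which Kafka partition the row was read from, so rows from 8 Kafka partitions can fill 16 Tranquility partitions.

```python
import hashlib
import random
from collections import Counter


def tranquility_partition(row: dict, num_partitions: int, granularity_ms: int = 60_000) -> int:
    """Hypothetical partitioner: hash of (truncated timestamp, dimensions) mod num_partitions.

    Illustrative only; it does not reproduce Tranquility's actual hash function.
    """
    truncated_ts = row["timestamp"] - row["timestamp"] % granularity_ms
    dims = sorted((k, v) for k, v in row.items() if k != "timestamp")
    key = f"{truncated_ts}|{dims}".encode("utf-8")
    digest = hashlib.sha1(key).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def simulate(num_kafka_partitions: int, num_tranquility_partitions: int,
             rows_per_kafka_partition: int = 1000, seed: int = 42) -> Counter:
    """Count how many rows land in each Tranquility partition."""
    rng = random.Random(seed)
    counts: Counter = Counter()
    for _kafka_partition in range(num_kafka_partitions):
        # The source Kafka partition is never passed to the partitioner.
        for _ in range(rows_per_kafka_partition):
            row = {
                "timestamp": 1_477_000_000_000 + rng.randrange(3_600_000),
                "user": f"user{rng.randrange(1000)}",
                "page": f"page{rng.randrange(50)}",
            }
            counts[tranquility_partition(row, num_tranquility_partitions)] += 1
    return counts


if __name__ == "__main__":
    counts = simulate(num_kafka_partitions=8, num_tranquility_partitions=16)
    # Print each Tranquility partition ID that received rows, with its row count.
    print(sorted(counts.items()))
```

Running it with 8 Kafka partitions and 16 Tranquility partitions spreads the rows over all 16 partition IDs.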

(ii) What happens if Tranquility is configured to have more partitions than the Kafka topic?

For example, if the Kafka topic has X partitions and Tranquility is configured to ingest this topic with X + Y partitions, would the data from the Kafka topic be

(a) distributed across the X + Y tasks that would be created by Tranquility OR

(b) distributed across X tasks, leaving the other Y tasks idle?

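The expected behavior is (a), per the answer to (i): rows are hashed across all X + Y Tranquility partitions, regardless of how many partitions the Kafka topic has.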

(iii) Do all the tasks created by Tranquility share the same Kafka Consumer Group Id?

By default, yes. You can set the group ID to whatever you’d like in the configuration: https://github.com/druid-io/tranquility/blob/master/docs/kafka.md
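
Because the group ID is just a property in the config file, a deployment that generates its kafka.json from a script could set it like this. This is a minimal, hypothetical Python helper (`with_group_id` is not part of Tranquility), and it assumes the group ID is the `kafka.group.id` entry under `properties`, which is how I read the kafka.md docs linked above; verify against your version. Keep in mind that Kafka consumers sharing a group ID split the topic's partitions between them, while consumers with different group IDs each read every message.

```python
import copy
import json


def with_group_id(config: dict, group_id: str) -> dict:
    """Return a copy of a tranquility-kafka config dict with the consumer group ID set.

    Assumes the property name "kafka.group.id" under "properties";
    check the kafka.md docs for your Tranquility version.
    """
    updated = copy.deepcopy(config)  # leave the caller's dict untouched
    updated.setdefault("properties", {})["kafka.group.id"] = group_id
    return updated


if __name__ == "__main__":
    # Hypothetical minimal config; a real file also contains full "dataSources" specs.
    base = {"dataSources": {}, "properties": {"consumer.numThreads": "8"}}
    print(json.dumps(with_group_id(base, "my-druid-ingest"), indent=2))
```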

Thanks David.

A few follow-up questions:

(iv) How does Tranquility pull data from Kafka?

I have the following setup

  • A Kafka topic with 8 partitions into which a producer is pushing messages at a rate of about 120K msgs/sec.

  • Two tranquility instances (running on separate nodes)

  • When I set task.partitions to 8, Tranquility ingests data from the topic at a rate of about 35K msgs/sec.

  • Even if I increase task.partitions to 16, the rate remains the same; I couldn’t figure out why.

The middleManager nodes (running the tasks) are neither CPU nor memory-bound.

So I was wondering whether the setup is bottlenecked by the packet reception rate within Tranquility.

Question: If I increase task.partitions to scale out, does the rate of pulling records from Kafka still remain constant?

Would it help to increase the value of “consumer.numThreads” in the Tranquility config file? The current value is 8.

(v) If multiple Tranquility instances are used, what strategy is used to divide the partitions of the same topic among the instances?

Hi Jithin,

It sounds like the bottleneck is not the number of indexing tasks but either (a) the number of tranquility-kafka instances or (b) the number of Kafka brokers. Try increasing the number of Tranquility instances to 3 or 4 and see if that makes a difference. If not, add another Kafka broker and see if that helps (I’m not sure how many you currently have). 8 consumer.numThreads should be plenty for 8 partitions.

How Kafka partitions are divided among consumers is internal to the Kafka implementation, and I’m not too familiar with it. The documentation just says that the partitions are divided so that each consumer receives its “fair share”, and that partitions are reallocated dynamically when the number of consumers changes.
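
To make the "fair share" idea concrete, here is a simplified Python sketch of a range-style assignment, in the spirit of Kafka's range strategy but not Kafka's actual code. `range_assign` and the thread IDs are hypothetical, and the real strategy depends on your Kafka client version and settings. The sketch assumes each tranquility-kafka instance contributes consumer.numThreads consumer threads to the group. Since a partition is consumed by at most one consumer in a group, 2 instances x 8 threads = 16 threads sharing 8 partitions leaves half the threads without a partition.

```python
def range_assign(partitions: list[int], consumer_threads: list[str]) -> dict[str, list[int]]:
    """Simplified range-style assignment (a sketch, not Kafka's implementation).

    Sorted partitions are cut into contiguous chunks and handed to sorted
    consumer threads; the first len(partitions) % len(threads) threads get one extra.
    """
    parts = sorted(partitions)
    threads = sorted(consumer_threads)
    per_thread, extra = divmod(len(parts), len(threads))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, thread in enumerate(threads):
        count = per_thread + (1 if i < extra else 0)
        assignment[thread] = parts[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    # Hypothetical thread IDs: 2 tranquility-kafka instances, consumer.numThreads = 8 each.
    threads = [f"instance-{name}-thread-{i}" for name in ("a", "b") for i in range(8)]
    assignment = range_assign(list(range(8)), threads)
    busy = [t for t, ps in assignment.items() if ps]
    print(f"{len(busy)} of {len(threads)} consumer threads own a partition")
```

This prints "8 of 16 consumer threads own a partition". In this model, threads beyond the partition count add no read parallelism.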

Hi David,

  • I see your point, but I doubt the number of Kafka brokers is the bottleneck. Currently I have 3 brokers, and when I run an independent Kafka consumer, I’m able to consume records as fast as the producer publishes them (~120K recs/sec). So I would have expected Tranquility to ingest at similar speeds.

  • Given that the 2 tranquility instances I currently have are neither CPU nor memory-bound, would it help to increase the number of instances?

Thanks,

Jithin

Okay cool, that’s good to know.

Yes. If your bottleneck isn’t the indexing tasks (changing task.partitions doesn’t seem to make a difference), adding Tranquility instances should help. That said, I would still try to get better performance out of your existing instances before scaling out. Some things to try:

  • set tranquility.lingerMillis to a non-zero value to batch multiple events into a single message; tune by adjusting druidBeam.firehoseChunkSize and tranquility.maxBatchSize
  • turn on debug logging, and possibly tune tranquility.maxPendingBatches if you see log messages like “pendingBatches >= maxPendingBatches, waiting…”
  • set serialization.format to ‘smile’, which should give you better throughput
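
To illustrate what lingerMillis and maxBatchSize trade off, here is a minimal Python sketch of a size-or-time batcher. It is a hypothetical model, not Tranquility's implementation: `LingerBatcher`, `add`, `poll`, and `flush` are invented names, and treating lingerMillis = 0 as "send each event immediately" is a simplification. A batch goes out when it reaches max_batch_size or when its oldest event has waited linger_millis, so a non-zero linger lets more events share one request at the cost of a little latency.

```python
import time
from typing import Callable, List, Optional


class LingerBatcher:
    """Hypothetical size-or-time batcher (not Tranquility's real code).

    A batch is sent when it holds max_batch_size events, or when poll() finds
    that its oldest event has waited at least linger_millis.
    """

    def __init__(self, max_batch_size: int, linger_millis: int,
                 send: Callable[[List[dict]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_batch_size = max_batch_size
        self.linger_s = linger_millis / 1000.0
        self.send = send
        self.clock = clock
        self.buffer: List[dict] = []
        self.first_event_at: Optional[float] = None

    def add(self, event: dict) -> None:
        if not self.buffer:
            self.first_event_at = self.clock()
        self.buffer.append(event)
        # In this simplified model, linger_millis == 0 means "never wait".
        if len(self.buffer) >= self.max_batch_size or self.linger_s == 0:
            self.flush()

    def poll(self) -> None:
        """Call periodically; sends the batch once its oldest event has lingered long enough."""
        if self.buffer and self.clock() - self.first_event_at >= self.linger_s:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []
            self.first_event_at = None


if __name__ == "__main__":
    batches: List[List[dict]] = []
    batcher = LingerBatcher(max_batch_size=3, linger_millis=50, send=batches.append)
    for i in range(7):
        batcher.add({"id": i})  # full batches of 3 are sent as soon as they fill
    time.sleep(0.06)
    batcher.poll()  # the leftover event is sent after lingering
    print([len(b) for b in batches])  # prints [3, 3, 1]
```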

Thanks David. I’ll try those suggestions.

Btw, once I turn on debug logging, which log files should I look at to check for messages related to pendingBatches?

The task logs? MiddleManager logs?

Thanks,

Jithin

Neither; look at the logs of the kafka-tranquility process itself.

Adding to the previous post,

I tried increasing “tranquility.maxBatchSize” from 2000 to 10000. However, this resulted in an OutOfMemoryError, as shown in the Tranquility log:

2016-10-27 22:10:43,918 [KafkaConsumer-2] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.joda.time.DateTime.withMillis(DateTime.java:598) ~[joda-time.joda-time-2.8.2.jar:2.8.2]
    at org.joda.time.DateTime.withPeriodAdded(DateTime.java:942) ~[joda-time.joda-time-2.8.2.jar:2.8.2]
    at org.joda.time.DateTime.plus(DateTime.java:997) ~[joda-time.joda-time-2.8.2.jar:2.8.2]

Question:

  • How do I fix this issue? I know I need to allocate more heap memory, but I haven’t been able to figure out which process needs it, or how much.

I tried increasing the heap memory allocated to each tranquility task by adding -Xmx3g to “druid.indexer.runner.javaOpts” in the middleManager runtime.properties file. But this didn’t help.

Could you point me to the right process I need to allocate heap space to?

Thanks,

Jithin

Hey Jithin,

You need to increase the heap of the tranquility-kafka process itself. Setting -Xmx3g in druid.indexer.runner.javaOpts only affects the indexing tasks (peons) that the middleManager launches; that setting is more relevant to druidBeam.firehoseBufferSize.

Keep in mind that memory consumption scales with tranquility.maxBatchSize * (tranquility.maxPendingBatches + 1) events. How many bytes that works out to depends on the complexity of your events. 10000 feels a bit high to me; I would start with something lower.
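
As a worked example of that rule of thumb, here is a small Python sketch. The maxPendingBatches value of 5 and the 2 KiB per-event size are assumptions for illustration only; substitute your own settings and measurements.

```python
def buffered_events(max_batch_size: int, max_pending_batches: int) -> int:
    """Approximate number of events held in memory, per the rule of thumb
    tranquility.maxBatchSize * (tranquility.maxPendingBatches + 1)."""
    return max_batch_size * (max_pending_batches + 1)


def buffered_bytes(max_batch_size: int, max_pending_batches: int, avg_event_bytes: int) -> int:
    """Rough byte estimate; avg_event_bytes is an assumed in-heap size per event."""
    return buffered_events(max_batch_size, max_pending_batches) * avg_event_bytes


if __name__ == "__main__":
    pending = 5          # assumed value of tranquility.maxPendingBatches; use your own
    event_bytes = 2_048  # assumed average in-heap size of one event; measure your own
    for batch in (2_000, 10_000):
        events = buffered_events(batch, pending)
        mib = buffered_bytes(batch, pending, event_bytes) / 2**20
        print(f"maxBatchSize={batch}: {events} events, ~{mib:.0f} MiB")
```

Whatever the per-event size, going from maxBatchSize 2000 to 10000 multiplies the number of buffered events by five (12000 to 60000 with maxPendingBatches = 5).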

Thanks David. Increasing the heap memory of tranquility-kafka process fixed the issue.

For anyone who faces the same issue, the heap of the tranquility-kafka process can be set by passing the -J-Xmx argument to the tranquility command (the -J prefix is required):

For example: bin/tranquility kafka -J-Xmx6g -configFile conf/tranquility/kafka.json

Thanks,

Jithin