Tranquility performance tuning

Hi,

I’m fairly new to Druid. I’ve been setting up a pipeline where Druid ingests data from Kafka using Tranquility.

I’ve been trying to figure out which configuration parameters matter most for tuning Druid's ingestion performance (number of events ingested per second).

Question:

Are there any documents or posts that explain:

(a) how performance bottlenecks can be identified (probably using Druid metrics)?

(b) what config parameters could be used to tune the performance?

Thanks,

Jithin

Hey Jithin,

For kafka-tranquility, the reference for the tuning configs is here: https://github.com/druid-io/tranquility/blob/master/docs/configuration.md. Some of the more interesting ones are druidBeam.firehoseBufferSize (combined with tuning the peon heap size via druid.indexer.runner.javaOpts in the middleManager runtime.properties), tranquility.maxBatchSize, and tranquility.maxPendingBatches.

Something else that might improve performance is using the CMS garbage collector instead of G1GC on the peons (add -XX:+UseConcMarkSweepGC to druid.indexer.runner.javaOpts in the middleManager runtime.properties).
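One detail worth keeping in mind: druid.indexer.runner.javaOpts is the JVM option string the middleManager uses when forking each peon, so the GC flag should sit next to the heap settings rather than replace them. A minimal Python sketch that assembles that property line; the heap size and the extra -D flags are hypothetical placeholders, not recommendations.

```python
from typing import List, Optional


def peon_java_opts(heap: str, use_cms: bool = True, extra: Optional[List[str]] = None) -> str:
    """Return a druid.indexer.runner.javaOpts=... line for middleManager runtime.properties."""
    # Heap flags go in the same string as the GC flag; setting javaOpts to
    # only the GC flag would drop any heap settings it previously carried.
    opts = ["-server", f"-Xms{heap}", f"-Xmx{heap}"]
    if use_cms:
        # Concurrent Mark Sweep collector instead of the JVM's default collector.
        opts.append("-XX:+UseConcMarkSweepGC")
    opts.extend(extra or [])
    return "druid.indexer.runner.javaOpts=" + " ".join(opts)


if __name__ == "__main__":
    # Size the heap together with druidBeam.firehoseBufferSize: a larger
    # buffer holds more events in the peon's memory.
    print(peon_java_opts("3g", extra=["-Duser.timezone=UTC", "-Dfile.encoding=UTF-8"]))
```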

And yes, the ingest/, jvm/, and sys/ type metrics would be helpful in figuring out where your bottleneck might be.
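Druid can emit its metrics as JSON events (for example through the logging or HTTP emitter), each carrying a "metric" name and a "value". As far as I know, the ingest/ metrics are only reported when RealtimeMetricsMonitor is in the peons' druid.monitoring.monitors list. A minimal sketch that groups emitted events into the ingest/, jvm/, and sys/ families and estimates throughput from ingest/events/processed, assuming each value is the count for one emission period (60 seconds here, standing in for your druid.monitoring.emissionPeriod). The sample events are made up.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List

TRACKED_FAMILIES = ("ingest", "jvm", "sys")


def summarize(lines: Iterable[str]) -> Dict[str, Dict[str, List[float]]]:
    """Collect values as {family: {metric: [values, ...]}} for the tracked families."""
    out: Dict[str, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
    for line in lines:
        event = json.loads(line)
        name = event.get("metric", "")
        family = name.split("/", 1)[0]
        if family in TRACKED_FAMILIES:
            out[family][name].append(float(event["value"]))
    return out


def events_per_second(summary: Dict[str, Dict[str, List[float]]], period_seconds: float = 60.0) -> float:
    """Average ingest rate, assuming each ingest/events/processed value covers one emission period."""
    samples = summary.get("ingest", {}).get("ingest/events/processed", [])
    if not samples:
        return 0.0
    return sum(samples) / (len(samples) * period_seconds)


if __name__ == "__main__":
    sample = [
        '{"metric": "ingest/events/processed", "value": 600000, "dataSource": "example"}',
        '{"metric": "ingest/events/processed", "value": 540000, "dataSource": "example"}',
        '{"metric": "ingest/events/thrownAway", "value": 1200, "dataSource": "example"}',
        '{"metric": "jvm/gc/time", "value": 5400}',
        '{"metric": "query/time", "value": 12}',
    ]
    summary = summarize(sample)
    print(sorted(summary))  # ['ingest', 'jvm']
    print(events_per_second(summary))  # (600000 + 540000) / (2 * 60) = 9500.0
```

Read together, a flat ingest rate alongside climbing jvm/gc/time would point toward peon heap pressure rather than the Kafka side.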

Having said that, if you’re just starting out with Kafka ingestion, have you taken a look at the Kafka indexing service: http://druid.io/docs/0.9.1.1/development/extensions-core/kafka-ingestion.html? It won’t necessarily make tuning easier, but it does have some nice features not available in kafka-tranquility such as exactly-once ingestion, no window periods, and no separate processes required.

Thanks David.

Yes. I had come across the Kafka indexing service. However, I think I read somewhere that it’s still an experimental feature.

So I wasn’t sure whether it’s robust enough.

Questions:

(i) Is there an ETA on when this would be upgraded from ‘experimental’ status to production-ready?

(ii) By design, would the Kafka indexing service provide better ingestion throughput compared to Tranquility?

Thanks,

Jithin

Hey Jithin,

Yes, the Kafka indexing service is still tagged as experimental. Having said that, it’s been out for about 4 months now and seems relatively stable based on forum reports. There are a number of improvements that have been made to it in 0.9.2, and those changes will need some time to be validated in the wild as well. If those prove to be stable, I think it’s reasonable to expect the experimental tag to be removed in a few months, perhaps in 0.9.3.

I’m not actually sure how ingest throughput compares, but my guess is that it should be very similar.

Hi David,

The documentation for the Kafka indexing service mentions that the maximum number of tasks it will spawn (taskCount) is capped at the number of Kafka partitions.

Doesn’t that mean that the scalability of the indexing service would be limited by the number of Kafka partitions?

Is there a way to spread the records read from one partition across multiple cores?

Thanks,

Jithin

The expectation is that you would increase the number of Kafka partitions as necessary to achieve the desired throughput, since adding Kafka partitions carries relatively little overhead. There is currently no way for multiple tasks to coordinate on a single partition.
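To see why partitions bound parallelism, here is a small sketch of partition-to-task grouping, assuming the supervisor assigns each partition to task group `partition % taskCount` (my understanding of the 0.9.x KafkaSupervisor; treat it as an assumption). Replicas are ignored for simplicity.

```python
from collections import defaultdict
from typing import Dict, List


def assign_partitions(num_partitions: int, task_count: int) -> Dict[int, List[int]]:
    """Map task group id -> Kafka partitions, grouping by partition % task_count."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for partition in range(num_partitions):
        groups[partition % task_count].append(partition)
    return dict(groups)


if __name__ == "__main__":
    # More partitions than tasks: each task reads several partitions.
    print(assign_partitions(8, 4))  # {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}
    # More tasks than partitions: only 4 groups exist, so 4 of the 8 tasks never start.
    print(len(assign_partitions(4, 8)))  # 4
```

Because a partition always lands in exactly one group, adding tasks beyond the partition count buys nothing; adding partitions is what unlocks more reading tasks.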

Thanks David.

Btw, I did notice that the single-node ingestion performance of my setup increased by about 2.5x when I switched from Tranquility to Kafka indexing service.

Just putting it out there as a reference for anyone trying to scale Tranquility-Kafka ingestion throughput - it might be better to try the Kafka indexing service instead.

Thanks,

Jithin

Thanks for reporting back, glad to hear things seem to be coming along.

Hi David,

As I mentioned earlier, I’ve switched to using the Kafka indexing service. One difficulty I’ve been facing is that I’m unable to find the logs of failed tasks. Hence, I haven’t been able to diagnose why tasks fail.

I’ve been monitoring the tasks via http://<OVERLORD_IP>:8090/console.html.

Although it lists the failed tasks, clicking on the ‘log’ link of a failed task merely takes me to a blank page.

The cluster is set up to use HDFS to store the logs.

Any ideas?

Thanks,

Jithin

Some questions:

Are you able to retrieve the logs of the successful tasks?
Do you see the logs for the failed tasks in HDFS?
When you have a failed task and there aren’t any log files generated, are there any exceptions in the middle manager / overlord logs?
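The first two points can also be checked without the console by asking the overlord directly. A minimal Python sketch, assuming the overlord task endpoints /druid/indexer/v1/task/{taskId}/status and /druid/indexer/v1/task/{taskId}/log; the helper names are hypothetical.

```python
import urllib.parse
import urllib.request


def task_url(overlord: str, task_id: str, endpoint: str) -> str:
    """Build an overlord task API URL; endpoint is 'status' or 'log'."""
    if endpoint not in ("status", "log"):
        raise ValueError(f"unsupported endpoint: {endpoint}")
    quoted = urllib.parse.quote(task_id, safe="")
    return f"http://{overlord}/druid/indexer/v1/task/{quoted}/{endpoint}"


def fetch(url: str, timeout: float = 10.0) -> str:
    """GET the URL and return the response body as text."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")
```

For example, `fetch(task_url("OVERLORD_IP:8090", "<task id from the console>", "log"))`, with both arguments replaced by real values. An empty body for a failed task while the status call succeeds would be consistent with a log file that exists but was never written to.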

  • Yes. I’m able to see the logs of the successful tasks.

  • On HDFS, I see the folders for the failed tasks, but not the log files.

  • I didn’t see any exceptions in the middleManager/overlord logs.

I’ll run the experiment once more and let you know if any of the answers change.

Btw, I’m using a custom parser to parse the input rows. If the parser hits an exception, where would it be logged? In the middleManager logs?

Thanks,

Jithin

Okay, let me know.

No, parser exceptions would show up in the task logs.

I ran the experiments a couple more times and here are the answers:

  • Are you able to retrieve the logs of the successful tasks?

Yes. I’m able to retrieve the logs of the successful tasks. In fact, I realized that the failure logs are also there. It’s just that they’re empty.

  • Do you see the logs for the failed tasks in HDFS?

Yes, but the log files are empty.

  • When you have a failed task and there aren’t any log files generated, are there any exceptions in the middle manager / overlord logs?

(a) I saw the following exception in the overlord log:

    2016-11-01T21:11:31,561 WARN [KafkaSupervisor-crs_datasource_1-0] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Task [index_kafka_crs_datasource_1_227940bb80823fc_gllobjio] failed to return start time, killing task
    java.lang.RuntimeException: java.net.ConnectException: Connection refused
        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
        at io.druid.indexing.kafka.KafkaIndexTaskClient.submitRequest(KafkaIndexTaskClient.java:328) ~[druid-kafka-indexing-service-0.9.1.1.jar:0.9.1.1]

(b) And the following in the middleManager log:

    2016-11-01T21:04:50,048 INFO [forking-task-runner-10] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
    java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_101]
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_101]

Does this mean that the overlord is unable to connect to the Kafka cluster?

But outside of Druid, I’m able to connect to the Kafka cluster from those same overlord and middleManager nodes.

Thanks,

Jithin

Hey Jithin,

Okay, that’s helpful. I wonder if the reason the log files are empty is that the process gets hard-killed and the logs never get properly flushed to HDFS.

Anyway, the exception you’re seeing happens when the tasks take too long to start up and are killed by the supervisor. It’s actually a bug in 0.9.1.1 (the supervisor should wait longer before killing unresponsive tasks) that has been fixed for 0.9.2. Are you able to try out the 0.9.2 release candidate and see if that fixes your problem? You can get the latest RC here: http://druid.io/downloads.html
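To illustrate the failure mode, here is a generic Python sketch of the difference between a short and a longer retry budget when a task is still starting up and refusing connections (the same "Connection refused" as in the overlord log). This is not Druid's supervisor code; `call_with_retries` and `FakeTask` are hypothetical.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retries: int,
    base_delay: float,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn, retrying on ConnectionError with exponential backoff.

    Returns None only after retries + 1 failed attempts, which is the point at
    which a supervisor-like caller would give up and kill the task.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == retries:
                return None
            sleep(base_delay * (2 ** attempt))
    return None


class FakeTask:
    """Hypothetical task that refuses connections until it has finished starting up."""

    def __init__(self, ready_after: int) -> None:
        self.calls = 0
        self.ready_after = ready_after

    def get_start_time(self) -> str:
        self.calls += 1
        if self.calls <= self.ready_after:
            raise ConnectionRefusedError("Connection refused")
        return "2016-11-01T21:11:31Z"


def no_sleep(_seconds: float) -> None:
    """Skip real waiting in the demo."""


if __name__ == "__main__":
    # Short budget: gives up while the task is still starting, so it would be killed.
    print(call_with_retries(FakeTask(3).get_start_time, retries=1, base_delay=1.0, sleep=no_sleep))
    # Longer budget: the same slow-starting task eventually answers.
    print(call_with_retries(FakeTask(3).get_start_time, retries=5, base_delay=1.0, sleep=no_sleep))
```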

Thanks David.

Would it be sufficient to replace the druid-kafka-indexing-service extension folder from the 0.9.1.1 package with the one from 0.9.2?

Or do I need to replace the whole package?

I have a pipeline set up using the imply-1.3.0 package and I would prefer to make minimal changes. Hence the question.

Thanks,

Jithin

I tried replacing just the Kafka indexing service extension folder in my previous setup with the one from druid-0.9.2, but now the tasks fail with this error:

java.lang.NoSuchMethodError: io.druid.segment.realtime.firehose.ChatHandlerProvider.register(Ljava/lang/String;Lio/druid/segment/realtime/firehose/ChatHandler;Z)V
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:244) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCal

So, I guess I have to replace the whole package (instead of just the druid-kafka-indexing-service folder)?

Thanks,

Jithin

Yeah, it looks like you’ll have to replace the whole package. You should be able to extract the druid-0.9.2-rc1 bundle and just replace the contents of imply/dist/druid/, and I believe it’ll work. According to the draft release notes (https://github.com/druid-io/druid/issues/3503) there aren’t any relevant backwards-incompatible changes.
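A minimal Python sketch of that swap that keeps the old tree as a backup, assuming the Imply layout keeps configuration outside imply/dist/druid/ so it is left untouched. The function name and the ".bak" suffix are hypothetical. If a custom extension (such as a custom parser) was placed under the old dist/druid/, it would need to be copied into the new tree afterwards.

```python
import shutil
from pathlib import Path


def swap_druid_dist(imply_root: Path, new_druid: Path, backup_suffix: str = ".bak") -> Path:
    """Replace imply_root/dist/druid with a copy of new_druid; return the backup path."""
    target = imply_root / "dist" / "druid"
    if not new_druid.is_dir():
        raise FileNotFoundError(new_druid)
    backup = target.with_name(target.name + backup_suffix)
    if backup.exists():
        # Refuse to overwrite an earlier backup.
        raise FileExistsError(backup)
    if target.exists():
        # Keep the old distribution around so it can be restored.
        target.rename(backup)
    shutil.copytree(new_druid, target)
    return backup
```

Usage would look like `swap_druid_dist(Path("imply"), Path("druid-0.9.2-rc1"))`, run with the Imply services stopped.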

Thanks David!

Replacing imply/dist/druid with the druid-0.9.2-rc1 folder resolved the issue.

The tasks now run fine and I’m able to query the data.

Btw, the Kafka indexing service guarantees exactly-once semantics, right?

Or are there any corner cases where data loss could occur?

I noticed that the number of items Druid seems to have stored is a bit lower than the number of items that were published to Kafka.

So I was wondering if it’s an issue with my Kafka setup or if it’s possible for the Druid Kafka indexing service to drop rows.

I didn’t see any parse-related exceptions [reportParseExceptions has been set to true].

Thanks again,

Jithin

Yes, the Kafka indexing service guarantees exactly-once and there shouldn’t be any corner cases (but if you encounter one please let us know!). The indexing tasks will stop ingesting and start throwing failures if something goes wrong, such as messages being evicted from Kafka for being too old before they could be read.

What query are you running to check the number of items ingested into Druid? Be sure to use a longSum aggregator rather than a count aggregator when querying your count metric, as Druid may have rolled up multiple events into a single row.
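A toy model of rollup shows why the two aggregators disagree. This sketch merges events that share the same minute bucket and dimension values into one row with a "count" metric; the field names are hypothetical, and real Druid rollup also combines other metrics.

```python
from typing import Dict, List, Tuple

SAMPLE_EVENTS = [
    {"ts": 0, "dims": {"page": "a"}},
    {"ts": 10, "dims": {"page": "a"}},
    {"ts": 59, "dims": {"page": "a"}},
    {"ts": 30, "dims": {"page": "b"}},
    {"ts": 61, "dims": {"page": "a"}},
]


def roll_up(events: List[Dict], granularity_seconds: int = 60) -> List[Dict]:
    """Merge events with the same truncated timestamp and dimensions, counting them."""
    rows: Dict[Tuple, Dict] = {}
    for e in events:
        bucket = e["ts"] - e["ts"] % granularity_seconds
        dims = tuple(sorted(e["dims"].items()))
        key = (bucket, dims)
        if key not in rows:
            rows[key] = {"ts": bucket, "dims": dict(dims), "count": 0}
        rows[key]["count"] += 1
    return list(rows.values())


if __name__ == "__main__":
    rows = roll_up(SAMPLE_EVENTS)
    print(len(rows))                       # 3: what a count aggregator reports (stored rows)
    print(sum(r["count"] for r in rows))   # 5: what longSum over "count" reports (events)
```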

I’ve been using PlyQL (included in the imply package) to run the following query:

'SELECT COUNT(*) FROM ’

I’m not sure whether this translates into a longSum or a count aggregation, though.

Do you know what Druid query PlyQL translates this SQL query into?
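One way to sidestep the question is to send a native timeseries query to the broker and compare both aggregators side by side. A minimal sketch, assuming the ingestion spec defined a count metric named "count" and that the broker listens on its default port 8082; the datasource name comes from the logs earlier in this thread and the interval is a placeholder.

```python
import json
import urllib.request


def timeseries_count_query(datasource: str, interval: str, count_metric: str = "count") -> dict:
    """Native query returning both the stored row count and the ingested event count."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": "all",
        "intervals": [interval],
        "aggregations": [
            # Number of stored (possibly rolled-up) rows.
            {"type": "count", "name": "rows"},
            # Number of ingested events, summed from the ingestion-time count metric.
            {"type": "longSum", "name": "events", "fieldName": count_metric},
        ],
    }


def post_query(broker: str, query: dict, timeout: float = 30.0):
    """POST a native query to the broker and return the parsed JSON result."""
    req = urllib.request.Request(
        f"http://{broker}/druid/v2/",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())
```

For example, `post_query("BROKER_IP:8082", timeseries_count_query("crs_datasource_1", "2016-11-01/2016-11-02"))`. If "events" matches what was published to Kafka while "rows" is lower, the gap is rollup rather than data loss.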

Thanks,

Jithin