Optimize Druid for performance

Hi,

We’re using Druid in our real-time analytics pipeline and would appreciate some help with tuning our configuration for better query performance and overall stability.

We’re running Druid on 3 servers, each with 256 GB RAM and 16 physical cores. We also have 5 machines running HDFS (4 of which are colocated with Druid), each with 1 TB of disk space.

We’ve containerized Druid. On each of the 3 Druid machines, we have:

  1. one container that’s running the historical and middle manager processes

  2. one container that’s serving as the broker node

  3. in addition, one of the three machines runs a container with the coordinator and overlord processes

We are ingesting data from Kafka using the Kafka indexing service. We have 3 topics with 64 partitions each. We assign 12 indexing tasks per topic, with a task duration of 10 minutes and a segment granularity of 10 minutes (are we creating too many segments, and is that affecting query performance?).
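For reference, the relevant parts of one of our supervisor specs look roughly like this (topic, datasource, and Kafka broker address are placeholders, and the parser, dimensions, and metrics are omitted):

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "events",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "TEN_MINUTE",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka"
  },
  "ioConfig": {
    "topic": "topic-1",
    "consumerProperties": { "bootstrap.servers": "kafka-1:9092" },
    "taskCount": 12,
    "taskDuration": "PT10M"
  }
}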

Find below our cluster configurations. Could someone please review and offer some advice on tuning our cluster?

Thanks in advance,

Avinash

Broker

jvm configs

-server

-Xms24g

-Xmx24g

-XX:NewSize=6g

-XX:MaxNewSize=6g

-XX:MaxDirectMemorySize=64g

-XX:+PrintGCDetails

-XX:+PrintGCTimeStamps

-XX:+PrintGCDateStamps

-XX:+HeapDumpOnOutOfMemoryError

-XX:HeapDumpPath=/monitor/druid/logs

-Duser.timezone=UTC

-Dfile.encoding=UTF-8

-Djava.io.tmpdir=/monitor/druid/tmp

-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

druid.service=druid/broker

druid.port=9082

druid.host=

HTTP server threads

druid.broker.http.numConnections=20

druid.server.http.numThreads=50

Processing threads and buffers

druid.processing.buffer.sizeBytes=2147483647

druid.processing.numThreads=7

Query cache disabled – push down caching and merging instead

druid.broker.cache.useCache=false

druid.broker.cache.populateCache=false

Coordinator

jvm configs

-server

-Xms10g

-Xmx10g

-XX:NewSize=512m

-XX:MaxNewSize=512m

-XX:MaxDirectMemorySize=10g

-XX:+UseG1GC

-XX:+PrintGCDetails

-XX:+PrintGCTimeStamps

-XX:+PrintGCDateStamps

-XX:+HeapDumpOnOutOfMemoryError

-XX:HeapDumpPath=/monitor/druid/logs

-Duser.timezone=UTC

-Dfile.encoding=UTF-8

-Djava.io.tmpdir=/monitor/druid/tmp

-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

-Dderby.stream.error.file=/monitor/druid/tmp/derby.log

runtime.properties

druid.service=druid/coordinator

druid.port=8181

druid.host=

druid.coordinator.startDelay=PT30S

druid.coordinator.period=PT60S

Historical

jvm configs

-server

-Xms12g

-Xmx12g

-XX:NewSize=6g

-XX:MaxNewSize=6g

-XX:MaxDirectMemorySize=32g

-XX:+PrintGCDetails

-XX:+PrintGCTimeStamps

-XX:+PrintGCDateStamps

-XX:+HeapDumpOnOutOfMemoryError

-XX:HeapDumpPath=/monitor/druid/logs

-Duser.timezone=UTC

-Dfile.encoding=UTF-8

-Djava.io.tmpdir=/monitor/druid/tmp

-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

druid.service=druid/historical

druid.port=9083

druid.host=

HTTP server threads

druid.server.http.numThreads=50

Processing threads and buffers

druid.processing.buffer.sizeBytes=2147483647

druid.processing.numThreads=7

Segment storage

druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":130000000000}]

druid.server.maxSize=130000000000

Query cache

druid.historical.cache.useCache=true

druid.historical.cache.populateCache=true

druid.cache.type=local

druid.cache.sizeInBytes=2000000000

Middle Manager

jvm configs

-server

-Xms64m

-Xmx64m

-XX:+PrintGCDetails

-XX:+PrintGCTimeStamps

-XX:+PrintGCDateStamps

-XX:+HeapDumpOnOutOfMemoryError

-XX:HeapDumpPath=/monitor/druid/logs

-Duser.timezone=UTC

-Dfile.encoding=UTF-8

-Djava.io.tmpdir=/monitor/druid/tmp

-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

druid.service=druid/middlemanager

druid.port=9091

druid.host=

Number of tasks per middleManager

druid.worker.capacity=80

Task launch parameters

druid.indexer.runner.javaOpts=-server -Xmx3g -XX:MaxDirectMemorySize=4096m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

druid.indexer.task.baseTaskDir=var/druid/task

druid.indexer.task.restoreTasksOnRestart=true

HTTP server threads

druid.server.http.numThreads=40

Processing threads and buffers

druid.processing.buffer.sizeBytes=536870912

druid.processing.numThreads=2

Hadoop indexing

druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]

Overlord

jvm configs

-server

-Xms4g

-Xmx4g

-XX:+UseConcMarkSweepGC

-XX:+PrintGCDetails

-XX:+PrintGCTimeStamps

-XX:+PrintGCDateStamps

-XX:+HeapDumpOnOutOfMemoryError

-XX:HeapDumpPath=/monitor/druid/logs

-Duser.timezone=UTC

-Dfile.encoding=UTF-8

-Djava.io.tmpdir=/monitor/druid/tmp

-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

druid.host=

druid.service=druid/overlord

druid.port=9090

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote

druid.indexer.storage.type=metadata

Also, the Druid version I’m using is 0.11.0.

Hi Avinash,

We are ingesting data from Kafka using the Kafka indexing service. We have 3 topics with 64 partitions each. We assign 12 indexing tasks per topic, with a task duration of 10 minutes and a segment granularity of 10 minutes (are we creating too many segments, and is that affecting query performance?).

It’s hard to say without knowing the data volumes involved - but yes, you are probably creating too many segments. 64 partitions per topic is a lot and is not likely optimal for a 3-node Druid cluster. You can check segment sizes in the coordinator console, and if they are much smaller than 100 MB then you could definitely be doing better. In that case you will probably be interested in the new Kafka partition multiplexing coming in Druid 0.12.0, where the number of Druid segments is no longer tied to the number of Kafka partitions. It really helps in situations where you would otherwise have way too many small segments.

I haven’t fully reviewed your configs, but after skimming them I would suggest looking at the following:

  1. Depending on real-time data load, you might be able to dedicate more than 7 processing threads to your historicals (druid.processing.numThreads=7). Check CPU usage on your machines to see if you have free CPU capacity. This is important to verify since, on most clusters, the historicals are doing most of the work, and it generally pays to give them as many threads as you can afford.

  2. druid.worker.capacity=80 on your middleManager is way too high: 80 tasks * 3 GB per task = 240 GB, which is almost all of your memory.

  3. druid.cache.type=caffeine performs better under high concurrency than local. (Illustrative settings for all three points are sketched after this list.)
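As a very rough sketch of what I mean (the exact numbers depend on what your CPU and memory measurements tell you; these values are purely illustrative, not recommendations):

# Historical: more processing threads, if CPU headroom allows (illustrative value)
druid.processing.numThreads=12

# MiddleManager: capacity sized so that all tasks actually fit in memory (illustrative value)
druid.worker.capacity=20

# Historical: caffeine cache instead of local
druid.cache.type=caffeine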

By the way, the company I work for offers professional Druid cluster support. Email me off-list if you are interested.

Hi Gian,

I tried Druid 0.12.0. It has a batch parsing feature that lets us read one record from Kafka and map it to multiple rows in a table. But I hit a couple of errors when using it (initially a NullPointerException while the indexing job was logging some data in getMetricLongValue, and then memory maxing out 10 minutes into the run), so I’ve decided not to use 0.12.0 until the stable version is released. I’ve tried rc1 and rc2 so far.

My segment sizes from the Druid console are around 1.2 GB.

  1. I’ve assigned 7 processing threads to the historicals.

  2. I’m using the equalDistribution selectStrategy (the dynamic config is sketched after this list). From the Druid console I can see that the max capacity used by any worker at any given time is around 20 GB, so about 60 GB in total.

  3. I haven’t tried the caffeine cache type. Thanks for the pointer.
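For completeness, the selectStrategy is set through the overlord’s dynamic worker configuration, which for us is roughly just:

{
  "selectStrategy": {
    "type": "equalDistribution"
  }
}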

I would be interested in getting in touch with you for professional support. What’s the best way to reach you?

Thank you,

Avinash

Hi Avinash,

About 0.12.0: if you have time to try rc3, that would be super helpful, since it would verify whether the bug you ran into has been fixed. If you can, please test the version from http://druid.io/downloads.html (http://static.druid.io/artifacts/releases/druid-0.12.0-rc3-bin.tar.gz), as this is the current release candidate build and will very closely resemble the final release.

About segment sizes: do you mean 1.2GB per segment (individual file) or per time chunk (hour, or day, or what have you)? If it’s per individual segment file then that sounds totally fine - maybe even a bit on the high side - and so I don’t think small segments are your problem. But if it’s per time chunk, you should drill into that and see how big the individual segment files are. In the coordinator console you do this by clicking on the time chunk.

For (1) I was trying to say that you might be able to get away with giving the historicals more than 7 threads. It’s at least worth looking into how much CPU you have spare on the boxes when the system is under query load: ideally it should be near 100% meaning you are maxing out the hardware. I am not sure if 7 is the right number or not but checking into CPU use should tell you.

To reach me directly just email me at the email I use for this group (gian@imply.io).

Gian,

  1. I was looking at the wrong sizes. What I reported was per time chunk; upon drilling down I found the individual segments to be very small. Is the recommended value around 500-700 MB per segment?

  2. My servers are constantly at around 70% CPU on average, and that’s even when I’m not constantly querying the system.

  3. Unfortunately most of my queries are groupBy queries. We rewrote the ones that could be expressed as topN and get decent performance from those, but I guess the large number of small segments is hurting our performance (a sketch of the kind of rewrite follows this list).
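For reference, the kind of rewrite we did looks roughly like this (datasource, dimension, metric names, and interval are placeholders):

{
  "queryType": "topN",
  "dataSource": "events",
  "intervals": ["2018-03-01/2018-03-02"],
  "granularity": "all",
  "dimension": "page",
  "metric": "views",
  "threshold": 100,
  "aggregations": [
    { "type": "longSum", "name": "views", "fieldName": "views" }
  ]
}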

What’s the best way to achieve optimum segment sizes? Is it just trial and error with adjusting the task duration and segment granularity?

Sure, I’ll try out the latest release candidate some time. I’m just a little hesitant since I’m looking for something more battle-tested. Is there a tentative date for the stable release?

Hi Avinash,

With Kafka indexing, odds are it is impossible in your situation (1.2 GB of data per time chunk spread over 64 partitions) to achieve optimal segment sizes up front pre-0.12.0. Pre-0.12.0 you can reindex/compact your data to achieve good sizes after the fact: basically run an “index” task with an “ingestSegment” firehose, which will repartition the data. Post-0.12.0 it should be totally possible to get good sizes up front, where it’d mostly be a matter of selecting the right taskCount and segmentGranularity.
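A minimal sketch of such a reindexing task, just to illustrate the shape (the datasource name, interval, and target size are placeholders, and the parser and metricsSpec are omitted):

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "events",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "intervals": ["2018-03-01/2018-03-02"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "events",
        "interval": "2018-03-01/2018-03-02"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 5000000
    }
  }
}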

The 0.12.0 stable release should be out any day now (there is a vote in progress on the development list). But even so, it’s valuable for the release candidates to be tested by members of the community such as yourself; that’s a big part of how they become battle-tested for a wide variety of situations beyond what is covered by in-house testing.