Druid.worker.capacity configuration

I have a question regarding the “druid.worker.capacity” configuration. According to the Druid documentation, it should be set to the number of CPUs on the machine minus 1. If I am using an i3en.2xlarge, which has 8 vCPUs, does that mean I need to configure it to 7?

Thanks for any help.

Correct, but could you give us a bit more context? Are you working on a provisioning issue?

That is a rule of thumb if you are using the i3en.2xlarge as a Middle Manager only. If you plan to colocate Middle Manager and Historical services on the same node, then you should drop it down to roughly cores/3, so about 3 for your instance.

I am working on a Druid upgrade, and I noticed that the MiddleManager only takes 3 tasks at once while the other tasks sit in the pending state, so I want to see if I can increase it to speed up indexing.

We have separate nodes for Historical services.

If you have separate nodes, please go ahead and increase the slots to 7. You can test and go a little higher if your use case permits.

I am getting a lot of indexing errors after increasing to 7 (Task execution process exited unsuccessfully with code[137]. See middleManager logs for more details…), but there are no clear errors in the logs. I suspect it is an OOM issue and the process is being killed.

Yes, that’s probably an OOM. What are your JVM settings for the MM, and for the Peons in druid.indexer.runner.javaOptsArray, which defines the JVM settings for the workers? If you don’t specify the Peon JVM settings, they inherit the MM’s JVM settings. With 7 workers per MM, you have one JVM for the MM plus one per worker, 8 in total. The sum of all their memory limits should not exceed the memory of the server.


Thanks, Sergio. I am using the i3en.2xlarge, which has 64 GB. If I am running 7 worker processes plus the MM itself, does that mean I should set Xmx to roughly 8 GB for each Peon worker process?

This is our Druid 0.12 setting from before I did the upgrade to version 24; it sets Xmx to 8G and MaxDirectMemorySize to 50G:

druid.indexer.runner.javaOpts=-server -Xmx8g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:MaxDirectMemorySize=50g

Just to confirm: you have multiple i3en.2xlarge nodes that are each used solely as MMs, right?
MaxDirectMemorySize is way too high, but since it is only a limit, I don’t think it will affect anything. Let’s break this down according to the tuning guide.
The MM’s own JVM should not require much memory; Heap = 256 MB should be plenty.

The workers will need both heap and direct memory; the total of both, multiplied by the number of workers (7), is the total memory used on the node. Since you are only running MMs on these nodes, you can split all the memory among the workers, minus the MM’s JVM.
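As a rough sketch of that budget (the 64 GB figure is the i3en.2xlarge’s memory from above; the OS headroom reservation is an illustrative assumption, not a Druid setting):

```python
# Back-of-the-envelope per-worker memory budget for an MM-only node.
total_gb = 64              # i3en.2xlarge memory
mm_heap_gb = 0.25          # MM JVM heap (256 MB, as suggested above)
os_headroom_gb = 4         # memory left for the OS and page cache (assumption)
workers = 7                # druid.worker.capacity

per_worker_gb = (total_gb - mm_heap_gb - os_headroom_gb) / workers
print(f"{per_worker_gb:.1f} GB per worker (heap + direct combined)")
```

The exact headroom you reserve is a judgment call; the point is that heap plus direct memory, times 7, plus the MM’s heap must fit in 64 GB.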

Are you using lookups? Those are loaded into the heap of each worker, and they can use up to 2 times the total size of the lookups.

You’ll need to set the processing parameters accordingly:

"druid.indexer.fork.property.druid.processing.buffer.sizeBytes": 300000000,
"druid.indexer.fork.property.druid.processing.numMergeBuffers": 2,
"druid.indexer.fork.property.druid.processing.numThreads": 2

and the direct memory estimate for each task will be:
(druid.processing.numThreads + druid.processing.numMergeBuffers + 1) * druid.processing.buffer.sizeBytes
Using the example settings above, that is 1.5 GB; perhaps use 2 GB for good measure.
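That estimate is easy to check directly with the fork properties quoted above:

```python
# Direct memory estimate per task:
# (numThreads + numMergeBuffers + 1) * buffer.sizeBytes
num_threads = 2
num_merge_buffers = 2
buffer_size_bytes = 300_000_000

direct_bytes = (num_threads + num_merge_buffers + 1) * buffer_size_bytes
print(direct_bytes / 1e9, "GB")  # 1.5 GB
```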

You’ll also want to set up enough HTTP threads for internal requests and for query processing coming from the Brokers:

For Tasks, druid.server.http.numThreads should be set to a value slightly higher than the sum of druid.broker.http.numConnections across all the Brokers in the cluster.

As a rule of thumb set it to:
druid.server.http.numThreads = SUM(druid.broker.http.numConnections across all the Brokers) + 10
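For example, with hypothetical broker settings (two Brokers, each with druid.broker.http.numConnections = 20; these numbers are illustrative, not from this cluster):

```python
# Rule of thumb: server HTTP threads = sum of broker connections + 10
broker_num_connections = [20, 20]  # hypothetical: two Brokers at 20 each
server_http_num_threads = sum(broker_num_connections) + 10
print(server_http_num_threads)  # 50
```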

All that said, I think you have plenty of memory to do something like 9GB total per worker:

druid.indexer.runner.javaOpts=-server -Xms7g -Xmx7g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:MaxDirectMemorySize=2g

Let us know how it goes.


Hello, thanks for the reply, Sergio.

I am seeing the memory error below after reducing the direct memory to 4 GB. I have about 400 columns in the dataset; could that cause the issue?

2022-11-07T13:27:17,051 INFO [[index_parallel_c32b3a3b-aecf-4372-9603-fb168606c668_pdoddeep_2022-11-07T12:18:34.670Z]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Preparing to push (stats): processed rows: [5000000], sinks: [1], fireHydrants (across sinks): [76]
2022-11-07T13:28:41,829 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Encountered exception in BUILD_SEGMENTS.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Direct buffer memory
	at org.apache.druid.indexing.common.task.IndexTask.generateAndPublishSegments(IndexTask.java:1035) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.common.task.IndexTask.runTask(IndexTask.java:526) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.common.task.AbstractBatchIndexTask.run(AbstractBatchIndexTask.java:187) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runSequential(ParallelIndexSupervisorTask.java:1199) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runTask(ParallelIndexSupervisorTask.java:532) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.common.task.AbstractBatchIndexTask.run(AbstractBatchIndexTask.java:187) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449) ~[druid-indexing-service-24.0.0.jar:24.0.0]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_352]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]

With a large number of columns, there are other considerations in direct memory usage:

  • Decompression will require about 64 KB × <column count> × <number of segments read>.
  • Dictionary merge will require about <column cardinality> × 4 bytes for each column.
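To see how a wide schema inflates those terms, here is an illustrative calculation; the segment count and per-column cardinality are hypothetical placeholders, since they depend on the actual data:

```python
# Extra direct-memory pressure from a wide (400-column) schema.
columns = 400
segments_read = 10            # hypothetical number of segments being merged
avg_cardinality = 1_000_000   # hypothetical average column cardinality

decompression_bytes = 64 * 1024 * columns * segments_read
dict_merge_bytes = avg_cardinality * 4 * columns

print(f"decompression ~ {decompression_bytes / 1e9:.2f} GB, "
      f"dictionary merge ~ {dict_merge_bytes / 1e9:.2f} GB")
```

With numbers like these, dictionary merging alone can eat a large share of a small direct-memory limit, which fits the OutOfMemoryError above.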

So maybe reduce the heap a bit and increase the direct memory size; perhaps this is the reason it was set so high before. You could try a 5 GB heap and 4 GB of direct memory.

Also, one other suggestion: if this is batch ingestion, you might want to try SQL-based ingestion, which is faster and a lot easier to tune.


Sergio and Vijeth, I really appreciate all the help here. I also worked with a co-worker who used to work at Imply. You guys are awesome for helping the community!
