Druid MiddleManager only using one core during native parallel indexing

Hello, I recently upgraded to version 0.17.0 and wanted to try the native parallel index task for the ORC format.
My cluster has 2 master nodes and 10 datanodes. The datanodes have 128 GB RAM and 32 vCPUs.

My daily dataset is 110 GB, and I am running the task with “maxNumConcurrentSubTasks”: 10 and “forceGuaranteedRollup”: true.

However, the performance lags far behind a Hadoop cluster; specifically, it takes 20 hours to finish. I suspect it has to do with underutilization of cores on the middleManager.

I am attaching an htop view from one of my datanodes.

I am also attaching my spec, without the S3 paths.

Is this performance expected? Or am I missing something in my spec file?

new_druid_spec.json (6.68 KB)

Hi,

I tested the native parallel task reading 100 GB of CSV files before. I don’t remember exactly how long it took, but it was faster than 20 hours; I think it was about 5-6 hours, which is still slow.

The cpu usage does look strange. How many task slots does each middleManager have? It’s druid.worker.capacity in the middleManager runtime.properties file (https://druid.apache.org/docs/latest/configuration/index.html#middlemanager-configuration).
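
For reference, it is set in each middleManager’s runtime.properties; the value below is just an illustration, not a recommendation for your hardware:

    # middleManager runtime.properties (illustrative value)
    druid.worker.capacity=8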

Jihoon

Hi,

Each middleManager has 3 task slots. I don’t know, maybe the fact that my data is in ORC format is causing the extra time? A 15-hour difference is pretty huge. I might be doing something wrong.

I tried a smaller-scale job (10 files, 5 GB total) and it still took 53 minutes to finish.
The partition spec type is hashed.

I tried increasing my worker.capacity and the limit on active running tasks, in an effort to have subtasks running for all my files (approximately 240 files daily), but unfortunately, after adding more than 4 or 5 entries in the S3 prefixes, the tasks do not even start. This is the error I am seeing in the Druid logs:

java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Cannot': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"Cannot find any task with id: [partial_index_generate_partition_1h_utc=2020-02-02-00_fddoeihe_2020-02-03T12:45:14.137Z]"; line: 1, column: 7]
	at org.apache.druid.client.indexing.HttpIndexingServiceClient.getTaskStatus(HttpIndexingServiceClient.java:266) ~[druid-server-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.submit(TaskMonitor.java:243) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner.submitNewTask(ParallelIndexPhaseRunner.java:268) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner.run(ParallelIndexPhaseRunner.java:142) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runNextPhase(ParallelIndexSupervisorTask.java:279) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runHashPartitionMultiPhaseParallel(ParallelIndexSupervisorTask.java:552) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runMultiPhaseParallel(ParallelIndexSupervisorTask.java:541) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runTask(ParallelIndexSupervisorTask.java:448) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.common.task.AbstractBatchIndexTask.run(AbstractBatchIndexTask.java:138) ~[druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.17.0.jar:0.17.0]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.17.0.jar:0.17.0]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Cannot': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"Cannot find any task with id: [partial_index_generate_partition_1h_utc=2020-02-02-00_fddoeihe_2020-02-03T12:45:14.137Z]"; line: 1, column: 7]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2868) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1914) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:773) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4340) ~[jackson-databind-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4189) ~[jackson-databind-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205) ~[jackson-databind-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3188) ~[jackson-databind-2.10.1.jar:2.10.1]
	at org.apache.druid.client.indexing.HttpIndexingServiceClient.getTaskStatus(HttpIndexingServiceClient.java:258) ~[druid-server-0.17.0.jar:0.17.0]
	... 14 more

Hmm, the error looks very suspicious. Could you search for the task ID (partial_index_generate_partition_1h_utc=2020-02-02-00_fddoeihe_2020-02-03T12:45:14.137Z) in the overlord logs and check whether the task was started or not?

I tried a smaller-scale job (10 files, 5 GB total) and it still took 53 minutes to finish.

The partition spec type is hashed.

How many “partial_index_generate” and “partial_index_merge” tasks were created to process this input? Also, was the partitionsSpec the same as the one you attached? You may want to reduce numShards for the smaller input.
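
For example, if the smaller job reuses the same hashed partitioning, the piece to shrink would be something like the fragment below (the number here is only illustrative):

    "partitionsSpec": {
      "type": "hashed",
      "numShards": 5
    }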

Regarding the original question about ingesting the large input, I see “maxNumConcurrentSubTasks” was set to 10 in the ingestion spec, which means the “index_parallel” task will spawn up to 10 subtasks (“partial_index_generate” and “partial_index_merge”) at any time. Since the default worker select strategy of the overlord is “equalDistribution” (https://druid.apache.org/docs/0.17.0/configuration/index.html#worker-select-strategy), each middleManager will be assigned only one task. You can increase “maxNumConcurrentSubTasks” to use more cores.
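
For example, with 10 middleManagers at 3 slots each (30 slots in total), a tuningConfig fragment along the lines of the sketch below would keep more slots busy; the exact value is up to you, and the rest of your tuningConfig stays as it is (keep in mind the supervisor index_parallel task itself also occupies one slot):

    "tuningConfig": {
      "type": "index_parallel",
      "forceGuaranteedRollup": true,
      "maxNumConcurrentSubTasks": 28
    }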

Each middleManager has 3 task slots. I don’t know, maybe the fact that my data is in ORC format is causing the extra time?

AFAIK, ORC files should be faster than CSV when they store the same data since they are smaller.

Hmm, the error looks very suspicious. Could you search for the task ID (partial_index_generate_partition_1h_utc=2020-02-02-00_fddoeihe_2020-02-03T12:45:14.137Z) in the overlord logs and check whether the task was started or not?

(I am assuming you mean the coordinator logs?) I checked the coordinator logs but couldn’t find anything about that specific task.
I see a bunch of
Unknown task completed: partial_index_generate_test2_okgjnifo_2020-02-03T14:27:02.860Z

Which I guess is what I should be seeing for the tasks that are completing normally.

So, I did some test runs, and I can reproduce the above error if I add more than 3 S3 folders to the S3 prefixes in the UI. Each folder contains 10 files, so it tries to spawn 30 partial_index_generate tasks. This fails with the error I mentioned above.

It works as expected with one or two prefixes (approx. 20 files). Is it possible that S3InputSource tries to start the partial_index_generate subtasks before it actually finishes listing all the contents of the prefixes? It makes sense to me that it’s an issue with the AWS S3 SDK connection and the large number of prefixes I add (24 folders, each with 10 files), given that it works perfectly with a few prefix entries.
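
(For reference, the inputSource part of the spec the UI generates looks roughly like the fragment below; the bucket and folder names are placeholders, not my real paths:)

    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "prefixes": [
          "s3://example-bucket/2020-02-02/folder-01/",
          "s3://example-bucket/2020-02-02/folder-02/",
          "s3://example-bucket/2020-02-02/folder-03/"
        ]
      },
      "inputFormat": { "type": "orc" }
    }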

Hmm, I meant the overlord logs, unless you’ve enabled the single master mode (“druid.coordinator.asOverlord.enabled” = true).
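
(For reference, that setting lives in the coordinator’s runtime.properties; with something like the lines below there is no separate overlord process and the coordinator serves both roles:)

    # coordinator runtime.properties (single master mode)
    druid.coordinator.asOverlord.enabled=true
    druid.coordinator.asOverlord.overlordService=druid/overlord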

It works as expected with one or two prefixes (approx. 20 files). Is it possible that S3InputSource tries to start the partial_index_generate subtasks before it actually finishes listing all the contents of the prefixes? It makes sense to me that it’s an issue with the AWS S3 SDK connection and the large number of prefixes I add (24 folders, each with 10 files), given that it works perfectly with a few prefix entries.

This sounds very strange. S3InputSource has nothing to do with how the parallel indexing spawns subtasks. In fact, the index_parallel task will create one partial_index_generate subtask per S3 file and issue them to the overlord to run on middleManagers. AFAIK, it’s not possible for the index_parallel task to run subtasks without issuing them to the overlord. Since all tasks submitted to the overlord should be logged, it’s very strange that you are seeing the “Unknown task completed” logs. As long as you didn’t update the metadata store manually, this shouldn’t happen. I suspect that there might be two overlords in the same cluster and both think they are the leader. Can you double-check your cluster configuration?

I do in fact have “druid.coordinator.asOverlord.enabled” set to true.

My cluster is set up as follows:

14 machines in total, split across two datacenters:

  • 1 master per DC (2 total)
  • 6 datanodes per DC (12 total)

Master nodes have:

  • Coordinator
  • Router
  • Broker

Datanodes have:

  • Historical
  • MiddleManager

So, if it’s not an issue with S3InputSource and there is no technical limit on how many S3 files it can consume, I’m more inclined to believe I am running into some memory issue where the middleManagers cannot start that many partial_index_generate subtasks.

The problem is that the error I am seeing does not point me to a specific configuration; it is most likely peon related.
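
In case it matters, these are the kinds of middleManager/peon settings I have been looking at; the values below are placeholders, not my actual configuration:

    # middleManager runtime.properties (placeholder values)
    druid.worker.capacity=3
    druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx2g","-XX:MaxDirectMemorySize=2g"]
    druid.indexer.fork.property.druid.processing.numThreads=2
    druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100000000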

So, if it’s not an issue with S3InputSource and there is no technical limit on how many S3 files it can consume, I’m more inclined to believe I am running into some memory issue where the middleManagers cannot start that many partial_index_generate subtasks.

Hmm, no matter what error happens in the middleManager or the task, the overlord should be aware of all submitted tasks. Even if the middleManager couldn’t run the assigned task for some reason, the overlord that assigned that task should know what the task is.

Unknown task completed: partial_index_generate_test2_okgjnifo_2020-02-03T14:27:02.860Z

But, this log means there was a task completed in a middleManager but unknown to the overlord (https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java#L482).

All tasks submitted to the overlord are stored in both the metadata store and the overlord’s memory. The overlord’s memory is a sort of cache of the metadata store and is periodically refreshed. The above error can happen only when a task is missing from that in-memory cache (the “tasks” list in TaskQueue). This normally never happens, and AFAIK, the only exception allowing this error is when there are two overlords in the same cluster.

So my original error
Cannot find any task with id:

could be due to the fact that I have two coordinators (running as overlords)?

But shouldn’t the leader check take care of that? I checked, and only one of them is registered as the leader.

I’m not sure what else could lead to this error. However, I’m pretty sure that the error isn’t related to any particular task type or input source.

But shouldn’t the leader check take care of that? I checked, and only one of them is registered as the leader.

Yes, the leader check should take care of it, but I have sometimes seen a “split-brain” case (two overlords each thinking they are the leader at the same time), and I don’t know the root cause yet.

I’m not sure what else I can suggest… maybe it’s worth double-checking all your cluster configurations (especially ZooKeeper).
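
In particular, I would verify that every node in both datacenters points at the same ZooKeeper ensemble and the same base path in common.runtime.properties; the host names below are placeholders:

    # common.runtime.properties (placeholder hosts)
    druid.zk.service.host=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
    druid.zk.paths.base=/druid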