Druid Scaling with Index_Parallel


I am trying to ingest a batch of 300 parquet files of 20 MB each using index_parallel with 50 subtasks. A single task finishes within 2 min 40 sec, but when I run 10 of the same kind of job in parallel (same data, just different names to create similar jobs), each job takes 10 min 40 sec. Can someone help me see what I am missing here? Below is my Druid setup.

Master - m5.4xlarge (2 nodes)

Query - m4.2xlarge (1 node)

Data - i3en.24xlarge (5 nodes, 150 slots each)

Druid Version - 0.19


First – you need 3 master nodes for HA. If you aren’t worried about HA, you can have just 1 and scale it up.

OK. Ingestion.

index tasks do simple, honest work: they read the input files you provide and make segments, and they do it on a single machine in a single thread. They do pretty well on 1.5 GB of data but are probably not the right choice for loading 1.5 TB. index_parallel tasks list out your input files and then create one subtask (which is more or less doing the same thing as an index task) for each file, and at most maxNumSubTasks run at once. It is awesome because it lets you parallelize ingestion easily and automatically. But where this goes off the rails is if your files are too big – which means that one task, with its one machine and one thread, must deal with the giant file – or too small – which means you have some huge number of tasks to run, and each one takes 15-20 seconds just to start up before processing its tiny amount of data in about a second. The overheads are sad.
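The startup-overhead argument can be sketched with back-of-envelope numbers. In this sketch, the 15 s startup cost and the 10 MB/s per-subtask rate are illustrative assumptions, not measured Druid constants – swap in your own observations:

```python
import math

# Back-of-envelope model of index_parallel wall-clock time.
# startup_s and mb_per_s are illustrative assumptions, not measured
# Druid constants -- tune them to what you observe on your cluster.
def estimate_wall_clock(num_files, file_mb, max_subtasks,
                        startup_s=15.0, mb_per_s=10.0):
    """Rough estimate: one subtask per file, run in waves of max_subtasks."""
    per_task_s = startup_s + file_mb / mb_per_s
    waves = math.ceil(num_files / max_subtasks)
    return waves * per_task_s

# 300 files x 20 MB with 50 concurrent subtasks -> 6 waves of ~17 s each
print(estimate_wall_clock(300, 20, 50))  # -> 102.0
```

Even in this crude model, most of the per-wave time is the fixed startup cost, which is why very small files make the overheads dominate.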

Also, there is no reason to set maxNumSubTasks any higher than the total worker slots. You can figure out total worker slots as druid.worker.capacity × the number of MiddleManagers.
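For the cluster in this thread, that calculation looks like this (the values below come from the setup described above):

```python
# Total worker slots = druid.worker.capacity x number of MiddleManagers.
worker_capacity = 150        # druid.worker.capacity per MiddleManager
num_middle_managers = 5      # data nodes, each running a MiddleManager
total_slots = worker_capacity * num_middle_managers
print(total_slots)           # -> 750, the upper bound for maxNumSubTasks
```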

If you are focused on speeding up this load (and not worried about letting queries through), you could raise druid.worker.capacity on each data node. That will allow you to raise maxNumSubTasks. If you can reduce the number of files to be ingested to roughly match maxNumSubTasks, that should give the most optimized load.

For historicals:

  • druid.processing.numThreads should generally be set to (number of cores - 1) : a smaller value can result in CPU underutilization, while going over the number of cores can result in unnecessary CPU contention.
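As a quick way to derive that value on a given box, something like this works. Note that `os.cpu_count()` reports logical cores, which may differ from the physical core count the guidance intends – treat this as a starting point:

```python
import os

# Derive the suggested historical thread count on the current machine.
# os.cpu_count() reports logical cores, which may differ from physical
# cores -- treat the result as a starting point, not gospel.
cores = os.cpu_count() or 1
num_threads = max(1, cores - 1)
print(f"druid.processing.numThreads={num_threads}")
```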

Task Count

The number of tasks a MiddleManager can launch is controlled by the druid.worker.capacity setting.

The number of workers needed in your cluster depends on how many concurrent ingestion tasks you need to run for your use cases. The number of workers that can be launched on a given machine depends on the size of resources allocated per worker and available system resources.

You can allocate more MiddleManager machines to your cluster to add task capacity.

Hi, thanks for the reply.
I have my worker capacity set to 150, so in total I have 750 slots available. I didn’t see any lag in task pick-up, but I am observing that the subtasks created for an index_parallel task take more time as the number of index_parallel tasks increases.

But if I check the ratio of GB ingested per minute, it is not degraded. Below are the numbers:

  1. 1 index_parallel task with 50 parallelism (300 parquet files of 20 MB each)
    Time taken - 2 min 30 sec
    Data ingested - 6 GB => ~2.4 GB/min
  2. 10 index_parallel tasks with 50 parallelism each (300 parquet files of 20 MB each per task)
    Time taken - ~10 min 54 sec each
    Data ingested - 60 GB => ~5.5 GB/min aggregate
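A quick sanity check on those aggregate rates (plain arithmetic, no Druid-specific assumptions):

```python
# Plain arithmetic check of the ingest rates quoted above.
single_gb, single_min = 6, 2.5        # 1 job: 2 min 30 sec
parallel_gb, parallel_min = 60, 10.9  # 10 jobs x 6 GB, ~10 min 54 sec each

print(round(single_gb / single_min, 1))      # -> 2.4
print(round(parallel_gb / parallel_min, 1))  # -> 5.5 (aggregate GB/min)
```

So total throughput more than doubles when the jobs run concurrently; it is only the per-job wall-clock time that stretches out.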

But what I want is for each index_parallel task to finish in the same 2 min 30 sec. Is there a priority or anything we can change? And why do all the index_parallel tasks finish at once? That is what I am not able to understand. Any help on this?

I think the index_parallel task is designed to do the work all at once, rather than the old index task that needed to be run in parallel. Have you tried scaling up the single command to use all the CPU on the cluster? My guess would be that #2 is starving for resources with the overhead associated with all the index tasks happening at once. How is the resource utilization looking when you run each command?

Oh, so you mean if N index_parallel tasks are running they will all complete at once – BTW, I observed the same. I also noticed that with an increasing number of tasks, JVM boot-up was taking longer. I increased my data nodes to 15 and was able to achieve the performance below.

150 parallelism, 5 jobs (300 parquet files of 20 MB each) - completed within 2 min 23 sec

It seems an increased number of tasks on a single machine delays JVM boot-up, and I am also sensing that Druid completes all index_parallel tasks at once.

Do you think this behaviour will improve if we move to EMR? I am thinking of doing a POC on that.

Hey @TechLifeWithMohsin!

My first suggestion is to check the Payload of each subtask that was spawned. That way you can see specifically which files were being processed by which task.

Second, check that you have enough workers free in the Middle Manager configuration when the job is running.

Third, it might be worth looking at the individual task logs for each job, where you’ll find timestamped information about what the job was doing.

If tasks are sitting in Pending, I would check the worker allocations in the console to see whether they are waiting on workers to be allocated. As @Rachel_Pedreschi noted, is your druid.worker.capacity set to take advantage of the cores you have available?
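One way to script that worker check is via the Overlord’s `GET /druid/indexer/v1/workers` endpoint. This is a sketch: `OVERLORD_URL` is a placeholder for your own Overlord (or router) address, and the response-field names reflect the standard worker status payload:

```python
import json
from urllib.request import urlopen

OVERLORD_URL = "http://localhost:8081"  # placeholder -- point at your Overlord

def free_slots(workers):
    """Sum free capacity from a parsed /druid/indexer/v1/workers response."""
    return sum(w["worker"]["capacity"] - w["currCapacityUsed"] for w in workers)

def fetch_free_slots(overlord=OVERLORD_URL):
    # Fetch live worker state from the Overlord and tally free slots.
    with urlopen(f"{overlord}/druid/indexer/v1/workers") as resp:
        return free_slots(json.load(resp))
```

If the free-slot count sits at or near zero while tasks are Pending, the cluster is slot-bound rather than there being anything wrong with the ingestion spec itself.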

It was a ZK issue, it seems – every time I restarted ZK it worked. I set up a 3-node quorum and after that things are working fine.