How does Query priority work?

As per the docs, queries with a higher priority are given preference for computational resources. I am running tests where the only setting I changed is druid.broker.http.numConnections=1, so that the broker can run at most one query at a time against the historicals.

I send 5 parallel queries with different priorities to the broker after sending one big query (so that it completely occupies the connection pool, which consists of just one connection, and all 5 queries are queued).

My expectation was that the queries with higher priorities would finish first. But the results are not deterministic; the queries finish in a seemingly random order.

Is my understanding correct or is this the expected behavior?

Hi,
you devised an interesting and thoughtful test setup. I’m not a Druid expert, so my understanding might be flawed, but here’s what I believe I know:

  • the query priority comes into effect only on the historicals; the brokers just relay the queries.

  • historicals execute a query on each applicable segment, and those per-segment tasks are put onto a prioritized queue.

In this class: https://github.com/druid-io/druid/blob/4b3bd8bd630130fb852dfa3054bd9a60d507fd2b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java
you can see in lines 210-228 that the priority queue orders tasks first by their priority; among tasks of equal priority, a “queue insertion place” number decides who goes first. In lines 119-124 you can see that if FIFO scheduling is enabled on the historicals, this insertion place decreases with every submission; otherwise it is always 0, which means that 100 tasks with equal query priority are executed in arbitrary order.
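That ordering logic can be sketched with a plain Java `PriorityQueue`. This is a simplification, not the actual Druid class: Druid decrements its insertion counter, while this sketch increments one and breaks ties in ascending order, which yields the same relative FIFO effect.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PrioritizedQueueDemo {
    // One per-segment task: query priority plus an insertion counter.
    record Task(int priority, long seq, String name) {}

    // Drain a queue that orders by priority (higher first), then by the
    // insertion counter (FIFO among tasks of equal priority).
    static List<String> drain() {
        Comparator<Task> byPriorityThenFifo =
            Comparator.<Task>comparingInt(Task::priority).reversed()
                      .thenComparingLong(Task::seq);
        PriorityQueue<Task> queue = new PriorityQueue<>(byPriorityThenFifo);

        long seq = 0;
        queue.add(new Task(10, seq++, "query1/segmentA"));  // low prio, arrives first
        queue.add(new Task(10, seq++, "query1/segmentB"));
        queue.add(new Task(20, seq++, "query2/segmentA"));  // high prio, arrives later
        queue.add(new Task(20, seq++, "query2/segmentB"));

        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) order.add(queue.poll().name());
        return order;
    }

    public static void main(String[] args) {
        // query2's tasks drain first even though they were submitted last:
        // query2/segmentA, query2/segmentB, query1/segmentA, query1/segmentB
        drain().forEach(System.out::println);
    }
}
```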

What this means is

  • as your test setup entails that only a single query at a time can reach a historical, the query priority doesn’t come into play. The broker does not cancel a running query to give precedence to a higher-priority one. This is not as bad as it seems, because the broker splits queries up into small tasks, each of which executes the given query on a single segment. As long as the broker can pass many of those tasks on to the historicals, query prioritization works quite well. If you increase the connection count, you’ll notice that higher-priority queries are executed first.
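To see prioritization kick in, you could raise the broker’s connection count and give the historical several processing threads. A sketch of the relevant properties (the values are illustrative, not recommendations; check the config reference for your Druid version):

```properties
# broker runtime.properties
druid.broker.http.numConnections=20

# historical runtime.properties
druid.processing.numThreads=7
# optional: make equal-priority tasks run in submission order
druid.processing.fifo=true
```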

Assuming that FIFO scheduling is disabled, a timeline would be:

  1. query1 arrives on broker with priority 10.

  2. broker determines that query1 needs to be executed on segments A, B and C

  3. broker relays a query to the historical that basically says: execute query1 on segment A, execute query1 on segment B, execute query1 on segment C

  4. the historical receives the broker’s request and schedules the tasks onto a priority queue:

(prio10, subprio 0, query1, segmentA)

(prio10, subprio 0, query1, segmentB)

(prio10, subprio 0, query1, segmentC)

  5. let’s assume that before the historical can get to work on this backlog, the broker sends another query

  6. the broker receives query2 with priority 20, determines that it needs to execute on segments A, B and C, and relays those tasks to the historical again

  7. the historical puts the second batch of tasks onto its priority queue

(prio20, subprio 0, query2, segmentB)

(prio20, subprio 0, query2, segmentA)

(prio20, subprio 0, query2, segmentC)

(prio10, subprio 0, query1, segmentA)

(prio10, subprio 0, query1, segmentB)

(prio10, subprio 0, query1, segmentC)

  8. the historical is at all times hard at work pulling tasks from the queue. The queue is always sorted by prio first and by that insertion place, which I called subprio, second. As the first three tasks on the queue share the same prio and subprio, their order is random, which I illustrated by the segment order B, A, C, whereas for the next tasks the order is A, B, C.

So let’s say the historical has 2 processing threads. It can therefore grab the tasks (query2, segmentB) and (query2, segmentA) from the queue and compute partial results. Each time a processing thread finishes, the next item from the queue gets its turn. So next up is (query2, segmentC), followed by (query1, segmentA), and so on.

When all segment tasks for a given query have completed, the historical merges the partial results and returns the result set to the broker.

The reason higher-priority queries take precedence almost immediately is this splitting of queries into per-segment tasks, which usually take less than a second each.

A heavy query might break up into 10,000 segments to be scanned, and it would take a long time for it to complete.
If in the meantime you send a small query with a higher priority, its segments get scanned first: as soon as the historical’s processing threads finish their current tasks, which usually happens within a second or so, the next segment scans they pick up are the ones from the higher-priority query.
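That preemption-by-queueing effect can be simulated deterministically with the same simplified queue as above (again a sketch, not Druid’s actual code; the query and segment names are made up): a big low-priority query fills the queue, two workers start on its first two segment scans, and then a small high-priority query arrives and jumps ahead of the remaining backlog.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PreemptionDemo {
    // One per-segment scan task: query priority plus an insertion counter.
    record Scan(int priority, long seq, String name) {}

    static final Comparator<Scan> ORDER =
        Comparator.<Scan>comparingInt(Scan::priority).reversed()
                  .thenComparingLong(Scan::seq);

    public static List<String> run() {
        PriorityQueue<Scan> queue = new PriorityQueue<>(ORDER);
        long seq = 0;
        for (char s = 'A'; s <= 'D'; s++)                  // big query, priority 1
            queue.add(new Scan(1, seq++, "big/segment" + s));

        List<String> executed = new ArrayList<>();
        executed.add(queue.poll().name());                 // worker 1 starts big/segmentA
        executed.add(queue.poll().name());                 // worker 2 starts big/segmentB

        queue.add(new Scan(50, seq++, "small/segmentX"));  // high-prio query arrives
        queue.add(new Scan(50, seq++, "small/segmentY"));

        // The small query's scans come next, before the big query's backlog.
        while (!queue.isEmpty()) executed.add(queue.poll().name());
        return executed;
    }

    public static void main(String[] args) {
        // big/segmentA, big/segmentB, small/segmentX, small/segmentY,
        // big/segmentC, big/segmentD
        run().forEach(System.out::println);
    }
}
```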

As said, those higher-priority segment-scan tasks must be able to make it through to the historical’s priority queue in the first place, which isn’t the case if there is only a single connection from broker to historical.