Optimize nested aggregations

Hello experts!

I’m wondering about the best way to calculate nested aggregations using Druid SQL with low latency (< 1s).

To illustrate my question, I’ll assume the ‘cart’ table described below, which represents an e-commerce cart system. Let’s assume that Druid is rolling up events with a ‘sum’ aggregation over the price values at a PT10M granularity:

__time               price  user_id
2021-01-01T01:40:00  5.0    0001
2021-01-01T01:50:00  5.0    0001
2021-01-01T02:00:00  10.0   0002
2021-01-01T02:10:00  10.0   0001
2021-01-01T02:10:00  5.0    0002
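
For reference, I believe the rollup described above comes from an ingestion spec fragment roughly like the one below (just a sketch of the relevant parts, not my exact spec):

"granularitySpec": {
    "queryGranularity": "PT10M",
    "rollup": true
},
"metricsSpec": [
    { "type": "doubleSum", "name": "price", "fieldName": "price" }
]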

I want to get the count of users whose cart’s total price is higher than 15 dollars over the interval 2021-01-01T01:40:00/2021-01-01T02:10:00.

My initial solution leads to a nested GROUP BY, where the inner query gets the sum of prices per user and the outer query gets the total count, as shown below:

select
    count(*)
from (
    select
        user_id,
        sum(price) as "total"
    from
        cart
    where
        __time between '2021-01-01T01:40:00' and '2021-01-01T02:10:00'
        and "total" > 15
    group by
        user_id
)

Looking at the Broker’s query logs, I found out that this SQL query is translated into a native query like the following:

{
    "queryType": "groupBy",
    "dataSource": {
        "type": "query",
        "query": {
            "queryType": "groupBy",
            "dataSource": {
                "type": "table",
                "name": "cart"
            },
            ...
        }
    }
}

When using this kind of ‘query’ datasource (a.k.a. subquery) over large intervals with a lot of data, I’ve noticed that disk spills may occur at the Broker process, which increases my query’s response latency.
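
For context, these are the Broker settings I understand to be involved in that subquery materialization and in groupBy merging/spilling (the values below are only illustrative, please check the docs for your Druid version):

# Illustrative values only, not recommendations.
# Max rows the inner (subquery) result may materialize on the Broker:
druid.server.maxSubqueryRows=100000
# Off-heap buffers used to merge groupBy results, and their size:
druid.processing.numMergeBuffers=4
druid.processing.buffer.sizeBytes=500000000
# A value > 0 allows groupBy merging to spill to disk instead of failing the query:
druid.query.groupBy.maxOnDiskStorage=1000000000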

Are there any optimizations or a better way to solve this kind of problem? In my case, I have a lot of queries where I need to get counts over aggregated results…

Just checking that you meant to have a HAVING in your query… like…

SELECT
  COUNT(*)
FROM 
(
  SELECT SUM(Fare) as "total"
  FROM "taxi-data-year-8"
  WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' YEAR
  GROUP BY "Taxi ID"
  HAVING "total" > 50
)

But also (!) I am going to have a play around and see what may be possible with other things.

It sounds like that single Broker is having to merge a lot of rows from the various subqueries inside the merge steps. Maybe you need more cores in your broker to enable parallel merges from all the services?
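
If it helps, these are the Broker properties I’d look at for the parallel merge pool (names from memory of the docs, so please double-check them for your Druid version):

# Enable parallel merging of data-server results on the Broker (the default is true):
druid.processing.merge.useParallelMergePool=true
# Size of the parallel merge pool, roughly tied to the available cores:
druid.processing.merge.pool.parallelism=8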

Are you trying to do sessionisation at query time??? Ie, like there are users and they accumulate a basket, and you need to work out the total baskets at the end of their session (inner) and then count them?

IMHO, for performance I would do something in Kafka so that when the basket is “closed” (ie the session is over) you end up with those aggregates in a “sessions” (or visits or whatever) table in Druid. Not only does that shift the effort away from the query engine (where it’ll be done lots of times over and over) to a place where it’s done just once, you will also avoid counting rows for users who haven’t “closed” their basket yet within the time interval you’re reporting on.
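
As a sketch of what I mean (the “sessions” datasource and the “cart_total” column here are made up), the report then collapses to a single-level query:

SELECT
  COUNT(*)
FROM "sessions"
WHERE __time >= TIMESTAMP '2021-01-01 01:40:00'
  AND __time < TIMESTAMP '2021-01-01 02:10:00'
  AND "cart_total" > 15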

Feel free to ignore me entirely…!

It sounds like that single Broker is having to merge a lot of rows from the various subqueries inside the merge steps.

Yes, that may be the problem.

Are you trying to do sessionisation at query time???

No, this example is just to illustrate my query issue.

I need to calculate metrics in real time, so I cannot delegate pre-computations to Kafka and wait for a session to finish.

Also, using the same e-commerce cart system scenario, I would need to calculate other metrics as well.

An example of a metric is “how many purchases were finished”, considering a purchase can have more than one status (e.g. “started”, “waiting” or “finished”).

This new metric needs to aggregate by the maximum status in the inner query (because a purchase can have more than one status per rollup row) and count in the outer query. The problem remains the same because of the nested GROUP BY query.
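
To make this concrete, here is a sketch of that second metric (assuming a hypothetical ‘purchase’ datasource with a ‘purchase_id’ dimension and a numeric ‘status’ column where a higher code means a later stage, e.g. 3 = ‘finished’):

select
    count(*)
from (
    select
        purchase_id,
        max(status) as "last_status"
    from
        purchase
    where
        __time between TIMESTAMP '2021-01-01 01:40:00' and TIMESTAMP '2021-01-01 02:10:00'
    group by
        purchase_id
    having
        max(status) = 3 -- 3 = 'finished' in this hypothetical encoding
)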