Data-driven applications and dashboards can place performance and scalability demands on the underlying datastore. In a world where each app or dashboard may fire off dozens of queries per second, a single poorly designed query can make or break the user experience.
This tutorial explains the interplay between data modeling and query performance in Apache Druid. We dig into how a query is submitted to Druid and how Druid responds, providing several techniques for optimizing query performance.
First, we’ll discuss how we think about performance. Yes, our goal is to discuss data modeling in Apache Druid, yet it’s important to understand that the primary reason we care about data modeling is that we’re always working toward optimizing query performance. If we didn’t care about performance, we could model data willy-nilly, capriciously ingesting it just to get the job done. Since data modeling is done in service of performance, any discussion of Druid data modeling is necessarily a discussion of Druid performance.
Next, we’ll talk about the relationship between data modeling and performance, and how they influence and affect each other.
We’ll conclude with the four most common go-to data modeling techniques used to achieve the required performance: rollup, partitioning, sorting, and indexes.
The late, great Carl Sagan tells us that, if you want to make an apple pie from scratch, you must first invent the universe. Similarly, if you want a well-performing data application, you must first understand how the query engine and data layer work from the ground up.
User generated queries are submitted to the query broker (1) at the top of the diagram below. From there, the queries fan out (2), distributing them to data servers. Each data server handles its piece of the query (3) and results are then returned to the broker via a fan in (4). The broker merges them (5) and serves them to the user.
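As a concrete, deliberately simplified illustration of this scatter-gather flow, here is a Python sketch of a broker fanning a filtered count out to segments and merging the partial results. It is an analogy for the numbered steps above, not Druid's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-segment work: count rows matching a filter predicate.
def query_segment(segment, predicate):
    return sum(1 for row in segment if predicate(row))

def broker_query(segments, predicate):
    # (2) fan out: distribute the query across segments in parallel
    with ThreadPoolExecutor() as pool:
        partials = pool.map(lambda s: query_segment(s, predicate), segments)
    # (4)-(5) fan in: the broker merges the partial results and returns them
    return sum(partials)

segments = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
print(broker_query(segments, lambda x: x % 2 == 0))  # 4 even values
```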
Examining this in greater detail will inform your data modeling decisions with an insight into how they will affect query performance. As we saw above, queries run as a fan out and then a fan in. Let’s start at the bottom, after the fan out has occurred and reached the data servers.
This diagram, affectionately referred to as the Great Layered Pyramid of Query Performance (GLPQP), helps us understand how a query is returned from data stored in Druid. The bottom two layers of the diagram describe data server activities, while the top two represent broker activities. The fan-in mechanism takes place between the two halves of the diagram, connecting the broker and data servers.
Starting at the bottom layer of the diagram, we turn to the individual threads on a data server: the processing threads in the historical, indexer, and task JVMs.
This is where Druid opens segments that match your time filter. This step is critical to query performance: Druid maintains a global time index that enables very fast data scans, so it’s essentially free to filter down to data that matches your time filter, which makes time filtering a very important technique.
Next, the data server evaluates the WHERE clause to set filters on fields that aren’t time.
For example, an app might first filter on time (“time is greater than January 1, 2020 and less than June 30, 2020”) and then on country (“United States”). In this second step, the threads on a data server, armed with their segments, apply the WHERE clause. These non-time filters are not as free as the time-based step. Some are cheaper than others, though, and those nuances become important when we’re trying to squeeze the best performance out of a data model.
After the data server processes execute the WHERE clause, any rows that match are run through GROUP BY and aggregate functions. Grouping queries include more than just the GROUP BY statement; they also include SQL queries that produce a grand total and native queries like TopN.
It’s worthy of mention that most of the queries that Apache Druid users run are grouping queries, so that is the focus of this article. Scan queries execute slightly differently.
The data server processes run the GROUP BY and its aggregation functions. Then, if the ORDER BY / LIMIT has been pushed down to the segment, Druid applies it there. This is more a query tuning concern than a data modeling one, so it won’t be addressed in this post.
The bottom stage of the GLPQP occurs in processing threads on data servers where there’s one thread per processor per data server. The result is that everything on the data server is fully parallelizable, meaning that anything that executes in the bottom layer of the pyramid is linearly scalable. This is the “beauty layer” where we get the most value for our investment when adding servers. If we double the number of servers, our functions in the bottom layer run in half the time.
✓ Provide enough cores to handle enough query threads in parallel. The bottom processing layer is constrained only by the number of threads.
In general, moving up the Great Layered Pyramid of Query Performance, Druid does more computational work on a lower volume of data. On the bottom layer, Druid operates on the greatest volume of data, yet those operations are the least CPU intensive so they are distributed and very scalable. Moving up, Druid handles less data volume with more CPU intensive operations that are less scalable.
The next layer up the pyramid is the web server thread, also called the Jetty thread. There’s only one of these per data server process: one per historical, one per indexer, and one per task. This is critical for performance because it means that this layer does not scale out on demand to fully utilize compute resources; just one is used per query.
The Jetty thread merges results from each query thread and sends them to the broker. The Jetty thread doesn’t handle as much data as the processing threads; it’s only dealing with things that have already been grouped. For this reason, it’s OK (most of the time) that it’s less scalable; the only situation where it’s not OK is when Druid must group on high-cardinality dimensions. In this case, this single thread has to deal with as much data as the individual processing threads.
The next step in the GLPQP is where the broker merges results in parallel from the data servers.
There’s one merge operation per processor of the query broker that is in command of each query. This is important because while it does scale out to all the processors on a particular broker, it does not scale to all the processors in the cluster. For example, if you have query servers that are 16 cores each, and you have 10 of them, you have 160 processors at your disposal, but you can only use 16 for any one specific query during this stage of the pyramid.
✓ Monitor core utilization on individual brokers when queries execute. You need to be sure you have enough capacity per broker to merge results from across your data servers at query time.
Finally, at the very top, the broker’s Jetty thread runs one more query element across the entire cluster.
This query element is single threaded, so for best performance we don’t want to have too much data or too many operations taking place here as it will rapidly become a bottleneck. This is where the final LIMIT, ORDER BY, or HAVING is evaluated.
As an example of the complex interplay between data modelling and query performance, it is much more efficient to push the LIMIT and ORDER BY down to the data server processing threads where it can scale better. Druid does this automatically for LIMIT clauses that run with the native Scan or TopN query types, and for ORDER BY clauses that run with native GroupBy query types. Adding LIMIT and/or ORDER BY clauses that push down to data servers is a great technique to boost performance in Druid.
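To see why push-down helps, here is an illustrative Python sketch (not Druid internals) of an ORDER BY with LIMIT k: each segment returns only its local top k in parallel, so the less scalable merge step handles at most k rows per segment instead of every row:

```python
import heapq

def segment_top_k(rows, k):
    # Each data-server thread keeps only its local top k (cheap, parallel).
    return heapq.nlargest(k, rows)

def broker_top_k(segments, k):
    # The broker merges at most k rows per segment instead of every row.
    pushed_down = [segment_top_k(seg, k) for seg in segments]
    return heapq.nlargest(k, (x for part in pushed_down for x in part))

segments = [[5, 1, 9], [7, 3], [8, 2, 6]]
print(broker_top_k(segments, 2))  # [9, 8]
```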
Let’s dig deeper into the bottom layer of the GLPQP to see how to split things up and optimize for performance.
One way to think about how Druid operates is that there are a bunch of segments that the query gets fanned out to, followed by a fan in with the response. This is significant because we see lots of examples where queries bottleneck at that bottom layer. It’s easy to tell when this is happening: queries are running and data server CPUs are pegged at 100%. That’s a clear sign that the bottom layer is where the action is, and this is a typical Druid experience. Of course, any layer can be a bottleneck; the best way to identify the bottleneck so it can be addressed is with flame graphs.
What’s really taking place within the bottom layer? There’s a set amount of CPU processing (or CPU time) that needs to happen on each segment to execute the query, i.e. segment one is going to take a certain amount of time and segment two will take a certain amount of time. These times are fixed for a given data set and a given query. A query on segment number one might take 400 milliseconds of CPU time. It’s not possible to change how fast that runs, but it is possible to add more processors and process more query tasks. You can keep adding processors until the number of processors equals the number of segments, and at that point adding more doesn’t decrease the amount of time needed because you can’t split a segment between multiple processors.
Druid must divide CPU time between segments. As a result, segments are chopped up so they can be assigned to different processors while each processor processes in parallel. The amount of time that the bottom layer (data server processing threads) takes is roughly mathematically going to be the total CPU time needed for all segments divided by the number of processors.
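That division can be sketched as a toy Python model, with the caveat from above that a single segment can’t be split across processors (the numbers are illustrative):

```python
# Rough model of bottom-layer wall time: total per-segment CPU time divided
# by the number of processors. A single segment can't be split, so adding
# processors beyond the segment count stops helping. Illustrative, not exact.
def bottom_layer_time(segment_cpu_times, processors):
    usable = min(processors, len(segment_cpu_times))
    return sum(segment_cpu_times) / usable

times_ms = [400] * 8                     # eight segments, 400 ms each
print(bottom_layer_time(times_ms, 4))    # 800.0 ms
print(bottom_layer_time(times_ms, 16))   # 400.0 ms -- capped at 8 segments
```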
And so here we are, starting with a pretty simple formula, a basic division operation. This is our first step towards connecting data modeling and query performance.
It’s possible to break down different factors that come into play in determining query performance. After we dig into this piece of the puzzle, we’ll connect the various pieces together with data modeling and you’ll be able to see the whole picture.
To explain the equation, which works out to t = (R × s) / (r × P) + (ts × S) / P + Rm / rm, t is the time a query takes, and three components factor into how long a query is going to take: row-by-row processing, segment overhead, and merging. Each variable is defined as its term is discussed below.
Row by row processing and segment overhead take place in the bottom layer of the GLPQP, while merging takes place in the top three layers.
Looking at the first term of our equation, row by row: the way to think about row-by-row processing is that Druid needs a certain amount of CPU time to process the matching segments, which you then divide by the number of CPUs available. The term R × s over r, if you ignore the P, is the amount of CPU time necessary to process all the segments. P is the number of processors, R is the number of rows in the matching segments, s is filter selectivity, and r is the row process rate per processor.
How does index selectivity (the s in the row-by-row term) affect performance? Suppose a query has a WHERE clause that can use the index and it returns half the rows: in that case, index selectivity is one-half. Alternatively, if a query isn’t filterable, index selectivity is essentially equal to one. In other words, index selectivity becomes negligible where there are no indexable filters. At that point the term is simply R over r, where, if you recall, r is the row process rate per processor. This tells us that if we have a billion rows, and we can process 10 million rows per second per processor, then Druid requires one hundred seconds of CPU time. If we have 100 CPUs, we can get that done in one second.
The next term, located in the middle of our equation, is segment overhead. There’s an overhead per segment (ts), a result of the query process described at the data server process level above, multiplied by the number of segments (S) that hold the data for the query, divided by the number of processors (P). Even at a glance, we can see why we love compaction in Druid: as compaction reduces the number of segments, this term gets smaller.
Finally, merging, on the right of our equation, involves the number of rows to merge (Rm) and the row merge rate (rm). The row merge rate is determined by query server size as each query server merges rows in parallel with the others, the implication being that our denominator is heavily dependent on query server size. The more powerful the server, the larger the row merge rate, therefore the merging component of the equation becomes less impactful. Similarly, the number of rows to merge is based on the cardinality of the grouping, with more rows to merge making the merging component of the equation more impactful.
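Putting the three terms together gives t = (R × s) / (r × P) + (ts × S) / P + Rm / rm. A Python sketch of this model follows; the overhead and merge numbers in the example are assumed for illustration, while the billion-row, 100-CPU figures come from the worked example above:

```python
def query_time(R, s, r, ts, S, Rm, rm, P):
    """Approximate query time t from the three terms described above.

    R  = rows in matching segments       s  = filter (index) selectivity
    r  = row process rate per processor  ts = overhead per segment
    S  = number of matching segments     Rm = rows to merge
    rm = row merge rate                  P  = number of processors
    """
    row_by_row = (R * s) / (r * P)
    segment_overhead = (ts * S) / P
    merging = Rm / rm
    return row_by_row + segment_overhead + merging

# A billion rows, 10M rows/sec per processor, no filter (s = 1), 100
# processors: about one second of row-by-row time, plus (assumed) small
# segment-overhead and merge terms.
t = query_time(R=1_000_000_000, s=1.0, r=10_000_000,
               ts=0.001, S=1000, Rm=100_000, rm=1_000_000, P=100)
print(round(t, 3))  # 1.11
```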
Now for the connection between data modeling and performance. Returning to our equation, you should be happy to note that most of its variables are ones we can affect with data modeling.
The only terms we can’t affect are the row merge rate, which we saw earlier is heavily dependent on query server size, and the number of CPUs. These two factors are outside the data model’s control, but they are under your control, because they’re determined by your budget.
However, we can affect everything else in this performance equation by optimizing data modeling with Druid. There are four major data modeling techniques used to improve performance in Druid: rollup, partitioning, sorting, and indexes.
Rollup in Druid is a process of summarization, an optional feature that reduces the row count. You can think of it as building an OLAP cube, or pre-aggregation. At ingestion time, instead of treating everything as individual columns, Druid sorts columns into dimensions and metrics, then does a GROUP BY on all the dimensions by time and aggregates the metrics. One of the ways Druid is able to be so fast is this partial aggregation around time done during ingestion.
Depending on the peculiarities of your data, rollup may do absolutely nothing, for example if some combination of dimensions in your data set is unique for every row. This can happen with unique IDs, or with an IoT data set that has only one data point per minute when you’re grouping on minute and sensor, so there’s nothing to group and rollup won’t change anything. At the other extreme, rollup can be extremely meaningful, providing a twenty to forty times reduction in data size and a commensurate twenty to forty times increase in performance.
Rollup decreases the number of rows in the matching segments: it reduces the number of rows in the data source, and therefore the number of rows in all the segments that match a query.
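Rollup is essentially an ingestion-time GROUP BY. Here is a minimal Python sketch using a hypothetical clickstream with a minute timestamp, a country dimension, and a clicks metric:

```python
from collections import defaultdict

# Illustrative sketch of rollup: group raw events by (minute, dimensions)
# and pre-aggregate the metric, so many raw rows collapse into one stored row.
def rollup(events):
    aggregated = defaultdict(int)
    for minute, country, clicks in events:
        aggregated[(minute, country)] += clicks
    return [(m, c, total) for (m, c), total in sorted(aggregated.items())]

raw = [
    ("10:00", "US", 1), ("10:00", "US", 1), ("10:00", "FR", 1),
    ("10:00", "US", 1), ("10:01", "FR", 1), ("10:01", "FR", 1),
]
rolled = rollup(raw)
print(len(raw), "raw rows ->", len(rolled), "stored rows")  # 6 raw rows -> 3 stored rows
print(rolled)  # [('10:00', 'FR', 1), ('10:00', 'US', 3), ('10:01', 'FR', 2)]
```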
Think back to our equation where we have a bunch of CPU time and a bunch of segments and we need to process each one. What partitioning does is reduce the number of segments Druid needs to think about by getting all the data for the same value together in the same segment. Partitioning matters most for situations where users are consistently filtering on the same, selective, column. The canonical example is a multi-tenant situation where there’s a column for customer ID, and users are always filtering customer ID so they only see their own data.
Done properly, partitioning can significantly improve Druid performance because the system can process a subset of segments rather than every segment. Just like rollup, partitioning won’t always make a difference to performance. If there’s no regularity or structure in the data and how users filter it, partitioning likely won’t improve performance.
Knowing how users will query the data is really important to getting the most out of data modeling. One does not simply enable rollup and/or partitioning and receive a guaranteed speed boost. A great Druid developer understands the kinds of filters that are likely to be used, combines that with intimate knowledge of the data set (such as the uniqueness of its records), and then decides if and how to implement rollup and partitioning.
Looking at our equation, partitioning affects the number of rows in matching segments and the number of matching segments. Fewer segments will match query parameters, and those that do will have fewer rows in them. Partitioning reduces the number of rows that need to be processed per core. Partitioning also reduces the overhead per segment, because part of the overhead for a segment is doing that filtering. If we’re partitioning data by a column, and then always filtering on that column, then each segment will have a smaller range of data in it for that column. The consequence is that the filtering will happen faster and that overhead is reduced.
Sorting and partitioning are best friends. The situations where partitioning matters are almost always situations where sorting also matters. Sorting is organizing the data within a segment to achieve internal locality.
This diagram shows what happens when Druid sorts. There’s data in the left column, and in the middle there’s an unsorted segment. During ingestion, Druid’s storage engine splits columns up into chunks (yes, that’s the technical term). Each chunk holds a certain number of rows: in this example four, though usually it’s hundreds or thousands. To find all the rows that contain a 1, which are all the rows that match Ke$ha, notice that every chunk has a 1 in it. That means, in this example, we have to read every chunk.
However, reading a chunk is expensive because chunks are compressed individually. If users are going to filter heavily on one specific column, then our data model should sort by that column. That way, all the rows that match our filter sit together and can be placed into adjacent chunks. Now Druid only has to read two chunks, rather than all four, to get every 1 result. That’s half as much overhead, and that’s why sorting and partitioning typically go together.
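The effect of sorting on chunk reads can be sketched in a few lines of Python; the chunk size and values are illustrative:

```python
# Illustrative sketch: count how many fixed-size chunks must be read
# (i.e., decompressed) to find every row matching a value.
def chunks_touched(column, value, chunk_size=4):
    chunks = [column[i:i + chunk_size] for i in range(0, len(column), chunk_size)]
    return sum(1 for chunk in chunks if value in chunk)

unsorted = [1, 0, 2, 1, 0, 1, 2, 0, 1, 2, 0, 1]   # matches scattered everywhere
print(chunks_touched(unsorted, 1))          # 3 -- every chunk holds a match
print(chunks_touched(sorted(unsorted), 1))  # 2 -- matches are now adjacent
```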
Partitioning reduces the number of segments Druid is required to process in order to respond to a query. Sorting further optimizes performance by placing all query results within a segment next to each other.
The flame graph below demonstrates how much a data model, in particular partitioning and sorting, can affect performance. In this example, the data is filtered so that a query typically hits about 2% of the overall data, a pretty small percentage, and the data was initially not sorted.
Every arrow above points to a spot where LZ decompression must take place, and it can only take place chunk by chunk within individual segments. The flame graph shows us that, for this particular example, we’re spending roughly ninety percent of the time of the query decompressing data to throw most of it away. We can overcome this waste at query time by partitioning and sorting in advance.
Let’s look at our equation to see why sorting helps.
Sorting improves the row process rate, so we can process rows faster. It’s much harder to scan through jumbled chunks, because every out-of-order chunk must be decompressed even when most of its rows don’t match. This is why the combination of partitioning and sorting is so valuable for improving Druid query performance: partitioning improves three variables in our performance equation, and sorting adds another. Together, partitioning and sorting optimize the bulk of the equation.
Today, only string columns have indexes in Druid. Numeric column indexes are being added, but for now, only string columns are indexable. This means you may be able to further optimize performance by ingesting columns users are going to filter on as strings instead of numbers. This is a little counterintuitive, because with most data systems it’s best to model things in their most natural type for best performance. This happens to be an exception, and only a temporary one, since numeric column indexes are on the way.
Imagine that your data set includes a numeric field that users always filter on: customer ID. Users constantly filter on customer ID and never run aggregations that would require the field to be numeric. In this case, it’s beneficial to store the customer ID field as a string so it gets an index.
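As a hedged sketch of what that might look like in a Druid ingestion spec’s dimensionsSpec, with field names taken from the hypothetical example above (expressed here as a Python dict for illustration):

```python
import json

# Hypothetical dimensionsSpec fragment: customer_id is declared "string"
# rather than "long" so it receives an index and filters on it can use it.
dimensions_spec = {
    "dimensions": [
        {"type": "string", "name": "customer_id"},  # indexed, fast to filter
        {"type": "long", "name": "order_total"},    # numeric, no index today
    ]
}
print(json.dumps(dimensions_spec, indent=2))
```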
In the future, Druid will get numeric indexes, but until then, this powerful technique can help boost query performance. In our performance equation, it reduces index selectivity (s), so the row-by-row term shrinks according to how well the index suits the filters.
Druid was designed to be fast and scalable. This tutorial showed you how Druid processes queries and provided four techniques for improving query response time. Download Druid and get started on the path to blazing fast data-driven applications.