Large number of coarse time series

Dear Druid-User members,

We plan to use Druid in a scenario where we have a large number of coarse time series: the smallest case is 50M time series sampled at 15-minute intervals (96 points daily per time series). The queries we plan to perform fall into two categories. The first are "classical" `group-by` or `timeseries` aggregations over the dataset, which already work very well and which is why we try to use Druid for this dataset.

The second category is high-speed large-scale random reads of raw measurements from the last 24 hours. We essentially would like to ask for the last 96 data points belonging to a time series, for all time series one by one in a random order, keeping the total time as short as possible. These queries are used in our application to generate alerts based on non-trivial business logic, like: "does time series X have 3 consecutive periods of elevated measurements and simultaneously time series Y's exponential average is above Z?"—as far as I understand, not possible inside Druid, so we're handling the logic in application code.

We're currently storing data in a single data source, in form of a 3-tuple: (timestamp, ID, value), where `ID` is a dimension column: a 10-byte identifier chosen at random to identify each time series (10 bytes is the shortest we can confidently use without risking birthday paradox collisions), and `value` is a metric column: a double. In production use, time series come and go in a pretty much random way; our synthetic tests have a fixed set of them. We can extract data using a `select` query with a filter on the ID, and for a single time series or a small set of them (using the new `in` filter), it works quite well.

Some data regarding the ingestion process. Currently we can ingest one day of data in 3 hours using a parallelized `index_realtime` task, and the ingestion process uses about 10 GB of RAM. We're creating a single daily segment partitioned by IDs (so that all data points for a single time series are stored in a single partition). We generate 50 partitions, each takes 1 GB (if using Concise bitmaps) or 1.4 GB (if using Roaring bitmaps), so the total amount is 50 GB or 70 GB. Note that this means each of the partitions has almost 100M rows.

However, we've hit a wall while trying to optimize the queries. We cannot extract more than about 1000 of time series per second on a single server (i7 6700, 32 GB of RAM, 2xSSD in RAID0). Our main testing scenario is as follows. We send `select`-`in` queries in parallel (10 queries at a time). Each query asks for values from the same segment, from the first half of the partitions (25 GB, so that it should fit in RAM). Each query asks for values of 6000 time series. Time series do not repeat between queries. Queries are sent directly to a historical node (no other Druid components are running at test time), with -Xmx2g, 8 threads and processing buffer size set to 160MB. These numbers were found using after running ~100 short experiments to find the settings that result in the biggest speed (it improved our own hand-tuning by ~30%; the defaults resulted in the dataset not fitting in RAM).

We observe that after initial segment loading, there's no large disk activity (as expected). CPU is at 800% (this unit has 8 virtual cores). Rough profiling shows that the hottest parts of code are: Concise bitmaps, then (if using Roaring bitmaps) the select query code, sometimes JSON serialization.

Some additional tests we've run. We initially tested querying for time series from all 50 partitions on a single machine; this obviously resulted in constant disk activity, but we weren't getting much worse results (!)—roughly 500÷600 time series per second, while still observing CPU usage in the 100%÷300% range.

We also wanted to get some baseline, an estimate on what's possible on this hardware without network overhead and most of Druid features. We created a toy database that just stores all data in mmap-able read-only tries (using the marisa_trie library). We managed to get 300k time series per second on the same dataset: two orders of magnitude more.

We'd like to run all queries in under 15 minutes. At the current speeds for the smallest case of 50M time series, this would mean using a cluster of ~50 i7 CPUs, which is more than we expected. Is there anything we could change to get these queries to execute faster? Ingestion parameters, historical node settings, query structure?

Thank you,

Hey Tomasz,

You’re really pushing the limits of the select query :slight_smile:

Fwiw, its original purpose was to power an “event search” sort of view, where you’d get the oldest or newest X events that matched a particular filter – not pulling a large amount of data. It can do that, but I think nobody’s really tried to speed it up yet. Would you mind sharing your profiling results that showed where the hot parts of the code were?

In the meantime you could try some queries that are a worse fit in theory but might be faster in practice. topN is a good choice as a lot of work has been put into speeding that up. Maybe try a topN with dimension “ID”, lexicographic sort, and with a granularity that matches your underlying data granularity.


I’ve been working along with Tomasz on this one and I’ve just posted my findings on your github: Would be very glad if you could have a look a this. Thanks :-).