Scanning performance

Hi all,

So, our single-server Druid instance has around 22k segments with minute granularity in local deep storage, with around 100 million events stored in them. On this server there is 1 coordinator, 1 broker, 3 historical, and 1 realtime node. I’m able to query over a couple of days with about a 1-2 second response time. Querying over the full set (about 2 weeks’ worth of data) takes 20+ seconds, so I’m concerned about the degradation in performance; we want to store months and potentially years’ worth of data.

This is not a production setup, so I understand that it’s not going to scale well as-is. However, we’re starting to plan out what the production instance will look like and I’m curious as to which type of node adds the most to historical query performance. I had originally thought historical (duh), but the broker node apparently plays a large part too. I’m also not seeing much performance increase with the addition of more historical nodes (they are all on the same server, does that matter?). Which nodes should I plan on having the most of for large amounts of historical data?

For clarity, are the segments in minute granularity (one segment per minute), or are the segments in something like hourly granularity with the query granularity for each segment set to minute (one segment per hour)?

Also, what is the resulting size of each segment in this case?

The realtime spec specifies minute granularity and the query I’m running is a simple TopN query that specifies day granularity for the full range of data. This results in almost all of the segments being loaded and aggregated, which is to be expected. This is roughly 100 million events, so 20 seconds is still a good bit better than a SQL-based GROUP BY+ORDER BY query. My concern is that at the scaling rate I’m seeing, we won’t be able to handle more than maybe a month of data before query times become too long to use in real-time.
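
For reference, here’s roughly the shape of the query (the datasource, dimension, and metric names below are made up, but the structure is the same):

    {
      "queryType": "topN",
      "dataSource": "events",
      "granularity": "day",
      "dimension": "page",
      "metric": "count",
      "threshold": 10,
      "aggregations": [
        { "type": "longSum", "name": "count", "fieldName": "count" }
      ],
      "intervals": ["2015-08-01/2015-08-15"]
    }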

I’m very curious as to what kinds of nodes affect the loading/aggregation of segments during query execution so we can plan out our hardware needs. As I understand it, any of the historical nodes that have segments within the interval are given the query by the broker to run. I can see some evidence of this by looking at the system load during a fresh run of a query; the historical nodes are all maxing out the CPU.

The broker then caches the resulting segments and, since segments are immutable, it can reuse those segments as long as the only thing that changes about the query is the interval. This is a really neat feature, as most of our queries are going to be relatively static with adjustable timescales and granularities.

Based on this, I’m thinking it makes sense to have a bunch of general-purpose VMs as historical nodes and one or two beefy VMs with a ton of memory as the brokers. On the other hand, it might be that we’ve structured our schema poorly. Is our data simply too granular? How are other people handling these volumes of data?

Thoughts inline.

The realtime spec specifies minute granularity and the query I’m running is a simple TopN query that specifies day granularity for the full range of data.

What is the size of each segment you are creating? You can find this information in the coordinator console (http://coordinator_ip:port). Druid’s parallelization model uses 1 core to scan 1 segment, and we typically recommend segments that are ~5M rows and around 250-700MB in size. Segments that are too small will introduce overhead, as many passes are required to scan the data.
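
If it’s easier than eyeballing the console, the coordinator also exposes segment metadata over HTTP. Something along these lines should dump each segment with its size in bytes (I’m going from memory on the exact path, so please double-check it against the API docs for your version; “your_datasource” is a placeholder):

    curl "http://coordinator_ip:port/druid/coordinator/v1/datasources/your_datasource/segments?full"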

This results in almost all of the segments being loaded and aggregated, which is to be expected. This is roughly 100 million events, so 20 seconds is still a good bit better than a SQL-based GROUP BY+ORDER BY query.

This seems very slow. Druid is very finicky about configuration, as misconfiguration can lead to dramatically reduced performance.

My concern is that at the scaling rate I’m seeing, we won’t be able to handle more than maybe a month of data before query times become too long to use in real-time.

I’m very curious as to what kinds of nodes affect the loading/aggregation of segments during query execution so we can plan out our hardware needs.

Historical nodes scan segments and do a first level merging of the results of the segments they’ve scanned. Broker nodes do a second level merging of results from historical/realtime nodes.

As I understand it, any of the historical nodes that have segments within the interval are given the query by the broker to run. I can see some evidence of this by looking at the system load during a fresh run of a query; the historical nodes are all maxing out the CPU.

The broker then caches the resulting segments and, since segments are immutable, it can reuse those segments as long as the only thing that changes about the query is the interval. This is a really neat feature, as most of our queries are going to be relatively static with adjustable timescales and granularities.

Based on this, I’m thinking it makes sense to have a bunch of general-purpose VMs as historical nodes and one or two beefy VMs with a ton of memory as the brokers. On the other hand, it might be that we’ve structured our schema poorly. Is our data simply too granular? How are other people handling these volumes of data?

There are some general guidelines on performance tuning here: http://druid.io/docs/latest/operations/performance-faq.html and we have some example performance configs here: http://druid.io/docs/latest/configuration/production-cluster.html.

I’d be curious to know how you’ve initially set things up to get an idea of where inefficiencies may lie. Also, if you haven’t had the chance, you may be interested in: http://druid.io/docs/latest/misc/evaluate.html

Just wanted to clarify: our hardware currently is a single 2012 MacBook with 16GB RAM running Ubuntu. I am aware that calling this amount of hardware “insufficient” would be a massive understatement; it was just what we had on hand to test Druid out. Hardware planning where I work is a huge pain, so we unfortunately didn’t have the luxury of just getting a bunch of VMs from AWS and testing this out properly.

Our segments are way smaller than 5M rows; we typically only see maybe 15K events a minute or so. I’m seeing that no segment is much over 200KB in the Druid console. We were hoping to capture data with minute granularity and just aggregate it at larger granularities over longer periods of time (like days or weeks or months). Is this not feasible with Druid? If not, what would you suggest we do?

I’ll give the performance tuning guidelines a look for individual settings, but we’re currently planning on our first iteration of a Druid cluster looking like the list below. I’m omitting Zookeeper, we’ve already got that set up on a separate set of servers.

  • node1[8CPUx48GB]: Broker (30GB heap), Coordinator (1GB heap), Realtime (12GB heap), MySQL
  • node2[4CPUx16GB]: Historical (12GB heap)
  • node3[4CPUx16GB]: Historical (12GB heap)
  • node4[4CPUx16GB]: Historical (12GB heap)
  • node5[4CPUx16GB]: Historical (12GB heap)

I think the thing I’m most concerned with as far as the hardware architecture goes is what nodes are most responsible for processing new data, and making sure we can scale that out as our data grows. It sounds like it would be the Historical nodes, and the Broker just does the merging from its cache.

Hi Taylor, please see inline.

Just wanted to clarify: our hardware currently is a single 2012 MacBook with 16GB RAM running Ubuntu. I am aware that calling this amount of hardware “insufficient” would be a massive understatement; it was just what we had on hand to test Druid out. Hardware planning where I work is a huge pain, so we unfortunately didn’t have the luxury of just getting a bunch of VMs from AWS and testing this out properly.

Our segments are way smaller than 5M rows; we typically only see maybe 15K events a minute or so. I’m seeing that no segment is much over 200KB in the Druid console. We were hoping to capture data with minute granularity and just aggregate it at larger granularities over longer periods of time (like days or weeks or months). Is this not feasible with Druid? If not, what would you suggest we do?

I think there may be some confusion over segmentGranularity versus queryGranularity. It sounds like what you are trying to do is roll data up to minute granularity. To do this, you can set queryGranularity to minute. segmentGranularity is a config parameter that controls how segments are sharded. Given your current volume of data, you can set segmentGranularity to hourly, or even daily.
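
As a rough sketch, the relevant part of the spec would look something like this (double-check the field names against the ingestion docs for your version); it rolls rows up to the minute but packs an hour of data into each segment:

    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "minute"
    }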

I’ll give the performance tuning guidelines a look for individual settings, but we’re currently planning on our first iteration of a Druid cluster looking like the list below. I’m omitting Zookeeper, we’ve already got that set up on a separate set of servers.

  • node1[8CPUx48GB]: Broker (30GB heap), Coordinator (1GB heap), Realtime (12GB heap), MySQL
  • node2[4CPUx16GB]: Historical (12GB heap)
  • node3[4CPUx16GB]: Historical (12GB heap)
  • node4[4CPUx16GB]: Historical (12GB heap)
  • node5[4CPUx16GB]: Historical (12GB heap)

I recently wrote up some more thoughts on a POC cluster for Druid here: https://github.com/druid-io/druid/pull/1607 (to be merged). Given the nodes you listed above, you are probably overkilling on heap size and there will be a lot of paging occurring that will affect performance. Reading this doc might help make sense out of how to tune Druid a bit more: http://druid.io/docs/latest/operations/performance-faq.html
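
To make the heap/off-heap split concrete, here’s a very rough sketch of how I’d lay out one of your 4CPUx16GB historicals (the numbers are only there to illustrate the shape, not a recommendation; size them per the performance FAQ):

    # JVM flags: small heap, modest direct memory for the processing buffers
    -Xmx2g -Xms2g
    -XX:MaxDirectMemorySize=3g

    # runtime.properties: numThreads is typically cores - 1, and the
    # scan/merge buffers are allocated off-heap in direct memory
    druid.processing.numThreads=3
    druid.processing.buffer.sizeBytes=536870912

Everything left over (roughly 10GB here) is deliberately left unallocated so the OS page cache can hold the memory-mapped segments.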

I think the thing I’m most concerned with as far as the hardware architecture goes is what nodes are most responsible for processing new data, and making sure we can scale that out as our data grows. It sounds like it would be the Historical nodes, and the Broker just does the merging from its cache.

The real-time nodes are responsible for ingesting new data, periodically creating immutable segments from that data, and handing off to historical nodes. Brokers merge results from historical and can also merge those results with results in the cache.
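
In case it’s useful, the per-segment cache on the broker is controlled by a handful of properties; roughly the following (I’m going from memory on the names, so verify them against the caching docs):

    druid.broker.cache.useCache=true
    druid.broker.cache.populateCache=true
    druid.cache.type=local
    druid.cache.sizeInBytes=1073741824

With those set, results are cached per segment, which is why immutable segments can be reused across queries that only change the interval.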

Ah! You’re absolutely correct, I misunderstood the segmentGranularity/queryGranularity distinction; I just assumed that one limited the other, so they’re both set to minute right now. I’ll make that change as soon as I can.

Thanks for the tips on performance, but I think I’m still a bit unclear on the off-/on-heap stuff. I recognize that you’re allocating memory directly (I assume with sun.misc.Unsafe), but I thought that memory came out of the space available to the GC’d portions of the heap. I haven’t had the opportunity to do much work with that technique in Java, but from what you said I guess it’s pretty much just malloc(). I’ll tune the memory down to 2GB per node.

Would you recommend having multiple Historical nodes per VM? As I said, hardware planning is a pain where I work and it takes a while to change things.

Once a segment is persisted, the data is memory mapped. This leaves the paging in and out between memory and disk to whatever your kernel and file system are tuned to. There isn’t any Unsafe usage for this, just direct ByteBuffers from memory mapping.
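
To make that concrete, it boils down to standard Java NIO memory mapping; a minimal sketch (this is not the actual Druid code, and the file path is made up):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class MmapSketch
    {
      public static void main(String[] args) throws IOException
      {
        // Hypothetical path to a persisted segment file
        Path segmentFile = Paths.get("/tmp/druid/persisted/segment.bin");
        try (FileChannel channel = FileChannel.open(segmentFile, StandardOpenOption.READ)) {
          // map() hands back a direct MappedByteBuffer backed by the OS page cache;
          // no heap copy and no sun.misc.Unsafe involved.
          MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
          // Reads fault pages in as needed; the kernel decides what stays resident.
          System.out.println("first byte: " + buf.get(0));
        }
      }
    }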

Ah, so you’re just depending on the kernel’s ability to cache recently accessed files in memory. Makes sense, thanks!