Rolling segment data forward when query intervals expire

Hi All,

We’re currently POCing a new data pipeline that uses Druid as its application-facing query store, downstream from some stateful stream processing with Samza. We’re considering using Druid to store daily history for a large number of objects, with a number of different dimensions identifying each row. The way we’re considering building our pipeline, our events are cumulative over time. We send metric deltas to Druid from our stream processing layer, and those deltas are aggregated within the queryGranularity specified in the ingestion spec. But when we cross from one queryGranularity period into the next, we’d like to retain the current values as the base over which the new period’s aggregates are applied. For example, in our Tranquility ingestion spec we’d set a queryGranularity of one day, and all events for that day would be aggregated into their dimensional buckets. When we roll over into the next day, we’d like to pre-populate the new segments with the previous day’s data so that all streaming aggregates are initialized with the previous day’s values.
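To make the setup concrete, here’s roughly what the relevant part of our Tranquility dataSchema looks like (the dataSource, dimension, and metric names below are just illustrative placeholders, not our real schema):

"dataSchema": {
  "dataSource": "events",
  "parser": {
    "type": "string",
    "parseSpec": {
      "format": "json",
      "timestampSpec": {"column": "timestamp", "format": "auto"},
      "dimensionsSpec": {"dimensions": ["sourceIp", "tag"]}
    }
  },
  "metricsSpec": [
    {"type": "longSum", "name": "count", "fieldName": "countDelta"}
  ],
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "DAY",
    "queryGranularity": "DAY"
  }
}

Every delta event we send for the same day and the same dimension values gets summed into a single row, which is exactly what we want; the issue is just that those sums start from zero again when we cross into a new day’s segment.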

It seems like this might be a common case, and it’s basically the opposite of the handoff from the indexing nodes to the historical nodes, so I thought there might already be something in Druid to handle it and let the existing infrastructure route queries the same way a broker currently routes to indexing nodes and historical nodes based on the query interval. Is there an existing configuration, process, or best practice for handling this kind of situation? Or is this kind of roll-forward something I’ll need to handle at the application level?

It’s also possible that I’m abusing the queryGranularity configuration to gain a small amount of mutability on the metric aggregates during the current day, so if that’s the case, I’d be open to alternative suggestions.

Thanks!

– Will

Will

There isn’t currently anything like what you’ve described in Druid. Druid considers every data point to be cumulative.

Is this a performance optimization, or something that you have to implement to satisfy some business logic?

Hi Max,

Thanks for replying! It’s kind of a mix of both.

On the business side, we have a large number of events with a number of dimensions, some of which are set when the event is originally ingested and are fixed for all time, and some of which can change as a user interacts with the system and recategorizes things over time. For example, we have a tagging system. When the original event is ingested, in addition to the fixed dimensions (things like the IP address the event originated from), it has an empty list of tags. As time goes on and users interact with the system, they can add and remove tags. In addition, external events can result in updates to metrics on the original events. We’d like to track the metrics associated with each tag over time at daily time scales. The solution we came up with is to roll events forward and use our stream processing layer to turn the application events into metric deltas that we apply during the currently open aggregation window. So if someone tags the original event, we would emit a new event with the same fixed dimension signature as the original event, but containing the new tag value and adding 1 to a count metric.
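To make that concrete, a tag-add event coming out of our Samza job would look something like this (field names are illustrative, matching the sketch in my first message):

{
  "timestamp": "2016-03-01T17:25:00Z",
  "sourceIp": "10.1.2.3",
  "tag": "important",
  "countDelta": 1
}

Removing the tag later would emit the same event with a countDelta of -1, so the longSum for that (sourceIp, tag) bucket nets back out within the current day’s window.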

On the optimization side, by far the most frequent questions our users ask are about the current state of the data, or how the data looked on a specific day in the past. So we could actually do away with the roll-forward idea completely and just store the metric deltas aggregated by day, using Druid’s existing aggregations at ingestion time. However, if we take that approach, then we have to sum over the entire history at query time to figure out what things look like “today”. The roll-forward would make it easy to answer questions like “what do things look like today?” and “what did things look like a month ago?” with a minimum of query-time aggregation and grouping.
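For example, under the delta-only approach, answering “what do things look like today?” for a single tag would mean a query spanning our entire history, roughly like this (names and intervals are made up):

{
  "queryType": "timeseries",
  "dataSource": "events",
  "granularity": "all",
  "intervals": ["2014-01-01/2016-03-02"],
  "filter": {"type": "selector", "dimension": "tag", "value": "important"},
  "aggregations": [
    {"type": "longSum", "name": "count", "fieldName": "count"}
  ]
}

With the roll-forward in place, the same question would only need an interval covering the current day and a granularity of “day”.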

This may be an abuse of Druid, which is why we’re prototyping how this will all work before committing to a solution. But we have time-based data, and we’re really happy with the query performance and scalability we’re getting on our data (and we have some upcoming milestones that will increase the amount of data we’re ingesting by orders of magnitude), so it’d be great if we could make this all work out.

I did look to see if the query time lookups could handle this, but my understanding of those is that they always return current values, so we couldn’t keep track of the history associated with our mutable dimensions.

Thanks again,

– Will

Actually, after I wrote that response, I think this is really only a performance optimization, and one I have no data to justify. I think I can do everything I need to do, with fewer edge cases and fewer questions about latency and eventual consistency, by just aggregating the change events themselves and querying for current state by aggregating over time. We probably have the option to resample older data to a lower query granularity, so there’s likely an upper bound on the data storage. In fact, that approach will probably result in less data being stored overall, since we won’t be duplicating everything forward in time. So I think this question is unnecessary; I should just use Druid the way it’s expecting to be used!
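In case it helps anyone else, my understanding (untested, so treat it as an assumption on my part) is that the resampling could be done later with a batch index task that reads the existing datasource back through the ingestSegment firehose and writes it out with a coarser queryGranularity, roughly along these lines (parser and tuningConfig omitted, names illustrative):

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "events",
      "metricsSpec": [
        {"type": "longSum", "name": "count", "fieldName": "count"}
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "queryGranularity": "MONTH",
        "intervals": ["2015-01-01/2015-02-01"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "events",
        "interval": "2015-01-01/2015-02-01"
      }
    }
  }
}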

I figured I’d post this back to the list in the hopes of helping the next Druid newbie, especially since it appears my (bad) idea was really due to lingering assumptions about how a mutable-state database would handle this kind of problem.

Thanks again!

– Will

Will

When we were planning our system we made the same mistake of over-complicating things prematurely. I would encourage you to use Druid as intended until it stops working, and then make it complicated. For example, we planned a very involved sampling method, and then realized that Druid was efficient enough that we would never need to use sampling.

It seems like you came to the same conclusion.