Suggestion: LastValue aggregator

(I’m happy to add this as a feature request on GitHub if there is interest.)

I have an interesting use case for real-time stream ingestion where I’m using metrics as a sort of back-door method to update previously ingested events.

We are tracking events related to phone calls. We start with a “call started” event, which contains entity IDs (accounts, etc.) and is indexed on the call start time.

The next event is a “call ended” event, at which point we know the duration of the phone call. This could occur up to an hour after the call starts.

After the call is completed, various downstream processes asynchronously generate additional data. For example, some calls are recorded, and when the recording is stored, a recording ticket ID is generated. We also have a process that analyzes voice activity data on the call and attempts to classify it as, e.g., “Conversation” or “Wrong Number”. These bits of data might be generated 1-30 minutes after the end of the call, depending on the process.

We also have a batch ingestion pipeline that loads the most complete information we have, but it can run hours after the end of the phone call. It would be a big win if we could present a picture of calls while they are in progress, adding each additional bit of data as it is generated, ahead of the batch ingestion.

I have prototyped a scheme using real-time event pushing in which every event associated with a call is indexed on the call start timestamp and includes the fixed entity ID dimensions. At call end time, I push an event with the same timestamp and dimensions that also contains a duration metric. Then, when the call is classified as “Conversation”, etc., I convert that to an integer enumeration and send it as a “classification” metric with the same timestamp and ID dimensions. A SelectQuery returns all of this as a single row with a count of “3”. Service and/or UI logic can then look at the duration metric and display the call as “in progress” if duration == 0, or otherwise calculate an end time; likewise, it can look at the classification metric and display “Pending” if classification == 0, or otherwise map it back to “Conversation”, etc.
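To make the scheme concrete, here is a minimal Python sketch of how the rollup and the display logic fit together. It assumes rollup merges events that share the same timestamp and dimensions, with a count aggregator plus max-style aggregators on the metrics. The `account_id` dimension, the epoch-second timestamps, and the integer codes for classifications are all hypothetical, chosen only for illustration.

```python
# Hypothetical integer encoding for the "classification" metric; 0 means "not yet classified".
CLASSIFICATIONS = {0: "Pending", 1: "Conversation", 2: "Wrong Number"}


def rollup(events):
    """Merge events that share (timestamp, account_id), mimicking rollup with
    a count aggregator plus max aggregators on duration and classification."""
    rows = {}
    for event in events:
        key = (event["timestamp"], event["account_id"])
        row = rows.setdefault(key, {"count": 0, "duration": 0, "classification": 0})
        row["count"] += 1
        # Metrics absent from an event default to 0, so max() keeps any known value.
        row["duration"] = max(row["duration"], event.get("duration", 0))
        row["classification"] = max(row["classification"], event.get("classification", 0))
    return rows


def describe(start_ts, row):
    """Service/UI-side interpretation of a rolled-up call row."""
    status = "in progress" if row["duration"] == 0 else f"ended at {start_ts + row['duration']}"
    return status, CLASSIFICATIONS.get(row["classification"], "Unknown")


# All three events carry the call start timestamp and the same dimension value.
events = [
    {"timestamp": 1000, "account_id": "acct-1"},                       # call started
    {"timestamp": 1000, "account_id": "acct-1", "duration": 300},      # call ended
    {"timestamp": 1000, "account_id": "acct-1", "classification": 1},  # classified
]
row = rollup(events)[(1000, "acct-1")]
print(row["count"])         # 3
print(describe(1000, row))  # ('ended at 1300', 'Conversation')
```

Because every event reuses the call start timestamp, rollup collapses them into one row, and the zero defaults double as the “not known yet” sentinel the UI checks for.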

This seems to work really well!

One rub is that in the ingestion spec, I add these metrics using “maxValue” aggregations. That guards perfectly well against cases where duplicate events are generated. But there are rare edge-casey situations where the processes which generate the data might be rerun and emit a new value. Ideally I’d keep the new value, but if it happens to be less than the original, “maxValue” will exclude it.

I could live with this situation as it’s a very rare case, and anyway the batch process will come behind eventually and fix it. But, if there was an aggregation function “LastValue” that just replaced the previous value with the new one, it would plug this hole. Maybe pretty easy to implement? (If implemented I’d imagine a “FirstValue” would be quite easy to tack on as well, though I don’t have a use case for that.)
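The gap can be illustrated with a minimal Python sketch comparing max-style merging with the proposed last-value merging when a rerun emits a lower value. `merge_last` is a hypothetical stand-in for the suggested “LastValue” aggregator, not an existing Druid API, and “last” here simply means last in ingestion order. The classification codes are likewise hypothetical.

```python
from functools import reduce


def merge_max(current, incoming):
    """Max-style merge: keeps the larger value, so duplicates are harmless."""
    return max(current, incoming)


def merge_last(current, incoming):
    """Hypothetical "LastValue" merge: the most recently ingested value wins."""
    return incoming


def ingest(values, merge):
    """Fold a metric's values, in ingestion order, into a single stored value."""
    return reduce(merge, values)


# Hypothetical codes: 2 = "Wrong Number", 1 = "Conversation".
# Original value, a duplicate of it, then a rerun that emits a lower value.
history = [2, 2, 1]
print(ingest(history, merge_max))   # 2: the rerun's corrected value is dropped
print(ingest(history, merge_last))  # 1: the rerun's value replaces the original
```

Both merges tolerate exact duplicates, but last-value semantics depend on arrival order: the rerun's event has to be ingested after the original for it to win.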

Hey Ryan,

This sounds like https://github.com/druid-io/druid/issues/2845 – does that match what you’re looking for?

Indeed!

Thanks, and apologies for being too lazy to search for it.

I will add my thumbs up on the issue.