Custom Aggregator Event Processing Order

Hello,

My company is considering the use of Druid for analytics reporting over events that our main product generates.
I have a question regarding one of our more-complex use cases, and whether we could solve it by writing a custom aggregator.

Given event records containing a timestamp, a user, and an event type, our goal is to produce the average time users take between two subsequent events (of specific types), broken down by day (or whatever granularity we choose).

For example, we would want a result telling us that the average time users spent between an event of type X and an immediately following event of type Y was 11.2 seconds on the 1st of the month, 10.3 seconds on the 2nd, and so on.

I’m pretty sure we would need a nested group-by query: an inner query grouping by user (so we consider each pair of events on a per-user basis), with an outer query averaging across users.

I also believe we would need to create a custom aggregator for the inner query, which:

  • takes as input the timestamp & event type column names, as well as the desired event type values;
  • keeps track of “previous event timestamp”, setting it when processing an event of the 1st type;
  • also keeps track of the running average (e.g. total & count values, to be divided for the average), updating it only when “previous event timestamp” is set and the current event is of the 2nd type (and then clearing “previous event timestamp” until the next event of the 1st type); see the sketch just below.
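
For concreteness, the per-user state we have in mind is roughly the following (plain Java just to illustrate the state machine; this is not the actual Druid Aggregator interface, and all names are ours):

// Sketch of the per-user pairing state, assuming rows arrive in ascending time order.
class PairGapState
{
  private final String firstType;   // e.g. "X"
  private final String secondType;  // e.g. "Y"

  private Long previousTimestamp;   // last unmatched event of the 1st type
  private long totalMillis;         // sum of gaps for completed pairs
  private long count;               // number of completed pairs

  PairGapState(String firstType, String secondType)
  {
    this.firstType = firstType;
    this.secondType = secondType;
  }

  void offer(long timestampMillis, String eventType)
  {
    if (eventType.equals(firstType)) {
      previousTimestamp = timestampMillis;                 // start (or restart) a pair
    } else if (eventType.equals(secondType) && previousTimestamp != null) {
      totalMillis += timestampMillis - previousTimestamp;  // close the pair
      count++;
      previousTimestamp = null;
    }
  }

  double averageSeconds()
  {
    return count == 0 ? Double.NaN : (totalMillis / 1000.0) / count;
  }
}
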
My question is: can I be sure that Druid will feed the events into the aggregator in ascending time order, so that the above logic works and “previous event timestamp” always refers to the correct preceding event?

I’m guessing that will be the case, as Druid reads event records from each segment file. But I’m less certain of that in the face of the group-by on user, and also when Druid has the aggregator factory merge/combine data from different segments (e.g. across Druid nodes).

Thank you,

Brad

I’m guessing that will be the case, as Druid reads event records from each segment file. But I’m less certain of that in the face of the group-by on user, and also when Druid has the aggregator factory merge/combine data from different segments (e.g. across Druid nodes).

Within a single segment, I think the aggregator will receive rows in ascending time order. With a group-by on user, this should still be the case within a single segment: each per-user aggregator would still see that user’s rows in ascending time order.

The merging/combining does seem like it could get complicated, though, as you mentioned. For example, depending on how your segments are partitioned, I think it’s possible for rows for the same user/time bucket to reside in different segments.

For example, say I have data for a certain day partitioned into two segments, where segment 1 has a row user=A,event_type=X and segment 2 has the later event user=A,event_type=Y. Within each segment, the custom aggregator associated with user=A would only see one event, so I think the aggregator would have to retain enough information for the time differences to be calculated when the results for the two segments are combined; something like the sketch below.
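
Just to illustrate what “enough information” might mean: the per-segment partial could carry its running totals plus the unmatched boundary events, and the combine step could then pair them up. A very rough sketch (made-up names, not the actual aggregator/factory interfaces; it also assumes partials are combined in ascending time order, which you’d need to verify, and it ignores edge cases such as a partial containing no events at all):

// Hypothetical combinable per-user partial result.
class PairGapPartial
{
  long totalMillis;        // sum over pairs fully contained in this partial
  long count;
  Long trailingOpenFirst;  // last 1st-type event in this partial with no match yet
  Long leadingSecond;      // first 2nd-type event seen before any 1st-type event

  // Combine two partials, "earlier" covering the earlier time range.
  static PairGapPartial combine(PairGapPartial earlier, PairGapPartial later)
  {
    PairGapPartial merged = new PairGapPartial();
    merged.totalMillis = earlier.totalMillis + later.totalMillis;
    merged.count = earlier.count + later.count;

    // A pair that straddles the boundary: trailing X in the earlier partial,
    // leading Y in the later one (the segment 1 / segment 2 example above).
    if (earlier.trailingOpenFirst != null && later.leadingSecond != null) {
      merged.totalMillis += later.leadingSecond - earlier.trailingOpenFirst;
      merged.count++;
    }

    merged.leadingSecond = earlier.leadingSecond;
    merged.trailingOpenFirst = later.trailingOpenFirst;
    return merged;
  }
}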

Thanks for the info Jonathan!

After digging into the problem (and the Druid source code) more, we’re starting to think maybe we could add our own REST endpoint (via the Jersey extension mechanism).

This endpoint would send select/scan queries out to the historical nodes and then process the records (in segment order) as we see fit. We’d have to roll our own group-by logic, but if we didn’t go with Druid we would probably be looking at a compute-grid solution where we might have to roll our own logic anyway.
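
Roughly, the endpoint itself would just be a JAX-RS resource registered from our extension’s Guice module; something like this skeleton (the path, class, and parameter names are placeholders of ours, not an existing Druid API):

import java.util.ArrayList;
import java.util.List;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/acme/v1/event-gaps")
public class EventGapResource
{
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public Response averageGapPerDay(
      @QueryParam("firstType") String firstType,
      @QueryParam("secondType") String secondType,
      @QueryParam("interval") String interval
  )
  {
    // 1. Look up which historicals serve which segments for the interval.
    // 2. Send select/scan queries to those historicals and read rows in segment (time) order.
    // 3. Apply our own per-user pairing / group-by logic to compute the gaps.
    // 4. Average across users per day and return the result.
    List<Object> results = new ArrayList<>();  // placeholder for the aggregated output
    return Response.ok(results).build();
  }
}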

To build the above (our own REST endpoint sending select/scan queries to historical nodes), we would need to inject/use classes like TimelineServerView (getTimeline(DataSource) and getQueryRunner(DruidServer)) and DimFilterUtils (filterShards(…)).

Such interfaces & static utility classes aren’t explicitly marked with @PublicApi (so they could change even in minor releases), but would they likely be relatively stable, given that they deal with one of the broker’s core functions (tracking which nodes own which segments)?

Or would using those APIs be a Very Bad Thing ™ such that we should look for another option? :wink:

Thanks again!

–Brad

would they likely be relatively stable, given that they deal with one of the broker’s core functions (tracking which nodes own which segments)? Or would using those APIs be a Very Bad Thing ™ such that we should look for another option? :wink:

I don’t think they’ll change drastically, but I also can’t really make any guarantees on that.

For building a custom query layer on top of Druid historicals, another option that would give you looser coupling with Druid internals could be to query /druid/v2/candidates on the broker, which will return the candidate servers and segments for a given query.

Using those results, your application can query the Druid historicals directly, using SpecificSegmentSpec in the per-historical queries based on information from the previous candidates request.
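
For example, the grouping step on your side could look roughly like this (a sketch with made-up names, using Jackson and assuming the candidates response has the shape shown below; I believe the interval/version/partitionNumber triple is what you’d use to build the segment descriptors for the per-historical queries):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Groups candidate segments by the historical that serves them.
public class CandidateGrouper
{
  public static Map<String, List<Map<String, Object>>> groupByHost(String candidatesJson) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode candidates = mapper.readTree(candidatesJson);

    Map<String, List<Map<String, Object>>> segmentsByHost = new HashMap<>();
    for (JsonNode candidate : candidates) {
      // The fields identifying one segment.
      Map<String, Object> descriptor = new HashMap<>();
      descriptor.put("interval", candidate.get("interval").asText());
      descriptor.put("version", candidate.get("version").asText());
      descriptor.put("partitionNumber", candidate.get("partitionNumber").asInt());

      for (JsonNode location : candidate.get("locations")) {
        segmentsByHost
            .computeIfAbsent(location.get("host").asText(), h -> new ArrayList<>())
            .add(descriptor);
      }
    }
    return segmentsByHost;
  }
}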

The output of the candidates endpoint looks like:


[
  {
    "interval": "2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z",
    "version": "2019-06-07T20:11:29.677Z",
    "partitionNumber": 0,
    "size": 92365,
    "locations": [
      {
        "name": "localhost:8083",
        "host": "localhost:8083",
        "hostAndTlsPort": null,
        "maxSize": 300000000000,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      }
    ]
  },
  {
    "interval": "2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z",
    "version": "2019-06-07T20:11:29.677Z",
    "partitionNumber": 1,
    "size": 94657,
    "locations": [
      {
        "name": "localhost:8083",
        "host": "localhost:8083",
        "hostAndTlsPort": null,
        "maxSize": 300000000000,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      }
    ]
  },