Filtered count aggregator in metricsSpec

Hi,
Is it possible to use a filtered count aggregator in metricsSpec to roll up data at ingestion time? I’m trying to add something like this to metricsSpec:

{
  "aggregator": {
    "name": ,
    "type": "count"
  },
  "filter": {
    "dimension": "label",
    "type": "selector",
    "value":
  },
  "type": "filtered"
}

But here’s the exception that occurs:

2016-01-08T09:01:13,523 INFO [chief-kafka-sessionized-low[0]] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[kafka-sessionized-low_2016-01-08T09:01:00.000Z_2016-01-08T09:02:00.000Z_2016-01-08T09:01:00.000Z] at path[/druid/segments/localhost:8084/localhost:8084_realtime__default_tier_2016-01-08T09:01:13.516Z_0b2ab80e89fc4263bc5f97c6f6cc179e0]

2016-01-08T09:01:13,592 ERROR [chief-kafka-sessionized-low[0]] io.druid.segment.realtime.RealtimeManager - RuntimeException aborted realtime processing[kafka-sessionized-low]: {class=io.druid.segment.realtime.RealtimeManager, exceptionType=class java.lang.NullPointerException, exceptionMessage=null}

java.lang.NullPointerException
    at io.druid.segment.incremental.IncrementalIndex$1$6.lookupId(IncrementalIndex.java:238) ~[druid-processing-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.filter.SelectorFilter.makeMatcher(SelectorFilter.java:67) ~[druid-processing-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.query.aggregation.FilteredAggregatorFactory.factorize(FilteredAggregatorFactory.java:53) ~[druid-processing-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.incremental.OnheapIncrementalIndex.addToFacts(OnheapIncrementalIndex.java:144) ~[druid-processing-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.incremental.IncrementalIndex.add(IncrementalIndex.java:462) ~[druid-processing-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.realtime.plumber.Sink.add(Sink.java:127) ~[druid-server-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.realtime.plumber.RealtimePlumber.add(RealtimePlumber.java:216) ~[druid-server-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.realtime.RealtimeManager$FireChief.runFirehose(RealtimeManager.java:362) ~[druid-server-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
    at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:264) [druid-server-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]

When the filtered aggregator is removed from metricsSpec, no exception occurs.

The events are in the form of: {: , : , : , label: }

and label is listed among the dimensions in dimensionsSpec. What I’m trying to achieve is to pre-aggregate the count of events per label. The queryGranularity is set to “MINUTE”.

E.g:

event1 - {'label': 'event1', 'timestamp': }
event2 - {'label': 'event1', 'timestamp': <now + 1s>}
event3 - {'label': 'event1', 'timestamp': <now + 61s>}
event4 - {'label': 'event2', 'timestamp': }

What I’m expecting is:

{'label': 'event1', 'event1': 2, 'timestamp': }
{'label': 'event1', 'event1': 1, 'timestamp': <now + 60s>}
{'label': 'event2', 'event2': 1, 'timestamp': <now + 60s>}

We have already managed to do this kind of aggregation at query time (not at ingestion time) on another datasource whose queryGranularity is set to “NONE”, so with no roll-up. Now we would like to do the same thing at ingestion time in a different datasource.

I’ve just found this issue: https://github.com/druid-io/druid/issues/2061, so I guess that it is not supported and that I have to denormalize the data before ingesting it.
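For what it's worth, here is a minimal sketch of what such a denormalization step might look like, assuming the events are JSON objects that get preprocessed before they reach the firehose. The helper `denormalize`, the `count_<label>` field names, and the `KNOWN_LABELS` list are all hypothetical and only for illustration; the only Druid-side piece relied on is the ordinary `longSum` aggregator with `name` and `fieldName`.

```python
def denormalize(event, known_labels):
    """Return a copy of the event with one 0/1 flag column per known label.

    The flag columns can then be summed at ingestion time with plain
    longSum aggregators, so no filtered aggregator is needed.
    """
    out = dict(event)
    for label in known_labels:
        # Hypothetical naming scheme: count_<label>
        out["count_" + label] = 1 if event.get("label") == label else 0
    return out


# Hypothetical set of labels; in practice this has to be known up front.
KNOWN_LABELS = ["event1", "event2"]

# Hypothetical sample event, shaped like the ones described above.
sample = {"label": "event1", "timestamp": "2016-01-08T09:01:00Z"}
flat = denormalize(sample, KNOWN_LABELS)

# Matching metricsSpec entries, one longSum per flag column.
metrics_spec = [
    {"type": "longSum", "name": "count_" + label, "fieldName": "count_" + label}
    for label in KNOWN_LABELS
]
```

The obvious drawback is that the list of labels has to be fixed in advance, since every label becomes its own metric column.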

Hi,
Yeah, using a FilteredAggregator during ingestion is not supported at present.

Also, Druid rollup is done per unique combination of truncated timestamp and dimension values, so having filtered metrics is not going to affect the rollup.
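To illustrate the point about rollup keys, here is a rough simulation in Python. It is not Druid code, just a sketch of grouping by (timestamp truncated to the minute, label) with a plain count. The concrete value of `now` is made up, and event4's timestamp is assumed to be `now` because it is missing from the example above.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def truncate_to_minute(ts):
    """Mimic queryGranularity MINUTE by zeroing seconds and microseconds."""
    return ts.replace(second=0, microsecond=0)


def rollup_count(events):
    """Count events per (truncated timestamp, label) combination."""
    counts = Counter()
    for event in events:
        key = (truncate_to_minute(event["timestamp"]), event["label"])
        counts[key] += 1
    return counts


# Hypothetical "now", aligned to a minute boundary to keep the example simple.
now = datetime(2016, 1, 8, 9, 1, 0, tzinfo=timezone.utc)

events = [
    {"label": "event1", "timestamp": now},
    {"label": "event1", "timestamp": now + timedelta(seconds=1)},
    {"label": "event1", "timestamp": now + timedelta(seconds=61)},
    {"label": "event2", "timestamp": now},  # assumed timestamp
]

rolled_up = rollup_count(events)
for (minute, label), count in sorted(rolled_up.items()):
    print(minute.isoformat(), label, count)
```

Since label is a dimension, each label already lands in its own rolled-up row per minute, so a plain count metric gives the per-label count without any filter.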