Remove null values from input fields before aggregating

Hello,

Is there any way to remove fields with null values before aggregating them?

Thanks!

If you want to filter out the entire row when a column to be aggregated has null values, check out the transform specs: https://druid.apache.org/docs/latest/ingestion/transform-spec.html#filtering
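For example, a minimal sketch of that approach (the column name "userid" here is just a placeholder; a selector with a null value matches rows where the column is null or missing):

"transformSpec": {
  "filter": {
    "type": "not",
    "field": {
      "type": "selector",
      "dimension": "userid",
      "value": null
    }
  }
}

This drops any row where "userid" is null before it reaches the aggregators.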

If you want to skip aggregation for null values but still ingest the row, consider using the filtered aggregator: https://druid.apache.org/docs/latest/querying/aggregations#filtered-aggregator

Thanks for the response.

If I want to skip aggregation only on the field that has a null value, but still ingest the row, can I do the following?

"filter": { "type": "selector", "dimension": <dimension_string>, "value": <dimension_value_string> }

and

"filter": { "type": "not", "field": <filter> }

Would this help me skip aggregation on the null-valued field but still ingest the row?

Thanks


To be more precise, would the block below skip aggregation on the "id" dimension but still ingest the row?

"transformSpec": {
  "filter": {
    "type": "not",
    "fields": {
      "type": "selector",
      "dimension": "id",
      "value": "null"
    }
  }
}

If you want to keep the row but skip the aggregation, you'll want a filtered aggregator; a transformSpec filter like the one above drops the entire row.
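As a rough sketch, a filtered aggregator that skips rows where "id" is null could look like this (the inner longSum on a hypothetical "duration" field is only a placeholder for whatever aggregator you actually use):

{
  "type": "filtered",
  "filter": {
    "type": "not",
    "field": {
      "type": "selector",
      "dimension": "id",
      "value": null
    }
  },
  "aggregator": {
    "type": "longSum",
    "name": "durationSum",
    "fieldName": "duration"
  }
}

The row is still ingested; it just doesn't contribute to this particular aggregator when "id" is null.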

Hi,
I'm getting this error when I submit this query:

select count(2),OriginCityName,DestCityName,FlightDate from Flight where FlightDate >= '2015-03-01' and FlightDate <= '2015-03-31' group by OriginCityName,DestCityName,FlightDate;

Error:

org.apache.druid.java.util.common.RE: Failure getting results for query[04b3a118-2c72-4b21-9003-16eefe3e615f] url[http://localhost:8083/druid/v2/] because of [org.jboss.netty.channel.ChannelException: Channel disconnected]

Also, during the query my historical node shuts down and then automatically restarts.

Here is the configuration of my historical node:

# HTTP server threads
druid.server.http.numThreads=12

# Processing threads and buffers
druid.processing.buffer.sizeBytes=100000000
druid.processing.numMergeBuffers=1
druid.processing.numThreads=4
druid.processing.tmpDir=var/druid/processing

# Query cache
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=10000000

# Segment storage
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":30000$
druid.server.maxSize=300000000000

Hey,

I want to remove the rows that have a null value. I tried both "not" and "or" filters, but I still get the same error as below:

2019-06-20T18:23:47,461 ERROR [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - Encountered exception in run() before persisting.
java.lang.NullPointerException
	at io.druid.segment.incremental.IncrementalIndex$MetricDesc.<init>(IncrementalIndex.java:928) ~[druid-processing-0.12.3.jar:0.12.3]
	at io.druid.segment.incremental.IncrementalIndex.<init>(IncrementalIndex.java:272) ~[druid-processing-0.12.3.jar:0.12.3]
	at io.druid.segment.incremental.OnheapIncrementalIndex.<init>(OnheapIncrementalIndex.java:68) ~[druid-processing-0.12.3.jar:0.12.3]
	at io.druid.segment.incremental.IncrementalIndex$Builder.buildOnheap(IncrementalIndex.java:391) ~[druid-processing-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.plumber.Sink.makeNewCurrIndex(Sink.java:266) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.plumber.Sink.<init>(Sink.java:87) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.getOrCreateSink(AppenderatorImpl.java:327) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.add(AppenderatorImpl.java:231) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.append(BaseAppenderatorDriver.java:403) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.segment.realtime.appenderator.StreamAppenderatorDriver.add(StreamAppenderatorDriver.java:171) ~[druid-server-0.12.3.jar:0.12.3]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:612) [druid-kafka-indexing-service-0.12.3.jar:0.12.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.3.jar:0.12.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.3.jar:0.12.3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]


Is the error even related to null values being aggregated?

Thanks

This is my transformSpec:

"transformSpec": {
  "filter": {
    "type": "not",
    "field": {
      "type": "selector",
      "dimension": "type",
      "value": "executor"
    }
  }
}

And I just found out that if I send any messages with a "type" other than "executor", I get this error. Any reason why?

The NPE here:

2019-06-20T18:23:47,461 ERROR [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - Encountered exception in run() before persisting.

java.lang.NullPointerException
	at io.druid.segment.incremental.IncrementalIndex$MetricDesc.<init>(IncrementalIndex.java:928) ~[druid-processing-0.12.3.jar:0.12.3]
	at io.druid.segment.incremental.IncrementalIndex.<init>(IncrementalIndex.java:272) ~[druid-processing-0.12.3.jar:0.12.3]

is coming from this block of code, which tries to find a serde implementation for a complex aggregator; it looks like it isn't able to find one:

} else {
  capabilities.setType(ValueType.COMPLEX);
  this.type = ComplexMetrics.getSerdeForType(typeInfo).getTypeName();
}

I'm not aware of a bug in transformSpec filtering that would lead to this. Can you provide the full set of aggregators in your ingestion task?

These are the aggregators we are using,

"metricsSpec": [
  {
    "type": "count",
    "name": "count"
  },
  {
    "type": "longSum",
    "name": "durationSum",
    "fieldName": "duration"
  },
  {
    "type": "distinctCount",
    "name": "uniqueJobs",
    "fieldName": "jobName"
  },
  {
    "type": "distinctCount",
    "name": "uniqueUsers",
    "fieldName": "userid"
  }
],
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "NONE"
},
"transformSpec": {
  "filter": {
    "type": "not",
    "field": {
      "type": "selector",
      "dimension": "type",
      "value": "executor"
    }
  }
}
},

Thanks

Ah, it looks like there's a bug in the distinct-count contrib extension: it has a non-primitive "type name", but the extension doesn't provide the serde implementation that's needed for this aggregator to work during ingestion.

Looking at the code, I would not recommend using it; its intermediate-result merge function is not correct (it just sums up the bitmap sizes from each result to be merged instead of taking the union of the bitmaps):

@Override
public AggregateCombiner makeAggregateCombiner()
{
  // This is likely wrong as well as combine(), see https://github.com/apache/incubator-druid/pull/2602#issuecomment-321224202
  return new LongSumAggregateCombiner();
}

I would suggest trying the following extensions instead (they provide approx distinct count functionality):

https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

https://druid.apache.org/docs/latest/development/extensions-core/datasketches-hll.html
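As a rough sketch, the two distinctCount aggregators in your spec could be swapped for sketch-based ones like this (assuming the druid-datasketches extension is added to druid.extensions.loadList; names and fieldNames are taken from your metricsSpec):

"metricsSpec": [
  { "type": "count", "name": "count" },
  { "type": "longSum", "name": "durationSum", "fieldName": "duration" },
  { "type": "thetaSketch", "name": "uniqueJobs", "fieldName": "jobName" },
  { "type": "HLLSketchBuild", "name": "uniqueUsers", "fieldName": "userid" }
]

Both give approximate distinct counts, and their intermediate results merge correctly across segments, unlike the distinctCount combiner shown above.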

If you decide to use the DataSketches extensions, I would suggest upgrading to the latest Druid version, as there have been several bug fixes since 0.12.3.