We are facing some issues using a custom Aggregator during data
ingestion with Tranquility.
We're using the "tranquility-flink" module to ingest data into Druid
(version 0.11.0) from a Flink job. That works fine as long as one of
the built-in Aggregators (e.g. DoubleMaxAggregator) is used for the
actual metric. However, when we switch to a custom Aggregator, data
does not get handed off and the indexing jobs stay around forever.
For the custom Aggregator, I implemented the Aggregator,
BufferAggregator and AggregatorFactory interfaces in Scala.
According to my logging output, the (unbuffered) Aggregator
implementation is the one that actually gets used.
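For comparison with the unbuffered path, here is a minimal standalone sketch of how a buffered variant might lay out intermediate state in a shared ByteBuffer at a caller-supplied offset. It does not implement Druid's BufferAggregator interface; the object name MeanBufferLayout and the running-mean (sum, count) example are hypothetical, chosen only to illustrate a fixed-size slot layout.

```scala
import java.nio.ByteBuffer

// Hypothetical standalone sketch (NOT Druid's BufferAggregator interface):
// stores a running mean's intermediate state (sum: Double, count: Long)
// in a shared ByteBuffer, starting at a caller-supplied position.
object MeanBufferLayout {
  // Fixed slot size: 8 bytes for the Double sum + 8 bytes for the Long count.
  val MaxIntermediateSize: Int = java.lang.Double.BYTES + java.lang.Long.BYTES

  private def countPos(position: Int): Int = position + java.lang.Double.BYTES

  // Resets the slot at `position` to an empty state.
  def init(buf: ByteBuffer, position: Int): Unit = {
    buf.putDouble(position, 0.0)
    buf.putLong(countPos(position), 0L)
  }

  // Folds one input value into the slot using absolute get/put calls,
  // so the buffer's own position/limit are never modified.
  def aggregate(buf: ByteBuffer, position: Int, value: Double): Unit = {
    buf.putDouble(position, buf.getDouble(position) + value)
    buf.putLong(countPos(position), buf.getLong(countPos(position)) + 1L)
  }

  // Returns the intermediate (sum, count) pair, not the finalized mean.
  def get(buf: ByteBuffer, position: Int): (Double, Long) =
    (buf.getDouble(position), buf.getLong(countPos(position)))
}
```

Because every slot has the same fixed size, several independent states can share one buffer, e.g. one at offset 0 and another at offset MaxIntermediateSize.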
The Aggregator has to keep some intermediate data that differs
from the aggregation result but from which the result can be
computed at any point.
From what I understand, this can be achieved by returning that
intermediate data in get() and computing the result in the
AggregatorFactory's finalizeComputation() method.
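To make the intended division of work concrete, here is a minimal standalone sketch of that pattern, using a running mean as a stand-in for the real intermediate data. It deliberately does not implement Druid's Aggregator or AggregatorFactory interfaces; MeanState, MeanAggregatorSketch, MeanFactorySketch and newAggregator() are hypothetical names, and only get(), combine() and finalizeComputation() loosely mirror the roles of the real methods.

```scala
// Hypothetical standalone sketch, NOT Druid's Aggregator/AggregatorFactory
// interfaces: it only mirrors the split between intermediate state and result.

// Intermediate state: differs from the result, but the mean can be
// computed from it at any point.
final case class MeanState(sum: Double, count: Long)

// Plays the role of the (unbuffered) Aggregator.
final class MeanAggregatorSketch {
  private var sum = 0.0
  private var count = 0L

  // Called once per input value.
  def aggregate(value: Double): Unit = {
    sum += value
    count += 1
  }

  // Returns the intermediate state, not the final mean.
  def get(): MeanState = MeanState(sum, count)
}

// Plays the role of the AggregatorFactory.
object MeanFactorySketch {
  // Hypothetical constructor; Druid's factorize() takes a column selector.
  def newAggregator(): MeanAggregatorSketch = new MeanAggregatorSketch

  // Merges two intermediate states, e.g. from separately aggregated rows.
  def combine(a: MeanState, b: MeanState): MeanState =
    MeanState(a.sum + b.sum, a.count + b.count)

  // Turns an intermediate state into the user-visible result.
  def finalizeComputation(state: MeanState): Double =
    if (state.count == 0L) Double.NaN else state.sum / state.count
}
```

In this shape, anything that reads get() directly sees the (sum, count) pair; only finalizeComputation() produces the mean.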
However, judging from the logging output, get() and
finalizeComputation() are never called. Neither are getFloat() and
aggregate(), on the other hand, is called quite frequently (as
Am I misunderstanding something about how get() and
Does anybody have another idea of how to explain this behavior?
Thanks and regards,