Custom Aggregator not working during Tranquility ingestion

Hi,

we are facing some issues using a custom Aggregator during data
ingestion with Tranquility.
We're using the "tranquility-flink" module to ingest data into Druid
(version 0.11.0) from a Flink job. That works fine as long as one of
the built-in Aggregators (e.g. DoubleMaxAggregator) is used for the
actual metric. However, when we switch to a custom Aggregator, data
does not get handed off and the indexing tasks hang around forever.

For the custom Aggregator, I implemented the Aggregator,
BufferAggregator and AggregatorFactory interfaces in Scala.
According to my logging output, the (unbuffered) Aggregator
implementation is the one that actually gets used.

The Aggregator has to keep some intermediary data, which is different
from the aggregation result, but from which the result can be
calculated at any point.
From what I understand, this can be achieved by returning that
intermediary data in get() and computing the result in the
AggregatorFactory's finalizeComputation() method.
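
To make that concrete, here is a minimal sketch of my mental model,
using a hypothetical running-mean metric (the class name and the
() => Double input callback are illustrative only, not our real code):

    import io.druid.query.aggregation.Aggregator

    // Hypothetical running mean: the intermediate state (sum, count)
    // is different from the result sum / count, but the result can be
    // computed from it at any point.
    class MeanAggregator(input: () => Double) extends Aggregator {
      private var sum: Double = 0.0
      private var count: Long = 0L

      def aggregate(): Unit = { sum += input(); count += 1 }
      def reset(): Unit = { sum = 0.0; count = 0L }

      // My understanding: get() returns the *intermediate* form ...
      def get(): AnyRef = (sum, count) // a Scala tuple, as in our code

      // ... so the numeric accessors don't apply to a complex
      // intermediate type.
      def getFloat(): Float =
        throw new UnsupportedOperationException("complex intermediate")
      def getLong(): Long =
        throw new UnsupportedOperationException("complex intermediate")
      def getDouble(): Double =
        throw new UnsupportedOperationException("complex intermediate")

      def close(): Unit = {}
    }

The AggregatorFactory's finalizeComputation() would then do the
actual division.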

However, judging by the logging output, get() and
finalizeComputation() are never called. Neither are getFloat() and
getLong().
aggregate(), on the other hand, is called quite frequently (as
expected).

Am I misunderstanding something about how get() and
finalizeComputation() work?
Or does anybody have another idea that would explain this behavior?

Thanks and Regards,
Felix

However, judging by the logging output, get() and finalizeComputation() are never called. Neither are getFloat() and getLong().

I don’t think these are called until a query is issued or the data is persisted. If you query the interval being ingested by Tranquility, does your aggregator show up in the results?

However, when we switch to a custom Aggregator, data does not get handed off and the indexing tasks hang around forever.

I would recommend looking at the logs for the tasks created by Tranquility to see what’s failing during the handoff process; there may be errors building the segments with the new agg or something of that nature.

Thanks,

Jon

I don't think these are called until a query is issued or the data is
persisted. If you query the interval being ingested by Tranquility,
does your aggregator show up in the results?

If I query the Peon directly, I see some results. However, the fields
the aggregated values are supposed to be in just contain an empty
JSON object.
If I query a Broker, I don't get any results.

I would recommend looking at the logs for the tasks created by
Tranquility to see what's failing during the handoff process; there
may be errors building the segments with the new agg or something of
that nature.

Interestingly enough, I can now see some calls to get() in the Peon
logs, but still none to finalizeComputation().
The first of them is followed by an exception complaining that the
data type of my intermediary data (a Scala tuple) cannot be converted
to a number.

I guess that brings us back to the part of my original question about
whether this mental model is correct:

From what I understand, this can be achieved by returning that
intermediary data in get() and computing the result in the
AggregatorFactory's finalizeComputation() method.

Apparently it doesn't work like this, as Druid already seems to expect
a number as the result of get().

Thanks,
Felix

Hi Felix,

The first of them is followed by an exception complaining that the
data type of my intermediary data (a Scala tuple) cannot be converted
to a number.

I would look into that type conversion issue; it’s possibly causing the other problems you’re seeing.

For recommendations, you might have already gone through this exercise, but it can be useful to use an aggregator like CardinalityAggregator and related classes as a base/skeleton for your custom aggregator. CardinalityAggregator has a non-numeric intermediate form but has a final numeric output, which sounds similar to your use case.
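
To sketch what I mean on the factory side, the methods that matter for
a non-numeric intermediate form look roughly like this (just a sketch;
the (sum, count) intermediate and all names are placeholders, shown as
standalone functions rather than actual AggregatorFactory overrides):

    object MeanFactoryPieces {
      type Intermediate = (Double, Long) // placeholder intermediate form

      // combine() merges two partial results, e.g. from different
      // segments.
      def combine(lhs: Intermediate, rhs: Intermediate): Intermediate =
        (lhs._1 + rhs._1, lhs._2 + rhs._2)

      // finalizeComputation() turns the intermediate into the number
      // that queries actually return.
      def finalizeComputation(i: Intermediate): Double =
        if (i._2 == 0L) 0.0 else i._1 / i._2

      // getTypeName() must name the complex type, never a primitive
      // like "float".
      val typeName: String = "meanTuple"
    }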

Thanks,

Jon

Hi,

it's been a while, but I want to give an update on our findings for
those interested.

First of all, we ran into other problems when trying to deploy our
Flink Job (with Tranquility) to an actual Flink cluster, instead of
just running it locally.
The issues were presumably caused by library version conflicts between
Flink and Tranquility/Druid. We probably could have fixed these for
the moment, but the solutions appeared hard to maintain in the long run.

At this point, we decided to ditch Tranquility and give the Kafka
Indexing Service a try. Now, Flink writes the processed data back to a
Kafka topic in a suitable format, where Druid will pick it up.
This architecture removes any Druid-specific dependencies from the
Flink Job and decouples the different systems from each other. So far,
it appears easier to maintain and more reliable than Tranquility.
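
For the curious, the write-back side is a plain Kafka sink; a rough
sketch (Flink 1.4-era connector; the broker address, topic name and
JSON shape are made up for illustration):

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

    object FlinkToKafkaJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Stand-in source; in reality this is our processing pipeline.
        val rows: DataStream[String] = env
          .fromElements(1.0, 2.5, 3.7)
          .map(v => s"""{"timestamp":${System.currentTimeMillis()},"value":$v}""")

        // JSON rows go to the topic the Druid Kafka supervisor consumes.
        rows.addSink(new FlinkKafkaProducer010[String](
          "kafka:9092",  // broker list (placeholder)
          "druid-input", // topic (placeholder)
          new SimpleStringSchema()))

        env.execute("flink-to-kafka-for-druid")
      }
    }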

As for the actual Aggregator issue:

I would look into that type conversion issue; it's possibly causing
the other problems you're seeing.

For recommendations, you might have already gone through this
exercise, but it can be useful to use an aggregator like
CardinalityAggregator and related classes as a base/skeleton for your
custom aggregator. CardinalityAggregator has a non-numeric
intermediate form but has a final numeric output, which sounds
similar to your use case.

Thanks for the ideas. CardinalityAggregator is in fact similar, but
much more complex, so it is not always easy to derive a minimal
solution from it.

Our problem was probably caused by the fact that getTypeName() always
returned "float" (a copy/paste oversight), so Druid tried converting
our intermediary type (a Scala tuple) to a Float.
The proper fix seems to be returning a custom type name and
implementing a ComplexMetricSerde for serialization of the
intermediary type. However, that means fiddling with quite a few
undocumented Druid internals such as ColumnBuilder and ObjectStrategy.
Therefore, we haven't finished that yet.
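
In case it helps others, the registration part would look roughly like
this (the type name and setup object are placeholders; the
ComplexMetricSerde implementation itself, i.e. the
ObjectStrategy/ColumnBuilder wiring, is exactly the part we never
finished):

    import io.druid.segment.serde.{ComplexMetricSerde, ComplexMetrics}

    object TupleMetricSetup {
      // `serde` stands in for the unfinished ComplexMetricSerde
      // implementation (ObjectStrategy, ColumnBuilder etc.).
      def register(serde: ComplexMetricSerde): Unit = {
        // The AggregatorFactory's getTypeName() must return this same
        // name -- returning "float" was our copy/paste bug.
        if (ComplexMetrics.getSerdeForType("tupleMetric") == null) {
          ComplexMetrics.registerSerde("tupleMetric", serde)
        }
      }
    }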

An alternative idea is to eliminate the need for Druid Roll-Up by
performing aggregations in the Flink Job, so that only one value per
time interval is ingested into Druid.
At the moment, it looks like we might pursue that approach.
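
A sketch of what that could look like on the Flink side (the Point
type, key and window size are made up):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // Placeholder record type.
    case class Point(key: String, value: Double, count: Long)

    object PreAggregate {
      // One finished value per key and one-minute window, so Druid can
      // ingest the results as-is, without relying on roll-up.
      def perInterval(points: DataStream[Point]): DataStream[Point] =
        points
          .keyBy(_.key)
          .timeWindow(Time.minutes(1))
          .reduce((a, b) => Point(a.key, a.value + b.value, a.count + b.count))
    }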

Best Regards,
Felix