Using rollup as poor man's deduplication?

Hi,

I am streaming events into Druid with Kafka ingestion. I want to use druid for its real-time query capabilities, so it’s important that the moment an event appears on Kafka, it should be ingested and become queryable from Druid as soon as possible. However I have the following peculiar event deduplication scenario:

There are two event streams, one from system A, and the other from system B. Events from system B enrich the events from system A, so I have a stream processing pipeline in place that matches B to A, and emits the enriched events as a wider tuple (event_id, system_a_cols…, system_b_cols…). System B events are always late.

Events A need to enter druid the moment they are available, so that they are ingested and are immediately queryable, before they are enriched with B. However when the B enrichment takes place and the updated event is also emitted, this also enters druid as an additional row. These two rows need to be merged into one, because they are the same event.

I could try to deduplicate at query time, by expressing the “count number of events” as a “count(distinct event_id)” instead of just a “count(*)”, but the count distinct is orders of magnitude slower, so this is a no go.

I am looking to avoid heavy processing and re-ingestion outside of druid, so I’m considering expressing this deduplication using the rollup feature. Deduplication can be seen as a special case of an aggregation (aggregates two rows into 1 in this case), aggregations is what rollup does, so in principle this should be possible to express through rollup. The rollup I’m looking for here is quite simply to do the following:

Turn these two:

(event_id, A_col_1, A_col2, NULL, NULL)
(event_id, A_col_1, A_col2, B_col1, B_col2)

``

into a single row:

(event_id, A_col_1, A_col2, B_col1, B_col2)

``

So I think my rollup needs to use the event_id and all system A columns as dimensions, and aggregate the B dimensions using a ‘firstNonNull’ function or such. From a quick look druid doesn’t support for this out of the box, but I could write a javascript aggregator that could do this. Also I don’t see any mention of rollup supporting aggregating strings.

I haven’t dove into Druid internals to understand the feasibility of this. Any input is welcome.