Processing before consuming from KafkaEightFirehose

Can I do some processing on the Kafka messages before they are ingested?

The basic idea would be to extract every message from Kafka and send it to some pre-processor function. The function would produce 0, 1, or 2 events that should be ingested by the realtime node.

The logic in that function would primarily involve a DB lookup for a key-value combination. So a follow up question, If I use the metadata storage database for such purpose, would that be accessible in this pre-processor function?
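A rough sketch of what I have in mind (hypothetical names throughout; the `lookup` dict stands in for the key-value lookup against the metadata store):

```python
import json

def pre_process(message: str, lookup: dict) -> list:
    """Return the 0, 1, or 2 events to ingest for one raw Kafka message."""
    event = json.loads(message)
    previous_status = lookup.get(event["pk"])
    if previous_status == event["status"]:
        return []                      # no status change: emit nothing
    lookup[event["pk"]] = event["status"]
    if previous_status is None:
        return [event]                 # fresh key: emit the event as-is
    # Known key with a new status: emit a negated copy of the old state
    # plus the new event.
    negated = dict(event, status=previous_status, negated="true")
    return [negated, event]
```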

I think it can be done - if you write your own firehose. Druid supports plugins/modules:
I think this approach is not the best idea… it will slow down your ingestion and add complexity to your Druid cluster deployment, because whenever the business logic in your new plugin changes, you will need to stop the realtime nodes and deploy the new version.

A better approach from my point of view would be to pre-process your events via another service and then write the results to a new Kafka topic, which will then be read by the regular Kafka firehose.

Which service to use depends on the load… if you need to process huge amounts, it’s good to use a stream processor (Apache Storm, Apache Samza, Apache Spark Streaming, etc…); if the amounts are not so big, just write your own service and don’t introduce additional complexity when it’s not needed.

In the upcoming Druid 0.9.0 you can do simple transforms using regex or JS. They will be slower than implementing something native:

More complex transformations should use a stream processor.

Thanks guys. Really helpful stuff. I am at the beginner stage of using Druid, so please RTFD me anytime.
I think I should use Apache Storm, considering the high volume of data.
The basic problem I am trying to solve with this pre-processing is actually the update of records that were ingested before. The data I am dealing with has a primary key, some dimensions, and a status. This status may change for a given primary key over time, and I need to update the record whenever this happens.

If I also keep the negated count of the previous state value at every update ingestion, then at query time, I can get the final report by the difference of the original and the negated counts.
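The arithmetic can be sanity-checked with a toy example (plain Python, no Druid involved; pk 1234 moves from status 1 to status 7):

```python
from collections import Counter

# Events as (pk, status, negated) tuples. The update to pk=1234 ingests
# a negated copy of the old state plus a normal copy of the new state.
events = [
    (1234, 1, False),   # original ingestion
    (1235, 3, False),   # original ingestion
    (1234, 1, True),    # negation of the superseded state
    (1234, 7, False),   # the update itself
]

normal = Counter(s for _, s, neg in events if not neg)
negated = Counter(s for _, s, neg in events if neg)

# Final report = original counts minus negated counts per status.
final = {s: normal[s] - negated[s] for s in normal}
print(final)  # {1: 0, 3: 1, 7: 1}
```

Only the latest state of each primary key survives the subtraction: status 1 nets out to zero.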

For this, however, I would need the information of the last state of the given primary key. I plan to keep a lookup table on my own for this. Can I leverage segment data for this?

Or can you suggest a better way of doing this?

Thanks in advance.

Saurabh, what are your latency requirements for your updates? I think deciding to use lookups versus just using batch ingestion to do updates will depend on understanding your use case more.

Thanks Fangjin.
Here is an overview of my use case.

Most of the time, the stream might be idle or have very low input rate.
At times, this load would dramatically increase to up to 1 billion events/hr at Kafka for a few hours. Most of these events would be fresh events with unique primary keys, but a small percentage would carry the primary key of an earlier event with a different status (dimension?) value than before. These update events would continue to arrive haphazardly for about 5-10 days even after the high-load period has passed.

As such, there would be no metrics. I just need the count of events that fit a criteria.

The use cases are :

  1. Have only the final value of the status counted in the aggregation/groupBy reports. The queries would be like “select top 3 dim1, count(*) as val where dim2=xyz & dim3=pqr group by dim1 order by val desc”.

  2. Have a trended report counting all the status changes for all primary keys.

Of course, I would not make the primary key a dimension. Should I even keep it in Druid?

I think for your use case you should use batch indexing to do updates.

If I do batch indexing, I would have to keep all the data in a separate data warehouse for every segment I can expect an update on. Also, if I need realtime reports, I would need to run the batch reindexing on practically every update. Wouldn’t this be costly?
Finally, if I do updates on the segments, I would lose the trended reports that need all the statuses.

Or did you mean something else by batch indexing. Can you please elaborate?

You can keep all your data in deep storage, which Druid requires anyways to operate. For realtime reports, if you need appends very quickly, Druid is great for that, if you need updates reflected very quickly, it is a poor option. Druid is not really designed for OLTP use cases.

Sorry, I was not clear with my use-case overview. Let me give an example.
During the peak hours, I would get several events like:

timestamp=2016-02-22T00:00:01Z, pk=1234, dim1…dim10, status=1 …(1)

timestamp=2016-02-22T00:00:05Z, pk=1235, dim1…dim10, status=3 …(2)

Then, there would also be events like:

timestamp=2016-02-22T00:05:00Z, pk=1234, dim1…dim10, status=7 …(3)

Now, for topN reports, I would want that I get only the latest event for pk=1234 counted in aggregations. So, a filter for status=1 should not return the record (1).

But for trended reports, I would want to count all these events (1),(2), and (3).

So, essentially, they are not update events, but they have their own timestamp values and we need to keep both the events.

Just that we want to count only the latest record in aggregations of topN/groupBy reports for a specific pk.
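To state the two report semantics concretely (a plain-Python illustration of the intent, not a Druid query):

```python
# The three example events above.
events = [
    {"timestamp": "2016-02-22T00:00:01Z", "pk": 1234, "status": 1},  # (1)
    {"timestamp": "2016-02-22T00:00:05Z", "pk": 1235, "status": 3},  # (2)
    {"timestamp": "2016-02-22T00:05:00Z", "pk": 1234, "status": 7},  # (3)
]

# Trended report: every status change counts, so all three rows stay.
trended = events

# topN/groupBy report: only the latest event per pk should count.
latest = {}
for e in events:
    cur = latest.get(e["pk"])
    if cur is None or e["timestamp"] > cur["timestamp"]:
        latest[e["pk"]] = e

# A filter for status=1 now matches nothing: pk=1234 is already at status 7.
assert not [e for e in latest.values() if e["status"] == 1]
```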

Saurabh, what SQL query are you trying to run?

Have you tried

Yes. I have explored plyql and pivot. The problem is not with the translation of SQL queries into druid queries. I am able to query datasources directly from druid easily.

I was hoping to have a discussion regarding the schema I should adopt for my use cases.



Reviving this thread with a follow up question.
If I send negated events for update events, I would then have 2 choices:

  1. Add a dimension negated to all events and set the flag true for the negated events

  2. Add a metric myCount with value 1 in normal cases and -1 for negated events.

Now while creating topN queries, I would have

for 1.


{
  "queryType": "topN",
  "dataSource": "myds",
  "granularity": "all",
  "dimension": "dim1",
  "metric": "minused",
  "threshold": 5,
  "aggregations": [
    {
      "type": "filtered",
      "filter": { "type": "selector", "dimension": "negated", "value": "true" },
      "aggregator": { "type": "count", "name": "negatedcount" }
    },
    {
      "type": "filtered",
      "filter": { "type": "selector", "dimension": "negated", "value": "false" },
      "aggregator": { "type": "count", "name": "normalcount" }
    }
  ],
  "postAggregations": [{
    "type": "arithmetic",
    "name": "minused",
    "fn": "-",
    "fields": [
      { "type": "fieldAccess", "name": "normalcount", "fieldName": "normalcount" },
      { "type": "fieldAccess", "name": "negatedcount", "fieldName": "negatedcount" }
    ]
  }],
  "intervals": ["2016-02-16T07:45/2016-03-16T07:48"]
}


and for 2:


{
  "queryType": "topN",
  "dataSource": "myds",
  "granularity": "all",
  "dimension": "dim1",
  "metric": "myCount",
  "threshold": 5,
  "aggregations": [{ "type": "longSum", "name": "myCount", "fieldName": "myCount" }],
  "intervals": ["2016-02-16T07:45/2016-03-16T07:48"]
}


Is there any tool/runmode/queryparam wherein I could see how the indexes are being used, so that I could verify which choice would give faster results?

Saurabh, what are your update latency requirements?

Ideally we need realtime updates just like realtime appends.

Hey Saurabh,

There are a few ways people usually handle stream updates in Druid.

  1. Batch replacement, i.e. replacing an entire hour or day of data at a time in batch mode. This is high latency but gives you a lot of flexibility.

  2. “Holding back” appends until data for a row is completely known. This is common for folks storing a row-per-session in Druid, especially when a session can last a maximum of 10–30 minutes. This has the effect of adding that 10–30 minutes of latency to the feed. Usually the holding back and buffering is done in a stream processor.

  3. “Metric-only” updates. This is along the lines of what you’re talking about with negated events. This takes advantage of the fact that Druid automatically merges rows with the same dimensions at query time. The idea is that you can send two events, A and B, and as long as they have the exact same dimensions, their metrics will always be aggregated together at query time. So B can have “negative” versions of the metrics from A. Or it can add to them, if you need that instead of negating.

  4. Query-time lookups. This is most appropriate if you want to update an id-to-name mapping, but is not really useful if you want to update a lot of individual events.
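Option 3 can be illustrated in a few lines of plain Python (no Druid involved; just the merge semantics it relies on):

```python
from collections import defaultdict

# Rows with identical dimension values have their metrics summed at
# query time, so a "negative" row cancels an earlier one.
rows = [
    (("dim1=a", "dim2=x"),  5),   # event A
    (("dim1=a", "dim2=x"), -5),   # event B: negated metrics, same dims
    (("dim1=b", "dim2=x"),  2),
]

merged = defaultdict(int)
for dims, metric in rows:
    merged[dims] += metric

# ("dim1=a", "dim2=x") nets out to 0; ("dim1=b", "dim2=x") keeps 2.
```

The crucial constraint is that A and B must match on every dimension; if B differs in even one dimension value, it lands in its own row and nothing cancels.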