Druid Realtime Indexing (Kafka FireHose)

Hi,

I’m using the latest available version of the Kafka firehose with a single Realtime Node, and pyKafka as the producer.

I’m currently monitoring the consumer lag using Yahoo’s “kafka-manager”, and the Realtime Node is usually around 4k–8k messages behind. My producer mostly sends messages in hourly bursts, and I can see that the lag stays the same while the producer is idle. How can I force my Realtime Node to keep consuming while the producer takes a break?

Or have I completely misunderstood how this setup should be implemented? Thanks in advance!

Hi Sven, are you emitting metrics in the logs of your Realtime Node? Posting those would be very interesting.

Hi and thanks for helping out!

The metrics below are taken from the Realtime Node. Are these the ones you mean?

2015-11-15T17:15:00,072 INFO [qtp532613259-26] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.072Z","service":"realtime","host":"testing-broker1:8084","metric":"query/time","value":41,"context":"{"queryId":"5a93667e-c775-4c91-8dcb-4219d635870d","timeout":300000}","dataSource":"testing","duration":"PT25833600S","hasFilters":"true","id":"5a93667e-c775-4c91-8dcb-4219d635870d","interval":["2015-01-19T17:15:00.000Z/2015-11-14T17:15:00.000Z"],"remoteAddress":"146...***","type":"timeseries"}]

2015-11-15T17:15:00,151 INFO [qtp532613259-37] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.151Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":0}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":0}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":1}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":2}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":3}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":4}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":5}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":6}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":7}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":8}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":9}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":10}]

2015-11-15T17:15:00,152 INFO [qtp532613259-24] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-11-15T17:15:00.152Z","service":"realtime","host":"testing-broker1:8084","metric":"segment/scan/pending","value":11}]

Do you have the metrics related to “ingest/*”?

This is the current state in kafka-manager:

Partition | LogSize | Consumer Offset | Lag  | Consumer Instance Owner
0         | 1477308 | 1473304         | 4004 | druid-example_testing-broker1-1447592733548-911ce001-0
1         | 27636   | 23614           | 4022 | druid-example_testing-broker1-1447592733548-911ce001-0
To me it looks like 8026 messages have not yet been ingested, right? The number does eventually go down again, but seemingly not until the producer starts up again.

Hi Sven, in the logs of your Realtime Node, can you find all the metrics that include “ingest/*” and post those?

Sorry for my misunderstanding. Please see the attached file.

Best regards

realtime.log (300 KB)

Just curious: shouldn’t the consumer keep pulling messages until the latest offset is reached?

Hi Sven,

I think the offset you are seeing here is the offset up to which Kafka messages have been committed (i.e. persisted to disk on the Realtime Node), not what has actually been ingested into Druid.

What is intermediatePersistPeriod set to on your node?

FWIW, on each intermediate persist a batch of events is persisted to disk and the Kafka offset is committed. If the intermediate persist period is too high, you may see this lag in the consumer offset stored in ZooKeeper even though the events have already made it into Druid and are simply still in memory.
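For reference, intermediatePersistPeriod lives in the tuningConfig section of the realtime spec. A minimal sketch of that section is below; the values are illustrative only and your actual spec will differ. Note that a persist (and therefore an offset commit) is also triggered when maxRowsInMemory is reached, so both settings affect how quickly the consumer offset advances.

"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 75000,
  "intermediatePersistPeriod": "PT10M",
  "windowPeriod": "PT10M",
  "basePersistDirectory": "/tmp/druid/basePersist",
  "rejectionPolicy": { "type": "serverTime" }
}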

You can also verify that all the data is in Druid by running a count query for the number of events and checking that the results are as expected.

I think Nishant means a longSum query rather than a count query, but yes, from the logs everything looks good. I think understanding when offsets are committed is the important part here.
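For example, assuming your ingestion spec does rollup with a count metric named “count” (check your metricsSpec; the metric name and the interval below are just placeholders, while the dataSource name is taken from your logs), a timeseries query like the following sums the raw events, whereas a “count” aggregator only counts the rolled-up Druid rows, which is why longSum can return a higher number:

{
  "queryType": "timeseries",
  "dataSource": "testing",
  "granularity": "all",
  "intervals": ["2015-01-01/2015-11-16"],
  "aggregations": [
    { "type": "longSum", "name": "events", "fieldName": "count" },
    { "type": "count", "name": "rows" }
  ]
}

You can POST this to the broker (or directly to the Realtime Node) at /druid/v2/ and compare the two numbers.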

Thanks, the longSum query did give me a few more transactions than count.

Regarding intermediatePersistPeriod, it’s at the default value of PT10M, but I will lower it to see how that affects the lag. Since I’m using SSDs, it should be fine for the number of events I have.

Thanks a lot for the input guys!