Understanding the behaviour of the Kafka indexing service

Hi All,

I am using Druid 0.9.2 and have recently been trying the Kafka indexing service for realtime analytics.

I have got this working, but there are some edge cases which I am still not able to understand.

  1. In my supervisor spec rollup is true, so any events which have all dimensions the same should roll up.

This works fine until the segment is handed off, but if a new event with all dimensions the same arrives after the segment has been handed off, I get different metrics.

events:

Dim1  Dim2  timestamp
A     B     2017-12-22 14:00:00
A     B     2017-12-22 14:00:00

Until now the segment has not been handed off, and a count query will return 1, as rollup is true.

After the segment is handed off, the count query still returns 1.

Then one more event comes with the same dimensions:

A     B     2017-12-22 14:00:00

I was expecting the result to still be 1, but the result I got was 2.
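To see why the count goes from 1 to 2, here is a toy Python simulation (illustrative only, not Druid code): rollup collapses rows with identical dimensions and timestamp only within the segment currently being built, so a late event that arrives after handoff ends up as a row in a new segment, and the total Druid row count across segments becomes 2.

```python
from collections import Counter

def rollup(events):
    """Collapse events sharing the same (dims, timestamp) into one row,
    keeping a count metric -- a toy model of Druid rollup, not real Druid code."""
    rolled = Counter()
    for dims, ts in events:
        rolled[(dims, ts)] += 1
    return rolled

# Before handoff: both events land in the same in-memory segment and roll up.
segment1 = rollup([(("A", "B"), "2017-12-22T14:00:00"),
                   (("A", "B"), "2017-12-22T14:00:00")])
print(len(segment1))  # 1 Druid row

# After handoff the late event goes into a new segment for the same interval;
# rollup only happens within a segment, so the row now exists twice overall.
segment2 = rollup([(("A", "B"), "2017-12-22T14:00:00")])
total_rows = len(segment1) + len(segment2)
print(total_rows)  # 2 -- the observed count
```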

2. How will the segment be created if we receive an event with a timestamp for which a segment has already been handed off?

Also, does any white paper or architecture diagram exist for the Kafka realtime indexing, to understand it in detail?

The Kafka index task will create a new partition for segments that have been handed off; therefore the count you are getting is 2, as one row exists in each partition.
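Concretely, a simplified sketch of what that means, assuming Druid's usual segment-identifier pattern of `datasource_intervalStart_intervalEnd_version_partitionNumber` (the version string and exact formatting here are made up for illustration): the late event produces a segment for the same interval but with the next partition number.

```python
def segment_id(datasource, start, end, version, partition):
    """Build a Druid-style segment identifier (simplified illustration)."""
    return f"{datasource}_{start}_{end}_{version}_{partition}"

# The already-handed-off segment covering the 14:00 hour:
seg0 = segment_id("POC_1", "2017-12-22T14:00:00", "2017-12-22T15:00:00", "v1", 0)
# The late event for the same hour lands in a new partition of that interval:
seg1 = segment_id("POC_1", "2017-12-22T14:00:00", "2017-12-22T15:00:00", "v1", 1)
print(seg0)
print(seg1)
```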

  • Parag

1- Will these partitions be merged later on in the historicals?

2- This holds true for other metrics also, but our requirement is to merge the metrics in each partition to get one metric. Is there any way by which this can be achieved?

These partitions will not get merged unless you have a separate lambda pipeline set up that does regular batch ingestion and replaces the segments created by the Kafka indexing tasks. From a query perspective, the results would be the same for data present in one segment as compared to data present in two partitioned segments. Only the count query will give different results, as it gives a count of the number of Druid rows present in the segments.
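A quick way to see this, again as a toy Python simulation rather than Druid itself: an additive query metric such as longSum gives the same answer whether the rows sit in one rolled-up segment or are split across two partitions, but a count of Druid rows depends on the physical layout.

```python
# The same two events (each contributing metric value 1) stored two ways:
one_segment = [("A", 2)]                    # rolled up: one row, longSum metric = 2
two_partitions = [[("A", 1)], [("A", 1)]]   # one row per partition

# An additive query metric (longSum) is layout-independent:
sum_single = sum(v for _, v in one_segment)
sum_split = sum(v for part in two_partitions for _, v in part)
print(sum_single, sum_split)  # 2 2

# A count of Druid rows is not:
rows_single = len(one_segment)
rows_split = sum(len(part) for part in two_partitions)
print(rows_single, rows_split)  # 1 2
```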

Also, there is work going on (https://github.com/druid-io/druid/pull/5102) to automatically merge the smaller segment partitions created by the Kafka index task.

Hi,

Thanks for your quick response.

But we checked with another metric, a sum of (max of a metric), and the result was not what we were expecting.

Example, before segment handoff:

event 1: Dim=A metric = 0

event 2: Dim=A metric = 1

Sum of max of metric is 1.

After segment handoff, another event came with the same dimension:

event 3: Dim=A metric = 1

Sum of max of metric was returned as 2, but expected was 1.

Can you mention the exact query that you are using?

  • Parag

Hi Parag,

Please find the query below (note: the topN "metric" field must match the aggregator's "name", so both are written as "sum__Max Metric" here):

{
  "metric": "sum__Max Metric",
  "aggregations": [
    {
      "fieldName": "Max metric",
      "type": "longSum",
      "name": "sum__Max Metric"
    }
  ],
  "dimension": "Dim 1",
  "intervals": "2017-10-03T00:00:00+00:00/2018-01-03T10:49:04+00:00",
  "dataSource": "POC_1",
  "granularity": "all",
  "threshold": 50000,
  "postAggregations": [],
  "queryType": "topN"
}

Also, I have one more doubt, apart from this:

Does a dimension created using an extraction function not work in a filter?

So from the query it seems like you are just doing a sum over the metric, so getting 2 as the result is expected, as there will be two rows in two segments, each having a metric value of 1. When you say sum of max of metric, are you using the longMax aggregate function while indexing? If yes, then at query time you will have to use the longMax aggregate function first, and then use a longSum to actually get the sum of max.
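The two-stage aggregation described above can be sketched as a Python simulation (illustrative only, using the three events from the thread): re-apply the max per dimension value across all segment partitions first, then sum those maxima. This yields 1, whereas a plain sum over the stored per-row values yields 2.

```python
# Stored rows after handoff: one per segment partition, each holding the
# longMax-rolled-up value for Dim=A (data from the thread's example).
partitions = [
    [("A", 1)],  # partition 0: max(0, 1) rolled up at ingest time
    [("A", 1)],  # partition 1: the late event
]

# Plain sum over stored rows double-counts across partitions:
long_sum = sum(v for part in partitions for _, v in part)
print(long_sum)  # 2 -- what the topN query returned

# Sum of max: re-apply max per dimension value across partitions at
# query time, then sum the maxima.
maxima = {}
for part in partitions:
    for dim, v in part:
        maxima[dim] = max(maxima.get(dim, v), v)
sum_of_max = sum(maxima.values())
print(sum_of_max)  # 1 -- the expected result
```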

I am not very familiar with extraction functions and filters; probably ask a separate question for that. Also, if you think it should work and it is not working, then raise an issue on GitHub.

– Parag
