Important: Kafka Integration + Segment Indexing

Hi,

I'm evaluating Druid.io for analytics purposes. I asked a few questions earlier and got excellent, quick replies.

I appreciate that a lot.

I've reached a point where I'm getting a bit confused about indexing (segments) and the Kafka integration with Druid.io.

These two areas are very important for our system to move forward.

  1. Kafka Integration:

https://groups.google.com/forum/#!topic/druid-development/_y_lb3t1IZg

I went through this thread, and it is awesome :slight_smile:

I understood a lot of it, though one grey area remains for me, so I'm asking a few questions.

Now:

Take this example setup:

Kafka

topic: topic1,
partitions: p0 to p9 (10 partitions),
Kafka consumer group = k-group1

Druid

Realtime nodes: 5 nodes running (each using Kafka consumer group = k-group1).
So each node should be responsible for 2 Kafka partitions.

Q1: Is it possible to have fewer realtime nodes than Kafka partitions, e.g. 5 realtime nodes for 10 Kafka partitions?

Q2: Irrespective of the answer to the first question, the next question is :slight_smile:
From the earlier statement –

From Gian Merlino–> " The multiple-consumer-group problem is less about ordering and more about which kafka partitions end up being read into which druid partitions. You will have a problem if druid partition 1 in consumer group A is reading data from kafka partitions 1 & 2, but druid partition 1 in consumer group B is reading from kafka partitions 2 & 3. The broker will assume that both instances of partition 1 have the same data, but that won’t be true. "

I'm not able to understand this: if there are 2 different Kafka consumer groups, each group will consume data from all the partitions (duplicating the data), since a high-level Kafka consumer group does not consume a distinct subset of partitions but consumes all of the topic's partitions.
OR
When Gian refers to "consumer group A", is that some kind of Druid consumer group rather than a Kafka consumer group? (I didn't find much about a Druid consumer group.)
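For reference, here is the Kafka behaviour I'm assuming in the first reading, sketched with the Java kafka-clients consumer API purely to illustrate the semantics (the broker address and topic name are placeholders): consumers with different group.ids each see every partition of topic1, so each group gets its own full copy of the data, while consumers sharing one group.id split the 10 partitions between them.

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class ConsumerGroupSketch {
      static KafkaConsumer<String, String> newConsumer(String groupId) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
          props.put("group.id", groupId);
          props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Collections.singletonList("topic1"));
          return consumer;
      }

      public static void main(String[] args) {
          // Different group.ids: each group independently reads all 10 partitions of topic1.
          KafkaConsumer<String, String> groupA = newConsumer("k-group-A");
          KafkaConsumer<String, String> groupB = newConsumer("k-group-B");
      }
  }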

Q3: Continuing from Q2.

a. Is there any specific partitioning logic in Druid when consuming and indexing Kafka data?

b. Is there any relation between the Kafka partitions and the Druid.io partitions?

Per Gian's statement: if, across consumer groups, the Druid partition with the same number ends up holding data from different Kafka partitions, the broker will still assume both copies of that Druid partition contain the same data, which won't be true, and so there will be a problem?

c. If the indexer has some special logic for partitioning data into Druid segments, is that known to the broker as well?

Ideally it should not need to be, since if a query comes with a particular

---- column filter

---- aggregation

then the broker will identify the realtime nodes as in the standard case (that's my understanding :slight_smile:), using DataSource_StartTime_EndTime_Version to identify the right realtime node.
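(For illustration, this is my assumption of what such an identifier looks like, based on the docs, so please correct me if the layout is different:

  wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2015-09-12T10:00:00.000Z
  wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2015-09-12T10:00:00.000Z_1

i.e. dataSource_intervalStart_intervalEnd_version, with a partition number appended when an interval has more than one Druid partition; nothing in it refers to Kafka partitions.)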

So there is no mapping between the query and the Kafka partitions or Druid partitions?

Please confirm.

  2. Segment Indexing:

http://druid.io/docs/latest/design/segments.html

http://static.druid.io/docs/druid.pdf

I tried to understand the indexing logic from the above page and paper.

I got most of it (I think so :slight_smile:), but I want to clear up all my doubts, as this is very important too.

The example given:

"

1: Dictionary that encodes column values
  {
    "Justin Bieber": 0,
    "Ke$ha":         1
  }

2: Column data
  [0,
   0,
   1,
   1]

3: Bitmaps - one for each unique value of the column
  value="Justin Bieber": [1,1,0,0]
  value="Ke$ha":         [0,0,1,1]

"

Q1: The 3rd data structure stores, for each dictionary value, a bitmap whose set positions indicate the rows where that string is present, for example:

3: Bitmaps - one for each unique value of the column
value=“Justin Bieber”: [1,1,0,0]
value=“Ke$ha”: [0,0,1,1]

Each bitmap represents which rows contain that distinct value.

In our case:

This means that "Justin Bieber" appears at row 1 and row 2 of this segment.

Similarly, "Ke$ha" is present at row 3 and row 4.

The size of each bitset is 4, because the example has 4 rows.

So if a segment has 5 million rows, then each unique value of each column in the table/datasource will have a bitset with 5 million entries (Druid uses RoaringBitmap).
Is that correct?
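To check my mental model, here is a minimal sketch of how I picture the three structures fitting together, using java.util.BitSet purely as an illustration (Druid's actual compressed bitmap classes and column layout will differ):

  import java.util.ArrayList;
  import java.util.BitSet;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class BitmapIndexSketch {
      public static void main(String[] args) {
          String[] pageColumn = {"Justin Bieber", "Justin Bieber", "Ke$ha", "Ke$ha"};

          Map<String, Integer> dictionary = new LinkedHashMap<>();  // 1: value -> integer id
          int[] columnData = new int[pageColumn.length];            // 2: one dictionary id per row
          List<BitSet> bitmaps = new ArrayList<>();                 // 3: one bitmap per unique value

          for (int row = 0; row < pageColumn.length; row++) {
              Integer id = dictionary.get(pageColumn[row]);
              if (id == null) {
                  id = dictionary.size();                 // next free id
                  dictionary.put(pageColumn[row], id);
                  bitmaps.add(new BitSet(pageColumn.length));
              }
              columnData[row] = id;
              bitmaps.get(id).set(row);                   // bit r set <=> row r holds this value
          }

          // Prints {Justin Bieber=0, Ke$ha=1} and {2, 3} (rows where "Ke$ha" appears).
          System.out.println(dictionary);
          System.out.println(bitmaps.get(dictionary.get("Ke$ha")));
      }
  }

With a plain BitSet each value's bitmap really would carry one bit per row (roughly 5 million bits per value for a 5-million-row segment); my understanding is that the real segment stores these bitmaps compressed.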

Q2: How does this help to quickly answer filter queries? If my query is:
Filter --> page="Ke$ha"
then when it tries to identify the rows where "Ke$ha" is present, the steps will be:

  1. Get the ID from Dictionary for “Ke$ha”

  2. Iterate through the BitSet, get the set indexes, and then, assuming each index is a pointer, use it to look up the actual metric values.
    Is iterating through such a big BitSet or Roaring set efficient?
    Or, since RoaringBitmap is more advanced and efficient than BitSet, does it optimize this operation?
    If yes, then how :slight_smile: ?

  3. Then apply aggregation on top of the values.

Is this understanding correct?
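In code, the three steps I have in mind look roughly like this; again just my own sketch with java.util.BitSet, where the dictionary, bitmaps, and metric array are stand-ins for whatever the segment actually stores:

  import java.util.BitSet;
  import java.util.Map;

  public class FilterQuerySketch {
      public static void main(String[] args) {
          // Stand-in segment structures for the 4-row example.
          Map<String, Integer> dictionary = Map.of("Justin Bieber", 0, "Ke$ha", 1);
          BitSet[] bitmaps = {bitsetOf(0, 1), bitsetOf(2, 3)};  // indexed by dictionary id
          long[] someMetric = {25, 42, 17, 170};                 // made-up metric column, one value per row

          // Step 1: dictionary lookup for the filter value.
          int id = dictionary.get("Ke$ha");

          // Steps 2 and 3: walk only the set bits of that value's bitmap,
          // using each set index as the row pointer, and aggregate the metric.
          long sum = 0;
          BitSet matchingRows = bitmaps[id];
          for (int row = matchingRows.nextSetBit(0); row >= 0; row = matchingRows.nextSetBit(row + 1)) {
              sum += someMetric[row];
          }
          System.out.println("sum(someMetric) where page = Ke$ha -> " + sum);  // 187
      }

      private static BitSet bitsetOf(int... rows) {
          BitSet bs = new BitSet();
          for (int r : rows) bs.set(r);
          return bs;
      }
  }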

Q3:

2: Column data
  [0,
   0,
   1,
   1]

This is the array where each entry is the dictionary integer ID of the column value, and the index is the row where it is present. That is perfect!!

This is needed for TopN and GroupBy.

a. For a segment with 5 million rows, will this array's size be 5 million?

b. For group-by:
Does it need to iterate through all the entries and then create groups of equal integers?
That might not be efficient. Is that why GroupBy in Druid is not efficient?
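To be concrete about what I mean by "iterate and create groups", this is the naive single-pass version I'm picturing (made-up arrays, not Druid's actual engine):

  public class GroupBySketch {
      public static void main(String[] args) {
          String[] dictionary = {"Justin Bieber", "Ke$ha"};  // id -> value
          int[] columnData = {0, 0, 1, 1};                   // dictionary id per row
          long[] someMetric = {25, 42, 17, 170};             // made-up metric column, aligned by row

          // One accumulator per dictionary id: a single pass over the rows
          // groups equal ids together and aggregates their metric values.
          long[] sumPerId = new long[dictionary.length];
          for (int row = 0; row < columnData.length; row++) {
              sumPerId[columnData[row]] += someMetric[row];
          }

          for (int id = 0; id < dictionary.length; id++) {
              System.out.println(dictionary[id] + " -> " + sumPerId[id]);
          }
      }
  }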

c. For TopN:
A TopN query always happens on a dimension, so that dimension is used in a group-by-like fashion.

This also needs an iteration over all the elements; while iterating, it gets the metric values, keeps applying the aggregation function, and keeps a sorted set of N elements.
When the iteration is finished, we should get a list of the top N elements based on one dimension (in our case, 'page').
So I'm wondering how TopN is so efficient in Druid?

I’m definitely missing something :slight_smile: , please explain.
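For clarity, the naive top-N I'm describing would look something like the sketch below: per-value totals (as in the group-by sketch above) followed by a bounded min-heap of size N. This is only my illustration of the naive approach, clearly not what Druid's TopN engine actually does:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.PriorityQueue;

  public class TopNSketch {
      public static void main(String[] args) {
          String[] dictionary = {"Justin Bieber", "Ke$ha", "Some Other Page"};
          long[] sumPerId = {67, 187, 5};  // made-up per-value totals from a group-by style pass
          int n = 2;

          // Min-heap keyed on the aggregated metric: the smallest of the current
          // top n sits at the head and is evicted when a larger total arrives.
          PriorityQueue<Integer> heap =
              new PriorityQueue<>((a, b) -> Long.compare(sumPerId[a], sumPerId[b]));
          for (int id = 0; id < dictionary.length; id++) {
              heap.offer(id);
              if (heap.size() > n) {
                  heap.poll();
              }
          }

          List<Integer> topIds = new ArrayList<>(heap);
          topIds.sort((a, b) -> Long.compare(sumPerId[b], sumPerId[a]));  // descending by total
          for (int id : topIds) {
              System.out.println(dictionary[id] + " -> " + sumPerId[id]);
          }
      }
  }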

Q4:
How and where is the link between a row and the metric names/values?
Basically, when the page column's bitmaps, like:

value="Justin Bieber": [1,1,0,0]
  value="Ke$ha":         [0,0,1,1]

say that "Justin Bieber" is present in row 1, where and how are the metric values stored in the segment files?
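My working assumption here (please correct me if it's wrong) is that each metric is simply another column stored in the same row order, so a row index obtained from a dimension's bitmap addresses every metric column as well. Roughly:

  public class ColumnAlignmentSketch {
      public static void main(String[] args) {
          // All columns are parallel and stored in the same row order, so the row
          // index from a dimension's bitmap lines up with every metric column.
          int[] pageIds = {0, 0, 1, 1};                   // dimension column ("page" dictionary ids)
          long[] addedMetric = {25, 42, 17, 170};         // hypothetical metric column
          double[] deletedMetric = {1.0, 2.5, 0.0, 9.0};  // another hypothetical metric column

          int row = 1;  // "Justin Bieber" is present at this row per its bitmap
          System.out.println("page id = " + pageIds[row]
                  + ", added = " + addedMetric[row]
                  + ", deleted = " + deletedMetric[row]);
      }
  }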

Q5:
Is it possible to read the 0000.smoosh files where the segment indexes are written? I believe they are in binary format.

Lots of questions; some might be naive or not quite right.

But if someone can respond, that would be great.

I have an immediate need to evaluate and work on this solution.

So if someone can help by Monday, that would be very helpful!!

Please let me know if more explanation is required for any of the questions.

Regards.

Manish

Please see: the first question, which was related to Kafka, is now fairly clear to me :slight_smile:

But I was wondering whether, in the case of Kafka, the realtime nodes can themselves handle replication, much like Kafka, HDFS, or Cassandra.

Then the Indexing Service / Overlord might not be required.

I could then just have multiple realtime nodes as consumers using the same consumer group; Kafka would handle the load balancing and the realtime nodes would handle the replication.

Regards,

Manish

Inline.

Hi,

I'm evaluating Druid.io for analytics purposes. I asked a few questions earlier and got excellent, quick replies.

I appreciate that a lot.

I've reached a point where I'm getting a bit confused about indexing (segments) and the Kafka integration with Druid.io.

These two areas are very important for our system to move forward.

  1. Kafka Integration:

https://groups.google.com/forum/#!topic/druid-development/_y_lb3t1IZg

I went through this thread, and it is awesome :slight_smile:

I understood a lot of it, though one grey area remains for me, so I'm asking a few questions.

Now:

Take this example setup:

Kafka

topic: topic1,
partitions: p0 to p9 (10 partitions),
Kafka consumer group = k-group1

Druid

Realtime nodes: 5 nodes running (each using Kafka consumer group = k-group1).
So each node should be responsible for 2 Kafka partitions.

Q1: Is it possible to have fewer realtime nodes than Kafka partitions, e.g. 5 realtime nodes for 10 Kafka partitions?

Yes, this is possible. There is no direct mapping between Kafka partitions and realtime partitions.

Q2: Irrespective of the answer to the first question, the next question is :slight_smile:
From the earlier statement –

From Gian Merlino–> " The multiple-consumer-group problem is less about ordering and more about which kafka partitions end up being read into which druid partitions. You will have a problem if druid partition 1 in consumer group A is reading data from kafka partitions 1 & 2, but druid partition 1 in consumer group B is reading from kafka partitions 2 & 3. The broker will assume that both instances of partition 1 have the same data, but that won’t be true. "

I'm not able to understand this: if there are 2 different Kafka consumer groups, each group will consume data from all the partitions (duplicating the data), since a high-level Kafka consumer group does not consume a distinct subset of partitions but consumes all of the topic's partitions.
OR
When Gian refers to "consumer group A", is that some kind of Druid consumer group rather than a Kafka consumer group? (I didn't find much about a Druid consumer group.)

Q3: Continuing from Q2.

a. Is there any specific partitioning logic in Druid when consuming and indexing Kafka data?

b. Is there any relation between the Kafka partitions and the Druid.io partitions?

Per Gian's statement: if, across consumer groups, the Druid partition with the same number ends up holding data from different Kafka partitions, the broker will still assume both copies of that Druid partition contain the same data, which won't be true, and so there will be a problem?

c. If the indexer has some special logic for partitioning data into Druid segments, is that known to the broker as well?

Ideally it should not need to be, since if a query comes with a particular

---- column filter

---- aggregation

then the broker will identify the realtime nodes as in the standard case (that's my understanding :slight_smile:), using DataSource_StartTime_EndTime_Version to identify the right realtime node.

So there is no mapping between the query and the Kafka partitions or Druid partitions?

Please confirm.

I hope this thread answers most of your questions:

https://groups.google.com/forum/#!searchin/druid-development/fangjin$20yang$20"thoughts"/druid-development/aRMmNHQGdhI/muBGl0Xi_wgJ

  2. Segment Indexing:

http://druid.io/docs/latest/design/segments.html

http://static.druid.io/docs/druid.pdf

I tried to understand the indexing logic from the above page and paper.

I got most of it (I think so :slight_smile:), but I want to clear up all my doubts, as this is very important too.

The example given:

"

1: Dictionary that encodes column values
  {
    "Justin Bieber": 0,
    "Ke$ha":         1
  }

2: Column data
  [0,
   0,
   1,
   1]

3: Bitmaps - one for each unique value of the column
  value="Justin Bieber": [1,1,0,0]
  value="Ke$ha":         [0,0,1,1]

"

Q1: The 3rd data structure stores, for each dictionary value, a bitmap whose set positions indicate the rows where that string is present, for example:

3: Bitmaps - one for each unique value of the column
value=“Justin Bieber”: [1,1,0,0]
value=“Ke$ha”: [0,0,1,1]

Each bitmap represents which rows contain that distinct value.

In our case:

This means that "Justin Bieber" appears at row 1 and row 2 of this segment.

Similarly, "Ke$ha" is present at row 3 and row 4.

The size of each bitset is 4, because the example has 4 rows.

So if a segment has 5 million rows, then each unique value of each column in the table/datasource will have a bitset with 5 million entries (Druid uses RoaringBitmap).
Is that correct?

Uncompressed, there will be 5M entries. However, the compression we see with real-world data is quite significant, and the compressed sets are very tiny.

Q2: How does this help to quickly answer filter queries? If my query is:
Filter --> page="Ke$ha"
then when it tries to identify the rows where "Ke$ha" is present, the steps will be:

  1. Get the ID from Dictionary for “Ke$ha”
  2. Iterate through the BitSet, get the set indexes, and then, assuming each index is a pointer, use it to look up the actual metric values.
    Is iterating through such a big BitSet or Roaring set efficient?
    Or, since RoaringBitmap is more advanced and efficient than BitSet, does it optimize this operation?
    If yes, then how :slight_smile: ?

By default Druid uses Concise (not Roaring) and there is a cursor that walks through the rows that match the filtered bitset.

  3. Then apply aggregation on top of the values.

Is this understanding correct?

Q3:

2: Column data
  [0,
   0,
   1,
   1]

This is the array where each entry is the dictionary integer ID of the column value, and the index is the row where it is present. That is perfect!!

This is needed for TopN and GroupBy.

a. For a segment with 5 million rows, will this array's size be 5 million?

Uncompressed.

b. For group-by:
Does it need to iterate through all the entries and then create groups of equal integers?
That might not be efficient. Is that why GroupBy in Druid is not efficient?

No, groupBy is not efficient because of the implementation of the query engine, combined with the fact that topN uses approximate results.

c. For TopN:
A TopN query always happens on a dimension, so that dimension is used in a group-by-like fashion.

This also needs an iteration over all the elements; while iterating, it gets the metric values, keeps applying the aggregation function, and keeps a sorted set of N elements.
When the iteration is finished, we should get a list of the top N elements based on one dimension (in our case, 'page').
So I'm wondering how TopN is so efficient in Druid?

I’m definitely missing something :slight_smile: , please explain.

Please see: http://druid.io/docs/latest/querying/topnquery.html

Thanks, Fangjin!!

Regards,

Manish