ShardSpec on Realtime

Hi,
I have two Realtime nodes that ingest from Kafka.

I have a topic with 4 partitions (only 3 of which are actually produced to) and this spec configuration:

"tuningConfig": {

  "type" : "realtime",

  "maxRowsInMemory": 500000,

  "intermediatePersistPeriod": "PT1h",

  "windowPeriod": "PT1h",

  "basePersistDirectory": "\/usr\/local\/dataStorage",

  "rejectionPolicy": {

    "type": "serverTime"

  },

  "shardSpec": {

    "type": "linear",

    "partitionNum": 0

    }

}

I currently need HA between the Realtime nodes, so the same ingestion work will be done by both Realtime instances.

Is this the right setup? Looking at the documentation, it says: “For example, if you set partition number to 0, it doesn’t mean that Kafka firehose will consume only from 0 topic partition.”

But it’s not very clear to me. In particular, I’m not sure whether partitionNum is the starting topic partition number, the total number of partitions, or a list of partitions (and if a list, how would I write it?). Please explain.

Thanks

Maurizio

Hi,

Please see http://druid.io/docs/0.8.1/ingestion/overview.html#ingest-from-apache-kafka and note that you can’t have both partitioning and replication at the same time with the Kafka firehose.

Now, say you just want HA and no partitioning (i.e., all Realtime nodes reading all the events from the topic), then you would set
"shardSpec": {
  "type": "linear",
  "partitionNum": 0
}

on all the nodes, and make sure each node is in a different Kafka consumer group (so that each of them receives all events from the topic).
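To make that concrete, here is a minimal sketch of the firehose section for the two nodes, assuming the kafka-0.8 firehose from the 0.8.1 docs linked above; the ZooKeeper address, topic name, and group IDs are placeholders for illustration. Node 1:

"firehose": {
  "type": "kafka-0.8",
  "consumerProps": {
    "zookeeper.connect": "zk.example.com:2181",
    "group.id": "druid-realtime-node1"
  },
  "feed": "your_topic"
}

Node 2 would be identical except for "group.id": "druid-realtime-node2". Both nodes keep the same linear shardSpec with partitionNum 0 shown above, so the Broker treats them as replicas of the same shard.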

– Himanshu

Thanks for the feedback.
I can confirm that after setting two distinct consumer groups I was able to ingest data correctly on the Realtime nodes, and the data is globally queryable.

Thanks

Maurizio

Great to hear, Maurizio!

Maurizio,

Two questions for you. (1) How do you partition your Kafka topic? Is it round-robin, or do you “bucket” somehow? (2) What is the nature of your queries? Are they tolerant of data potentially being ingested in a different order on the two nodes?

Thanks!

Hi Anna,
we have multiple topics; currently all of them have 4 partitions, written to by custom software that produces data for Druid and consumed by two Realtime processes that ingest the same data.

With the current configuration the Broker is able to retrieve last-hour data from the Realtime nodes (segments not yet handed off to the Historical nodes).

If one of the Realtime nodes goes down, the Broker is aware of it and sends requests to the other one.

Hope this answers your questions.

Thanks

Maurizio

Yes, but how does the custom software publish to the topics? Are you able to guarantee that the RT processes ingest the “same” data in the SAME order? Or is that not a requirement for you?

Hi Anna,
the custom software is multi-threaded C code that reads from a queue and puts events into Kafka; each thread can be faster or slower than the others.

So there is no guarantee that the Kafka producer and consumers follow the sequence of events as they happened.

However, we don’t care about the insert and read order: we have a millisecond timestamp for each event, and as you know Druid is organized around timestamps.

Moreover, we have a sequence of steps each customer must complete, so on the Druid datasource I’ve defined a metric for each of these steps.

Suppose a user must visit a page (step1), click on a button (step2), and confirm on a second button (step3). For each customer doing these steps, the custom software receives the step value and produces a JSON event into Kafka with the timestamp at which it happened (so three JSON events are produced on Kafka).
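As an illustration, the three events for one customer might look like the following sketch (the field names here are hypothetical, not necessarily Maurizio’s actual schema):

{"timestamp": "2015-11-20T10:15:02.123Z", "customerId": "c-42", "step1": 1, "step2": 0, "step3": 0}
{"timestamp": "2015-11-20T10:15:09.548Z", "customerId": "c-42", "step1": 0, "step2": 1, "step3": 0}
{"timestamp": "2015-11-20T10:15:11.907Z", "customerId": "c-42", "step1": 0, "step2": 0, "step3": 1}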

On the Druid side I’ll have the same three rows indexed, and I can query for the number of users who clicked step2 or step3, who clicked before a given timestamp, and so on…
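A query such as “how many users completed step2 or step3 before a given time” could then be a plain timeseries query over the step metrics. This is a sketch assuming the hypothetical datasource and field names from the event example above; the “before a timestamp” condition is expressed through the intervals bound:

{
  "queryType": "timeseries",
  "dataSource": "customer_steps",
  "granularity": "all",
  "intervals": ["2015-11-20T00:00:00.000Z/2015-11-20T12:00:00.000Z"],
  "aggregations": [
    { "type": "longSum", "name": "step2_count", "fieldName": "step2" },
    { "type": "longSum", "name": "step3_count", "fieldName": "step3" }
  ]
}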

Hope this helps.

Maurizio

Hi Maurizio,

First – thank you so much for taking the time to explain your setup. It’s really helpful.

Second… well, what you describe sounds like you’ve got it working, although the nature of your queries doesn’t seem to place much value on the absolute latest events.

What I am struggling with is the original claim: “Druid replicates segment such that logically equivalent data segments are concurrently hosted on N nodes. If N–1 nodes go down, the data will still be available for querying. On real-time nodes, this process depends on maintaining logically equivalent data segments on each of the N nodes, which is not possible with standard Kafka consumer groups if your Kafka topic requires more than one consumer (because consumers in different consumer groups will split up the data differently).” This is further explained as “Querying for the data will yield inconsistent results.”

I doubt the devs would have built a whole indexing service if this were not a real issue. Perhaps this is only an issue when there is more than one RT node per consumer group? I really wish the developers would weigh in on this topic. I guess I have to do some testing on our 30-partition topic and see if I can’t reproduce the issue that was originally described.

Thanks for your help :)

Anna!

Hi Fangjin, can you help with this?

Anna,

For any type of realtime ingestion, I recommend not using realtime nodes. For example, Imply’s distribution of Druid does not talk about or use realtime nodes; they have too many limitations. If you want a preview of my rewritten getting-started guide for the upcoming Druid 0.9.0, just follow the steps described here, which use Tranquility + the indexing service.

http://druid-io.imply.io/docs/latest/tutorials/ingestion.html

http://druid-io.imply.io/docs/latest/tutorials/tutorial-streams.html

http://druid-io.imply.io/docs/latest/tutorials/tutorial-kafka.html

Tranquility will take care of partitioning, replication, etc. for you.

Thank you!