How to shard with Kafka

Hi

  1. For all brokers I have num.replica.fetchers=1, so there is no replication of topics. I am doing this because Druid supports either partitions or replication with Kafka, not both.

  2. If I have 1 broker with num.partitions=2 and 1 realtime server, is the below correct?

"shardSpec": {
  "type": "linear",
  "partitionNum": 2
},

  3. If I have two realtime servers and 1 broker, is the below still correct?

"shardSpec": {
  "type": "linear",
  "partitionNum": 2
},

In essence, the "partitionNum" in the realtime spec == num.partitions=2 in the Kafka server.properties, regardless of how many realtime servers I have.

Finally, 3 realtime servers and 2 Kafka servers with num.partitions=3:
"shardSpec": {
  "type": "linear",
  "partitionNum": 3
},

Thanks

PS…

I am writing to Kafka on my one broker with replication = 1 and the number of partitions set to 2.

I am randomly writing to a Kafka partition.

I have two realtime servers.
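
Roughly how I am writing to a random partition (a minimal sketch using the kafka-python client; the broker address, topic name, and payload are placeholders, not my real values):

# Sketch only: send one JSON event to a randomly chosen Kafka partition.
import json
import random

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder broker

topic = "topic-test-dev"   # placeholder for topic-test-<%=@environment%>
num_partitions = 2         # matches num.partitions=2 on the broker

event = {"utcdt": "2015-01-01T00:00:00Z", "test": 1}
partition = random.randint(0, num_partitions - 1)

# Pick the partition explicitly rather than letting the client decide.
producer.send(topic, value=json.dumps(event).encode("utf-8"), partition=partition)
producer.flush()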

"shardSpec": {
  "type": "linear",
  "partitionNum": 0
},

"shardSpec": {
  "type": "linear",
  "partitionNum": 1
},

I would expect that when I write to Kafka, the realtime server with the matching partition would always get the data, but it is random in the sense that if I write to partition 0 on Kafka, the realtime server with "partitionNum": 1 can get the data. What am I missing here? Below is my spec file:

{
  "dataSchema" : {
    "dataSource" : "datasource-topic-test",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : {
          "column": "utcdt",
          "format": "iso"
        },
        "dimensionsSpec" : {
          "dimensions": [],
          "dimensionExclusions" : [],
          "spatialDimensions" : []
        }
      }
    },
    "metricsSpec" : [{
      "type" : "count",
      "name" : "test"
    }],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "minute",
      "queryGranularity" : "NONE"
    }
  },
  "ioConfig" : {
    "type" : "realtime",
    "firehose": {
      "type": "kafka-0.8",
      "consumerProps": {
        "zookeeper.connect": "<%=@zookeeper%>",
        "zookeeper.connection.timeout.ms" : "15000",
        "zookeeper.session.timeout.ms" : "15000",
        "zookeeper.sync.time.ms" : "5000",
        "group.id": "topic-pixel-<%=@environment%>",
        "fetch.message.max.bytes" : "1048586",
        "auto.offset.reset": "largest",
        "auto.commit.enable": "false"
      },
      "feed": "topic-test-<%=@environment%>"
    },
    "plumber": {
      "type": "realtime"
    }
  },
  "tuningConfig": {
    "shardSpec": {
      "type": "linear",
      "partitionNum": <%=@partitionNum%>
    },
    "type" : "realtime",
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT10m",
    "windowPeriod": "PT10m",
    "basePersistDirectory": "/tmp/realtime/basePersist",
    "rejectionPolicy": {
      "type": "serverTime"
    }
  }
}

Hi David, hopefully this makes it clear how to shard Kafka-based Druid ingestion: https://groups.google.com/forum/#!msg/druid-development/ClKCVbLfoiE/9HSoh1Z22ncJ

Another important thing to note is that Druid partitions and Kafka partitions are actually not tightly linked. The Kafka firehose splits Kafka partitions evenly between Druid partitions, such that any given Kafka partition will map into a single Druid partition, but not necessarily the one with the same number (Kafka partition 0 could map to Druid partition 1). Also, many Kafka partitions may map to a single Druid partition. So, you could have a Kafka topic with 40 partitions feeding into two Druid partitions; in that case, each Druid partition would get some set of 20 Kafka partitions.
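
To illustrate the kind of even split described above (a minimal sketch, not the firehose's actual code; the concrete mapping really comes from the Kafka consumer-group rebalance among realtime nodes sharing one group.id, and round-robin here is just an assumed rule for "evenly"):

# Sketch only: one possible even split of Kafka partitions across Druid partitions.
def split_kafka_partitions(num_kafka_partitions, num_druid_partitions):
    """Return {druid partitionNum: [Kafka partition ids]} under a round-robin split."""
    assignment = {d: [] for d in range(num_druid_partitions)}
    for kafka_partition in range(num_kafka_partitions):
        assignment[kafka_partition % num_druid_partitions].append(kafka_partition)
    return assignment

# 40 Kafka partitions feeding 2 Druid partitions -> each Druid partition gets 20.
print(split_kafka_partitions(40, 2))

So in the two-realtime-node setup above, both nodes consume through the same group.id, and a message written to Kafka partition 0 can legitimately end up on the node running with "partitionNum": 1.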