Too many shards (Kafka indexing service)

Hi there!

I’m using the Kafka indexing service introduced in the 0.9.1 release. Everything works just fine, but I’m getting an enormous number of shards per segment (granularity is set to DAY).

I have two tasks running for each dataSource, and I feed them with data pulled from a MySQL database by a service that runs every hour to fetch the latest data. This is creating more than 100 shards per segment, which I expect will hurt query performance significantly.

Could you please explain why this is happening? Can I prevent the service from creating so many shards?

Thank you in advance!

F.

I have the exact same situation. The Kafka indexing service is reading from 1 Kafka topic with 5 partitions, with segment granularity “hour”, and it ends up with 10+ shards per hour. The resulting segment size is around 240 MB; some shards are 0 bytes or 8 KB, some are 24 MB.

And queries have a huge “query/wait/time”.

Hey Fede and Nikita,

The Kafka indexing service creates a different segment for each Kafka partition in order to ensure deterministic segment generation. The issue is that Kafka only guarantees message order within a partition, not across partitions. If two replica tasks each wrote events from multiple partitions into a single segment, then when maxRowsPerSegment was hit and a segment was cut, each replica’s segment could contain different data depending on the order its Kafka consumer received the events. To get around the lack of an ordering guarantee across a whole topic, we have to treat each partition separately.

The other thing to keep in mind is that taskDuration doesn’t align itself on time boundaries the way segmentGranularity does. A segmentGranularity of an hour with realtime data covers the start of the current hour to the end of the current hour, whereas a taskDuration of an hour runs from now until now+1H. A task starting at, say, 10:30 with a one-hour taskDuration therefore straddles two hourly segment intervals, so each of the 5 partitions produces at least two shards per hour. This is why Nikita is seeing 10+ shards per hour for 5 partitions and why the segment sizes are not consistent.

To minimize the number of segments created, use as few Kafka partitions as you can while still meeting your Kafka/Druid ingestion throughput requirements (partitions cannot be split across multiple Druid indexing tasks). You can also try increasing taskDuration, but there are tradeoffs to doing that: the middle managers may need more resources for queries since tasks hold onto data longer, and in the case of a failure more data will need to be reindexed.
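For reference, here is a minimal sketch of where those two settings live in a Kafka supervisor spec. It is abbreviated (parser, metricsSpec, and most tuning options are omitted), and the datasource name, topic, broker address, and durations are just placeholder values to adapt:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "my-datasource",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "my-topic",
    "consumerProperties": {
      "bootstrap.servers": "kafka-broker:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT4H"
  }
}

Bumping taskDuration (e.g. from PT1H to PT4H) means fewer task intervals per day and therefore fewer shards per segment interval, at the cost of the tradeoffs mentioned above.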

Right now, the best way to handle excessive segments is to run a Hadoop batch job using a dataSource inputSpec to re-index the data from the existing segments into more optimally sized segments. Docs for how to do this are here:

http://druid.io/docs/0.9.1.1/ingestion/update-existing-data.html
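For the Hadoop route, the key piece is the dataSource inputSpec inside an "index_hadoop" task. Roughly (the datasource name and interval are placeholders; see the linked docs for the complete spec):

  "ioConfig": {
    "type": "hadoop",
    "inputSpec": {
      "type": "dataSource",
      "ingestionSpec": {
        "dataSource": "my-datasource",
        "intervals": ["2016-06-01/2016-06-02"]
      }
    }
  }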

In the future, we are looking to add alternate ways to do this that don’t require Hadoop.

Hope this helps,
David

Thank you David,

I’ll try the reindexing and see how it works. I’ll post the results later.

Hi David,
I have the same issue with the Kafka indexing service. Do you have a good example of how to run a daily/weekly re-index task to keep the data optimized? Or is it only a manual job?

I’m not sure I understand from the docs how re-indexing will work… is it possible to re-index data by ingesting into the same datasource, or will I always end up with 2 datasources: the initial one (a huge number of small shards) and the optimized one?

Hi - Yes, it is possible to re-index the data of a specific datasource and push it back to the same datasource. It will replace the earlier segments with new ones, and once done your queries will be served by the new segments.

Regards,
Arpan Khagram

+91 8308993200

Hi
Thanks. Where can I find an example of such an operation? Some example task specs would be really helpful.

Hi - There are 2 ways you can do it: one is through the indexing firehose service and the other is through a Hadoop batch job.

Firehose-based indexing works, but it should only be used for prototyping or small amounts of data. If you want to re-index a large data set, you should go for the Hadoop batch job. I shall provide you with some sample task JSON.

Regards,

Arpan Khagram

+91 8308993200

Thanks, Arpan.

Here is a working example of a re-indexing firehose job in case someone needs it.

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "dimension1",
              "dimensionN"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "WEEK",
        "queryGranularity": "HOUR",
        "rollup": true,
        "intervals": [
          "2017-07-03T00:00:00.000Z/2017-08-28T00:00:00.000Z"
        ]
      },
      "dataSource": "dataSourceName",
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "metric1",
          "fieldName": "Metric1"
        },
        {
          "type": "longSum",
          "name": "metricN",
          "fieldName": "MetricN"
        }
      ]
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "dataSourceName",
        "interval": "2017-07-03T00:00:00.000Z/2017-08-28T00:00:00.000Z"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 5000000,
      "maxRowsInMemory": 75000
    }
  }
}
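If it helps anyone: we submit specs like this by POSTing the JSON to the overlord at /druid/indexer/v1/task and then watch progress in the overlord console. Once the re-index task succeeds, the newly published segments overshadow the old small ones for the same interval, so queries start hitting the optimized data as soon as it is loaded.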