druid.coordinator.merge.on is not working with the Kafka Indexing Service

Hi,

We are using the experimental exactly-once Kafka Indexing Service. Because of out-of-order messages, we are ending up with small segments that we want to merge, so we set the property druid.coordinator.merge.on to true. I read in another forum that the shardSpec has to be “none” for druid.coordinator.merge.on to work, so we used the configuration below.

    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000,
      "shardSpec": {
        "type": "none"
      }
    }

Still, these segments are not getting merged. I think the Kafka indexing service is not honoring the shardSpec. Any input would be really helpful.

Thanks !

Siva

Hey Siva,

These features are not currently compatible with each other, because coordinator merging requires NoneShardSpecs while the Kafka indexing service requires NumberedShardSpecs. Improving this (i.e., having a better story around compaction of ingested segments) is on the roadmap for the Kafka indexing service.
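For reference, here is roughly how the two specs appear in segment metadata (field values are illustrative). A segment written by the Kafka indexing service carries a numbered spec along these lines:

    "shardSpec": { "type": "numbered", "partitionNum": 0, "partitions": 0 }

while the coordinator’s merge logic only considers segments whose spec is:

    "shardSpec": { "type": "none" }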

Thanks, Gian, for the swift response. This is desirable behavior; glad to know it’s in the pipeline.

Thanks !

Siva

Hi Gian,

We moved from Hadoop batch indexing to the Kafka indexing service and came across the same problem: there are too many small segments and query performance is slow.

Any suggestions on how to handle this problem? I saw “periodically run batch indexing tasks to compact the segments” in this blog: https://imply.io/post/2016/07/05/exactly-once-streaming-ingestion.html. But segments can be merged both vertically and horizontally; how can we merge them? I mean, which Druid batch indexing method can compact both kinds of segments?

By the way, we previously used the druid.coordinator.merge.on feature and it’s quite useful. When will Druid support auto-merge for Kafka-indexed segments?

Thank you very much.

Best wishes.

Linbo

Hi Linbo,

If you still have access to your Hadoop cluster, re-indexing using Hadoop is currently the best way to merge small segments together. You would use a ‘dataSource’ inputSpec to read the data from the segments generated by the Kafka indexing service, as described here: http://druid.io/docs/latest/ingestion/update-existing-data.html
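A minimal sketch of such a re-indexing task (the datasource name and interval are placeholders, and the dataSchema is elided):

    {
      "type": "index_hadoop",
      "spec": {
        "dataSchema": { ... same dataSchema as your Kafka supervisor spec ... },
        "ioConfig": {
          "type": "hadoop",
          "inputSpec": {
            "type": "dataSource",
            "ingestionSpec": {
              "dataSource": "your_datasource",
              "intervals": ["2016-06-01/2016-07-01"]
            }
          }
        },
        "tuningConfig": { "type": "hadoop" }
      }
    }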

Compacting segments horizontally and vertically can be controlled, respectively, by choosing an appropriate segmentGranularity in the granularitySpec and by setting targetPartitionSize in the partitionsSpec, just as you would with a standard Hadoop batch job.
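For example, a sketch (with illustrative values) that merges day segments into week segments horizontally while targeting about 5M rows per shard vertically:

    "dataSchema": {
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "WEEK",
        "queryGranularity": "NONE",
        "intervals": ["2016-06-01/2016-07-01"]
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": 5000000
      }
    }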

There’s an open PR related to automatic merging of segments using Hadoop here: https://github.com/druid-io/druid/pull/1998. I believe this should work with shardSpecs other than the NoneShardSpec, but haven’t tried it out myself. If you’re adventurous, you can give it a try and see if it works for you. Eventually we plan to support automatic merging of sharded segments without requiring Hadoop.

Hi,

Assume a scenario where I use daily segment granularity in a datasource’s supervisor spec and run a compaction job on historical data to convert the segments to weekly ones. Will it work if new data with timestamps in the past gets indexed, still with daily segment granularity, into the horizontally merged weekly segments?

Yes. What will happen is that when an event with a timestamp in the past comes in, the indexer will look for a segment covering that time interval and will add another shard for the same interval with the segmentGranularity of the existing segment. So in your case it would ignore the DAY segmentGranularity and create a shard with WEEK granularity.

The only caveat is that the compacted segment must be an extendable-type segment, and if you compact your day segments into a week segment with a single partition, it will not be extendable. The “forceExtendableShardSpecs” option will be available in 0.9.2 to help with this issue (see: https://github.com/druid-io/druid/pull/3473). Alternatively, you can size your segments by choosing a ‘targetPartitionSize’ such that you generate at least 2 partitions, which will make them extendable.
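A sketch of what that tuningConfig might look like on 0.9.2+ (values are illustrative):

    "tuningConfig": {
      "type": "hadoop",
      "forceExtendableShardSpecs": true,
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": 5000000
      }
    }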

Hey Gian,

Any update on auto compaction/merging when using the Kafka supervisor for exactly-once semantics? We can’t do a batch reindex as suggested in the article, because we get a wide range of timestamps in every batch, so we can’t go back 15 days or even more and do a batch reindex for compaction.

Thanks !

Siva

Hey Siva,

The next Druid release will include a revamped Index Task aimed at making compaction easier. Auto compaction is still a goal but will likely not make it into the next Druid release. In the meantime, though, you can always write your own automation around Druid’s reindexing facilities.

But also note that even when available, auto compaction/merging would still be done as batch tasks that require locks. So if you really do write back arbitrarily far in time, you will probably have to deal with compaction and ingestion fighting for locks (one of them will have to wait). If you’re just writing “pretty far back” but not arbitrarily far back, you can avoid those lock fights by not issuing compaction jobs for the region of time that you might still be loading into from Kafka.
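For example, if late events never arrive more than a month behind, a scheduled re-indexing task could always compact a window that ends a month before now, leaving the newer intervals to the Kafka indexing service (dates are illustrative):

    "ingestionSpec": {
      "dataSource": "your_datasource",
      "intervals": ["2017-01-01/2017-02-01"]
    }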

Hi Gian,

Assuming we can avoid write locks by only compacting data that is more than one month old, do you think the PR below enables Kafka-indexed segments to be compacted? I think it’s part of the next Druid release (0.9.3).

https://github.com/druid-io/druid/pull/1998

Thanks !

Siva

Hi Siva,

I haven’t looked into the details of that PR, but yes, I do believe it would allow for automated compaction of Kafka-indexed segments using Hadoop. Unfortunately, it’s been almost a year since the last activity on the PR and it’s not clear which Druid release it’ll make it into (but it won’t be the upcoming one, which is 0.10).

In the meantime, our team uses Oozie as an external scheduler that periodically submits batch ingestion jobs to compact the segments generated by the Kafka indexing service. We find that it works well for us and is quite stable. You could look into setting up something similar.

Also, this feature discussion, while not directly related, might be interesting to you, as it’ll help reduce the number of segments generated by the Kafka indexing service: https://github.com/druid-io/druid/issues/4016.

Thanks, David. We are exploring a similar kind of solution using Oozie. The issue you shared is really useful too; I think that’s a very good feature with which we can control the number of segments for every batch.

Hi,

Is there any news on this subject (auto-merging the small segments generated by the Kafka indexers)?

How do I contribute to this feature?

Does druid.coordinator.merge.on work even on segments generated by the Hadoop index task?

Hi,

druid.coordinator.merge.on should work on segments generated by the Hadoop index task too.

You will need to make sure you have not set appendToExisting to true explicitly.
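For reference, in the native index task that flag lives in the ioConfig and defaults to false (a sketch, with the firehose details omitted):

    "ioConfig": {
      "type": "index",
      "firehose": { ... },
      "appendToExisting": false
    }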

There is also a PR for making compaction more user-friendly here:

https://github.com/druid-io/druid/pull/4985

Thanks, Nishant. Are there any advantages/disadvantages between druid.coordinator.merge.on and Hadoop reindexing?

I believe that druid.coordinator.merge.on is better in general, as it produces properly sized segments, whereas with reindexing the segment size is determined by segmentGranularity. Is that right?
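(For reference, as I understand it the coordinator’s merge target is tunable through the coordinator dynamic configuration; the values below are the documented defaults:)

    {
      "mergeBytesLimit": 524288000,
      "mergeSegmentsLimit": 100
    }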