Kafka out of space and log.retention.hours

Hi guys,

We are using the Druid Kafka indexing service and just upgraded our Kafka broker to 0.10.1.0, and we found that the Kafka logs have run out of disk space. Since log.retention.hours=168, we plan to change it to log.retention.hours=24 to solve the out-of-space problem (the exact change we plan to make is sketched after the questions below). We have several questions:

  1. Is the retention period calculated from when Druid has consumed the data, or simply from when the Kafka log segments are created?

  2. Druid may consume a Kafka offset multiple times to achieve exactly-once ingestion. How can we make sure that a given range of Kafka offsets has been consumed and published to the historical nodes before Kafka deletes those offsets (i.e. before the log.retention period expires)?

  3. We see this on https://kafka.apache.org/documentation#upgrade_10_1_breaking:

Potential breaking changes in 0.10.1.0
The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.
Will this affect the Kafka indexing service when it indexes old data (data delayed by a couple of months)? What does "largest timestamp of the messages" mean? Is it a timestamp created by Kafka itself, or the timestamp inside our own data schema? If it is our own timestamp, will our old data be deleted immediately?
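
For reference, here is the change we are planning. This assumes the broker-level default is what applies to our topics; a topic-level retention.ms override would take precedence over it.

    # server.properties (broker-level default, applies unless a topic overrides it)
    # reduce retention from 7 days (168 hours) to 1 day
    log.retention.hours=24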

Best wishes.

By Linbo

Hi Linbo,

The first thing to keep in mind is that Druid has no special connection to Kafka - it is just another Kafka consumer. If the data is no longer available in Kafka because the retention period has elapsed, Druid is out of luck and will not be able to request those events. Hence, you’ll want to make sure your log.retention.hours is long enough to allow Druid to retry a reasonable number of times in case of failures while indexing. To answer your specific questions:

  1. The retention is based on the Kafka log creation time, i.e. the logs will be evicted once the retention period elapses, regardless of whether there is a consumer reading from them or not (see the topic-level retention sketch after this list).

  2. As far as I know, Kafka does not support ‘retaining messages until they have been consumed’. You might be able to write a monitoring process that periodically checks which offsets have been committed to Druid (by reading the stored offsets in the druid.dataSource table), determines which offsets are about to be evicted from Kafka, and then does something with this information - although I’m not sure what that something would be. A rough sketch of the offset comparison follows this list.

  3. That is Kafka’s internal message timestamp and has nothing to do with Druid timestamps. It will not cause you to lose data that has already been indexed into Druid (see the timestamp-type note after this list).
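
On point 1, one option worth mentioning: rather than shortening retention for everything, you could keep a short broker-wide default and give only the topic Druid reads from a longer retention. This is just a sketch - the topic name and ZooKeeper address are placeholders, and the numbers are examples:

    # give the Druid ingestion topic 7 days of retention (in milliseconds),
    # overriding the broker-wide log.retention.hours default
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type topics --entity-name your_druid_topic \
      --add-config retention.ms=604800000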
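
On point 2, here is a rough sketch of what such a monitoring process could look like. It assumes the kafka-python client (any client that can fetch beginning offsets would do); the topic name, datasource name, and broker address are placeholders, and fetch_druid_committed_offsets is a hypothetical helper you would have to implement against your metadata store, since the exact layout of the commit metadata in the druid.dataSource table depends on your Druid version:

    from kafka import KafkaConsumer, TopicPartition

    TOPIC = "your_druid_topic"      # placeholder
    BOOTSTRAP = "broker1:9092"      # placeholder

    def fetch_druid_committed_offsets(datasource):
        """Hypothetical helper: read the offsets Druid last published for this
        datasource from the metadata store (druid.dataSource table) and return
        them as a {partition: offset} dict. The stored payload is a JSON blob
        whose layout depends on the Druid version, so this is left as a stub."""
        raise NotImplementedError

    consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP)
    partitions = [TopicPartition(TOPIC, p)
                  for p in consumer.partitions_for_topic(TOPIC)]

    # Earliest offsets still on disk; anything below these is already evicted.
    earliest = consumer.beginning_offsets(partitions)
    committed = fetch_druid_committed_offsets("your_datasource")

    for tp in partitions:
        druid_offset = committed.get(tp.partition)
        if druid_offset is None:
            print("partition %d: Druid has not published an offset yet" % tp.partition)
        elif druid_offset < earliest[tp]:
            print("partition %d: Druid is at %d but Kafka has already evicted up to %d"
                  % (tp.partition, druid_offset, earliest[tp]))
        else:
            print("partition %d: %d messages of headroom before eviction"
                  % (tp.partition, druid_offset - earliest[tp]))

What you do with the result is up to you - alerting, or temporarily raising the topic’s retention.ms, for example.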
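
On point 3, one more reference point: with the 0.10 message format, the timestamp Kafka uses for retention is the one attached to each message, which by default is set by the producer when it sends the record (CreateTime), not the timestamp column in your data schema. If you would rather have retention driven by when the broker received the message, Kafka exposes a setting for that - shown here at the broker level; a per-topic message.timestamp.type override also exists:

    # server.properties: use broker append time instead of the producer-supplied
    # time for message timestamps (and hence for time-based retention)
    log.message.timestamp.type=LogAppendTime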

Hi David,

Your reply is quite useful. Thank you very much.