Kafka ingestion resets offset to 0, out of range

It’s a plain Kafka ingestion task; we have plenty of those running.

What is different from the usual case is that there used to be a datasource with the same name as the one just created, but we removed it a few hours after creation (mark unused, issue a kill task, etc.), as we noticed a couple of invalid fields in our new stream.

Now, after creating it anew, it keeps resetting offsets to 0 / earliest, even though I have explicitly disabled that via the settings below (see the spec sketch after the list):

- Use earliest offset = false
- Skip offset gaps = true
- Reset offsets automatically - I’ve tried both true and false.
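
For completeness, this is roughly how those console toggles map onto the supervisor spec I’m submitting (a trimmed-down sketch; the datasource, topic, and Overlord URL are placeholders, and I’ve left out skipOffsetGaps and most of the dataSchema):

```python
import json
import requests  # any HTTP client works; requests is just for illustration

OVERLORD = "http://overlord:8090"  # placeholder

supervisor_spec = {
    "type": "kafka",
    "dataSchema": {"dataSource": "my-datasource"},  # trimmed for brevity
    "ioConfig": {
        "topic": "topic-name-10m",                  # placeholder topic
        "consumerProperties": {"bootstrap.servers": "10.1.128.108:9092"},
        "useEarliestOffset": False,                 # "Use earliest offset" toggle
    },
    "tuningConfig": {
        "type": "kafka",
        "resetOffsetsAutomatically": False,         # "Reset offsets automatically" toggle
    },
}

# Submit (or re-submit) the spec to the Overlord.
resp = requests.post(
    f"{OVERLORD}/druid/indexer/v1/supervisor",
    data=json.dumps(supervisor_spec),
    headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
```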

My suspicion is that when useEarliestOffset != true, Druid does some internal offset handling of its own, manually seeking back to a specific stored offset instead of letting the Kafka client’s reset policy take over and start from the beginning of the stream.
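
To illustrate the distinction (a standalone sketch with the kafka-python client, not Druid’s own consumer code, so treat the details as my assumption): when no reset policy is configured, an out-of-range fetch is raised to the application, and it is then the application’s choice whether to seek back to the old offset or to the beginning.

```python
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import OffsetOutOfRangeError

# Placeholder broker; topic, partition, and offset taken from the task log below.
consumer = KafkaConsumer(
    bootstrap_servers="10.1.128.108:9092",
    auto_offset_reset="none",   # anything other than earliest/latest: raise instead of resetting
    enable_auto_commit=False,
)

tp = TopicPartition("topic-name-10m", 26)
consumer.assign([tp])
consumer.seek(tp, 12049398)     # the stale offset the task keeps going back to

try:
    consumer.poll(timeout_ms=5000)
except OffsetOutOfRangeError:
    # This is where the application decides what to do. Seeking back to the
    # same stale offset reproduces the error forever; seeking to the beginning
    # lets consumption continue.
    consumer.seek_to_beginning(tp)
```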

I’d like to know whether my assumptions are correct and if there is anything I might try to resolve that issue.


Things I've tried

I’ve tried toggling those offset-related params on/off in different combinations.

Architecture: 3 data nodes (Historical / MiddleManager)
Logs:
```
2021-03-23T12:47:08,245 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2021-03-23T12:47:38,246 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Fetch position FetchPosition{offset=12049398, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.1.128.108:9092 (id: 7 rack: 2)], epoch=0}} is out of range for partition topic-name-10m-26, raising error to the application since no reset policy is configured
2021-03-23T12:47:38,246 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=12049398, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.1.128.108:9092 (id: 7 rack: 2)], epoch=0}} is out of range for partition topic-name-10m-26]
2021-03-23T12:47:38,246 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to EARLIEST offset of partition topic-name-10m-26
2021-03-23T12:47:38,250 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Resetting offset for partition topic-name-10m-26 to offset 0.
2021-03-23T12:47:38,250 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to offset 12049398 for partition topic-name-10m-26
2021-03-23T12:47:38,250 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to offset 12049398 for partition topic-name-10m-26
2021-03-23T12:47:38,250 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2021-03-23T12:48:08,251 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Fetch position FetchPosition{offset=12131620, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.1.128.111:9092 (id: 10 rack: 2)], epoch=0}} is out of range for partition topic-name-10m-17, raising error to the application since no reset policy is configured
2021-03-23T12:48:08,251 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=12131620, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.1.128.111:9092 (id: 10 rack: 2)], epoch=0}} is out of range for partition topic-name-10m-17]
2021-03-23T12:48:08,251 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to EARLIEST offset of partition topic-name-10m-17
2021-03-23T12:48:08,253 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Resetting offset for partition topic-name-10m-17 to offset 0.
2021-03-23T12:48:08,253 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to offset 12131620 for partition topic-name-10m-17
2021-03-23T12:48:08,253 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-coclnlfl-1, groupId=kafka-supervisor-coclnlfl] Seeking to offset 12131620 for partition topic-name-10m-17
2021-03-23T12:48:08,253 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
```

Relates to Apache Druid 0.20.1

Have you attempted to reset the supervisor? I wonder if it has somehow recorded an offset and is continually going back to it…
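
If not, it might be worth a try, along these lines (the Overlord host is a placeholder, and the supervisor ID defaults to the datasource name). Note that a hard reset clears the stored offsets, so it can re-read or skip data:

```python
import requests

OVERLORD = "http://overlord:8090"   # placeholder
SUPERVISOR_ID = "my-datasource"     # defaults to the datasource name

# Hard-resets the supervisor: drops its stored partition offsets and restarts
# its tasks, so the next tasks choose offsets per useEarliestOffset instead of
# the recorded (possibly stale) ones.
resp = requests.post(f"{OVERLORD}/druid/indexer/v1/supervisor/{SUPERVISOR_ID}/reset")
resp.raise_for_status()
```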

I’ve recreated it a couple of times.

Have you checked for any weirdness in the metadata? Just for sanity?

Can you elaborate? Metadata of what?

And what would you conclude to be the norm?
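
For what it’s worth, I take “metadata” here to mean the offsets the supervisor has committed to Druid’s metadata store. A quick way to eyeball them (a sketch assuming a MySQL metadata store and the default druid_ table prefix; table and column names may differ by setup):

```python
import json
import pymysql  # swap for the matching driver if you use PostgreSQL/Derby

# Placeholder connection details for the Druid metadata store.
conn = pymysql.connect(host="metadata-db", user="druid", password="secret", database="druid")

with conn.cursor() as cur:
    # druid_dataSource holds the latest offsets committed per datasource.
    cur.execute(
        "SELECT dataSource, commit_metadata_payload FROM druid_dataSource WHERE dataSource = %s",
        ("my-datasource",),
    )
    for datasource, payload in cur.fetchall():
        # The payload is JSON with per-partition offsets; leftovers from the
        # old, deleted datasource would show up here as suspiciously high values.
        print(datasource, json.loads(payload))

conn.close()
```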

Strangely enough, after leaving the ingestion task running like that for a couple of days, it started working.

We still don’t know what the initial issue was, nor why it resolved itself :confounded:

Still thanks for taking a look :slight_smile:

OH no that’s the worst cos now I want to know :smiley: :smiley:

Sorry to bring this back to life! I have seen this issue.
Exactly the same as what Michal describes here.

I attributed this issue to the following (a conspiracy theory :grinning:):

The Kafka topics got dropped. That left the Druid Kafka consumers as orphans of a sort, still seeking to a high offset value that no longer exists on the broker. My guess is that if auto.create.topics.enable=true is set on the broker and allow.auto.create.topics=true on the consumer, the Druid Kafka consumer recreates the topic, and that messes up the offset metadata stored in the metadata table druid_tasks (see the sketch below for a consumer-side override).
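
If anyone wants to rule the consumer side out without touching the broker, the supervisor’s consumerProperties are passed through to the Kafka consumer, so an override like the one below should be possible (a sketch with placeholder values; allow.auto.create.topics is a standard consumer property from Kafka client 2.3 onwards, and whether your Druid version’s bundled client honours it is worth verifying):

```python
# Consumer-side override inside the supervisor spec's ioConfig.
io_config_patch = {
    "consumerProperties": {
        "bootstrap.servers": "10.1.128.108:9092",
        "allow.auto.create.topics": "false",  # stop the consumer from recreating a dropped topic
    }
}
```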

I expected the reset API to fix things for me, but that didn’t help.

Actually, we had auto.create.topics.enable=false configured on our cluster, so that theory might be a miss in our case.