Kafka to Druid: Previous sequenceNumber [x] is no longer available for partition [y]

Hi folks,

I’m ingesting data into Druid from Kafka. The ingestion works fine for a few days but then fails. I see the following error in the supervisor UI:

“Previous sequenceNumber [289] is no longer available for partition [0]. You can clear the previous sequenceNumber and start reading from a valid message by using the supervisor’s reset API.”

I’m not sure why I’m getting this. I’ve reset the supervisor several times; ingestion starts working again but then fails after a few days.
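For reference, the reset I’ve been issuing is the supervisor reset API that the error message points to. A rough sketch of the call is below; the Router address and supervisor ID are placeholders for my actual setup.

```bash
# Placeholders: adjust the Router (or Overlord) address and the supervisor ID.
DRUID="http://localhost:8888"
SUPERVISOR_ID="my_supervisor_id"

# Clears the stored offsets so the supervisor resumes from a valid offset
# (earliest or latest, depending on useEarliestOffset).
curl -X POST "$DRUID/druid/indexer/v1/supervisor/$SUPERVISOR_ID/reset"
```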

My Kafka deployment is configured to retain messages for 7 days. So my guess is that once 7 days pass, old messages are no longer available, and that’s when I see the above error. But I would have expected Druid to have already ingested those old messages and moved on to newer sequence numbers. Can someone help me figure out what might be going wrong?
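In case it’s useful, this is roughly how I’d compare what Kafka still has against what Druid is asking for; the broker and topic names are placeholders, and GetOffsetShell ships with the standard Kafka CLI tools.

```bash
# Placeholders: broker list and topic name.
BROKERS="b-1.example.kafka.us-east-1.amazonaws.com:9092"
TOPIC="my_topic"

# Earliest offset Kafka still retains per partition (--time -2 = earliest).
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list "$BROKERS" --topic "$TOPIC" --time -2

# Latest offset per partition (--time -1 = latest), for comparison.
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list "$BROKERS" --topic "$TOPIC" --time -1
```

If the sequence number in the error is smaller than the earliest offset reported here, the message really has aged out of the topic.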

For what it’s worth, both Kafka and Druid are always up, so it’s not a matter of not having time to ingest the incoming messages.

I saw https://support.imply.io/hc/en-us/articles/360009051253-Issue-Indexing-tasks-from-Kafka-or-Kinesis-are-finishing-successfully-but-without-any-data-ingested-, but that is not what’s happening in my case.

Any pointers will be highly appreciated. Thanks in advance.

Regards,
FM

Are you by any chance terminating or resetting the supervisor at any point in the process?

What do you specify for useEarliestOffset in your spec?

Also… is your data persisting to disk, or is it always in memory?

Thanks, Chris, for the response. Please see my answers inline:

Are you by any chance terminating or resetting the supervisor at any point in the process?

No, I’m not terminating or resetting the supervisor. I reset it only once when I first encountered this issue. After resetting, ingestion worked for a few days and then ran into the same issue.

What do you specify for useEarliestOffset in your spec?

useEarliestOffset is set to true. I expect this to make Druid read the Kafka topic from the earliest point available. The initial supervisor tasks succeed, which suggests this shouldn’t be the issue, but maybe I’m wrong.
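(If it helps, the effective values can be double-checked on the running supervisor with something like the sketch below; the Router address and supervisor ID are placeholders, and the jq filter just digs out the ioConfig wherever it is nested in the returned spec.)

```bash
# Placeholders: Router (or Overlord) address and supervisor ID.
DRUID="http://localhost:8888"
SUPERVISOR_ID="my_supervisor_id"

# Fetch the supervisor spec and pull out the Kafka ioConfig settings of interest.
curl -s "$DRUID/druid/indexer/v1/supervisor/$SUPERVISOR_ID" \
  | jq '[.. | objects | select(has("useEarliestOffset"))] | first
        | {useEarliestOffset, taskDuration, completionTimeout}'
```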

Also… is your data persisting to disk, or is it always in memory?

On the Kafka side, I’m using AWS MSK which is persisting to disk. On the Druid side, I’ve configured it to use Postgres (in AWS) for metadata storage and S3 for deep storage.

Regards,
FM

No problem. I’m hoping we can figure this out.

The only issue with useEarliestOffset is that you may end up ingesting some data more than once.

When I say persisting to disk, I mean: are your ingestion tasks completing and performing segment handoff? Are you seeing segments loaded on your historical nodes as well as stored in deep storage?

Are you seeing entries in your postgres database for the segments?
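For example, a quick check along these lines (assuming the default druid_ table prefix; the connection details and datasource name are placeholders) should list the most recent segment entries:

```bash
# Placeholders: Postgres host, database, user, and the datasource name.
psql -h pg.example.internal -U druid -d druid -c \
  "SELECT id, created_date, used
     FROM druid_segments
    WHERE datasource = 'my_datasource'
    ORDER BY created_date DESC
    LIMIT 10;"
```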

What is the task duration of your supervisor specs?

I’m asking so many questions because it’s almost behaving as if your data is being ingested into memory on the peon but never persisted: your ingestion may then start over from the same offset, and that offset may no longer exist because Kafka drops it after the 7-day retention period.

I doubt that is happening because you would see many task failures before getting to that point.
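If you want a rough signal on the persist/handoff question, grepping a completed task’s log is usually enough; the exact log messages vary between Druid versions, so treat this as a heuristic rather than a definitive test:

```bash
# Heuristic: look for intermediate persist and segment handoff activity in a
# peon/task log. Exact message wording differs between Druid versions.
grep -iE 'persist|handoff' task.log | tail -n 20
```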

Please see answers inline:

The only issue with useEarliestOffset is that you may end up ingesting some data more than once.

I’ll keep a note of this, and will get back to it once the main issue is resolved.

When I say persisting to disk, I mean: are your ingestion tasks completing and performing segment handoff? Are you seeing segments loaded on your historical nodes as well as stored in deep storage?

Yes, the ingestion tasks completed successfully at the beginning, when I first started indexing; after a few days they begin to fail. The behaviour is consistent: I’ve reset the supervisors multiple times and seen the same pattern each time.
Here is the view of segments loaded by the historical nodes, per ingestion spec:

I can see the segments in deep storage (S3) too.

Are you seeing entries in your postgres database for the segments?

Yes, I can see entries in the postgres db:

What is the task duration of your supervisor specs?

1 hour.

I’m asking so many questions because it’s almost behaving as if your data is being ingested into memory on the peon but never persisted: your ingestion may then start over from the same offset, and that offset may no longer exist because Kafka drops it after the 7-day retention period.

This is plausible, but I’m not sure how to verify that this is what’s happening. Please ask as many questions as you like; I really want to solve this issue.

Thanks for answering the questions. I think we are at the point where we need to look at your indexing logs and middlemanager logs for errors.

Thanks for helping. I’ve attached the log of one of the failed indexing tasks. I didn’t see anything suspicious in it.

The middlemanager log has lines like:

2020-08-12T22:17:23,164 INFO [WorkerTaskManager-NoticeHandler] org.apache.druid.indexing.worker.WorkerTaskManager - Task [index_kafka_dev.serving.encode_6ac3f80a2ad0351_clgknpaf] completed with status [FAILED].

But nothing else that I think is relevant.

One more thing: in the Druid UI, when I click the magnifying-glass icon for a supervisor, I get a pop-up showing “Status”, “Statistics”, “Payload”, etc. Under “Status”, I see the stanza pasted at the end of this email. The suspicious part, in my opinion, is that “startingOffsets” and “latestOffsets” are both set to 7229 for partition 0, yet the error is for sequence number 6357. Why is Druid trying to access a sequence number older than the starting/latest offset?
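For completeness, the same offset information shown in that pop-up is also available from the supervisor status API; a rough way to pull it (the Router address and supervisor ID are placeholders) is:

```bash
# Placeholders: Router (or Overlord) address and supervisor ID.
DRUID="http://localhost:8888"
SUPERVISOR_ID="my_supervisor_id"

# The payload mirrors what the UI's "Status" tab shows, including
# startingOffsets and latestOffsets per partition.
curl -s "$DRUID/druid/indexer/v1/supervisor/$SUPERVISOR_ID/status" | jq '.payload'
```

These values can then be compared against the earliest offset Kafka still retains (the GetOffsetShell check from my first message).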

task.log (50.9 KB)

Are you using the Kafka Indexing Service?

What version of Druid are you running?

I’ll be honest and say I’m not yet sure what is going on with your issue.

No worries. Thanks for trying so far.

What version of Druid are you running?
0.18.1

Are you using the Kafka Indexing Service?
I’m using the AWS-hosted Kafka service, AWS MSK. When creating the ingestion spec in Druid, I select ‘Apache Kafka’ and then specify the Kafka broker addresses returned by AWS MSK.

/FM