Getting OffsetOutOfRangeException error during Kafka topic ingestion

I am ingesting a topic from Kafka into Druid. After a few hours of ingestion I get the error below.

“WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=67817897, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 1 rack: null)], epoch=2}} is out of range for partition …”

Please help me overcome this error.

Welcome @pankaj_kumar1! Usually when you get OffsetOutOfRangeException, it means the offsets the task is trying to read are out of date. In turn that is either because you lost some data in Kafka, or the ingestion fell behind the Kafka retention period. So you need to either fix Kafka (if it’s the first issue) or reset the Druid Kafka supervisor (if it’s the second issue). Let us know how it goes.
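
For reference, a supervisor reset is a POST to the Overlord's `/druid/indexer/v1/supervisor/{supervisorId}/reset` endpoint. Below is a minimal Python sketch that only builds the request; the host, port, and supervisor name are hypothetical placeholders, not values from this thread. Keep in mind that a reset clears the stored offsets, so some data may be skipped or read twice.

```python
import urllib.parse
import urllib.request


def build_reset_request(overlord_url: str, supervisor_id: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that resets a supervisor."""
    safe_id = urllib.parse.quote(supervisor_id, safe="")
    url = f"{overlord_url.rstrip('/')}/druid/indexer/v1/supervisor/{safe_id}/reset"
    # An empty body with method="POST" is enough for this endpoint.
    return urllib.request.Request(url, data=b"", method="POST")


# Hypothetical Overlord address and supervisor name, for illustration only.
req = build_reset_request("http://overlord.example.com:8090", "my-kafka-supervisor")
print(req.get_method(), req.full_url)

# To actually send it against a live cluster:
# urllib.request.urlopen(req)
```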

Hi Mark! The data in Kafka is fine. Ingestion runs well, but after some time the “OffsetOutOfRangeException” error appears. Currently we have 3 MiddleManagers and 3 Historicals, and each MiddleManager has a taskCount of 4. The topic has 99 partitions and receives more than 50M records per hour. I think Druid is lagging behind in reading the data.

Also, the data size is more than 2.5 GB per hour and the retention period is 2 hours.

Hi pankaj_kumar1,

As @Mark_Herrera mentioned, Offset out of range exceptions mean that the record we are trying to read is no longer in Kafka because it exceeded its retention period; you need to reset the supervisor to clear this condition.

The issue is that your lag is getting out of hand. We need to reduce this lag to ensure this does not happen in the future.

The easiest way to do this is to increase taskCount.

I wanted to share a note from one of Imply’s awesome engineers. It looks like a potentially great way to manage your ingestion. You should also check out the METRICS page in the docs; it references key metrics related to Kafka ingestion that may help you see when a problem is coming up.

-----from a Druid Engineer----
Assuming they have enough capacity in their middle managers, they could use the lag based auto-scaler instead of manually setting the number of tasks
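
As a rough illustration of that suggestion, the lag-based autoscaler is enabled through an `autoScalerConfig` object inside the supervisor spec's `ioConfig`. The sketch below builds such a fragment in Python and prints it as JSON. The min and max task counts are made-up values, not a recommendation for this cluster, and the other tuning fields are left at their defaults.

```python
import json

# Illustrative values only: pick taskCountMin/taskCountMax to fit your own
# MiddleManager capacity.
auto_scaler_config = {
    "enableTaskAutoScaler": True,
    "taskCountMin": 4,
    "taskCountMax": 12,
    "autoScalerStrategy": "lagBased",
}

# This fragment would be merged into the "ioConfig" section of the
# Kafka supervisor spec.
io_config_fragment = {"autoScalerConfig": auto_scaler_config}

print(json.dumps(io_config_fragment, indent=2))
```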


I would add that for the autoscaler to work, you still need the available task slots for the ingestion you are attempting. As the autoscaler increases the task count, there must be enough worker slots available in the MMs to accommodate them.

The 99 Kafka partitions will be spread across the ingestion tasks that you have. How many tasks are you currently using for the ingestion? You may need more MMs to achieve this or more powerful MMs with more task slots.
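
To get a feel for how the partitions spread out, here is a small sketch that assumes partitions are assigned to tasks by partition number modulo the task count. That assignment rule is an assumption made for illustration, and the task counts tried are arbitrary examples.

```python
from collections import Counter


def partitions_per_task(num_partitions: int, task_count: int) -> Counter:
    """Count how many partitions each task gets under modulo assignment."""
    return Counter(p % task_count for p in range(num_partitions))


for task_count in (4, 12):
    spread = partitions_per_task(99, task_count)
    print(
        f"taskCount={task_count}: "
        f"{min(spread.values())} to {max(spread.values())} partitions per task"
    )
```

With fewer tasks, each task has to keep up with more partitions, which is where lag tends to build.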

taskCount=20 and peon=10 and 3 MM.

You mentioned that each MM has 4 worker slots, so that’s 12 in total. A job that uses 20 tasks will need 20 worker slots, so this is the likely reason for your problem. You could try to use 12 tasks, but that still may not be enough, and you may need to add MMs or increase their CPU and memory resources to accommodate more worker tasks.
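
The slot arithmetic can be written out as a quick check. This is a simplified sketch with hypothetical function names: it counts one slot per task and ignores replicas and any extra slots needed while earlier tasks are still publishing.

```python
def total_worker_slots(mm_count: int, slots_per_mm: int) -> int:
    """Total task slots across all MiddleManagers."""
    return mm_count * slots_per_mm


def slot_shortfall(task_count: int, mm_count: int, slots_per_mm: int) -> int:
    """How many tasks cannot be scheduled with the available slots."""
    return max(0, task_count - total_worker_slots(mm_count, slots_per_mm))


# Numbers from this thread: 3 MMs with 4 slots each, taskCount=20.
available = total_worker_slots(3, 4)
missing = slot_shortfall(20, 3, 4)
print(f"available slots: {available}, tasks without a slot: {missing}")
```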

Also, I’m not sure if this is what you meant, but if your ingestion duration is set to 2 hours and there are only 2 hours of retention in Kafka, any delay could cause this issue. You’ll want either more retention on the Kafka topic to deal with this, or a shorter ingestion period so that it outputs segments more often (e.g. 1 hour or 30 minutes).
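
Assuming the "ingestion duration" here refers to the supervisor's `taskDuration` setting (an ISO 8601 period in `ioConfig`), a shorter period could be expressed as in the sketch below. The values are illustrative only.

```python
import json

# Assumption: "ingestion duration" maps to ioConfig.taskDuration.
# "PT30M" means 30 minutes; "PT1H" would mean 1 hour.
io_config_fragment = {
    "taskCount": 12,          # illustrative; must fit the available worker slots
    "taskDuration": "PT30M",  # shorter than the 2-hour Kafka retention
}

print(json.dumps(io_config_fragment, indent=2))
```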

After setting taskCount=20 and peon=10 with 3 MMs, the tasks are running well.