Repeated KafkaIndexTask failures

We recently ran into an issue with repeated KafkaIndexTask failures on 0.16.0-incubating. Our datasource has a taskCount of 1 with 2 replicas. Ingestion has been very stable in general, until we hit this repeated failure issue.

After some in house debugging, we believe this is what happened.

  1. Task 1 and 2 had run longer than taskDuration and were asked to checkpoint. Both were moved to pendingCompletionTaskGroups.

  2. Task 1 and 2 were still working to publish segments to deep storage and update the metadata database. For whatever reason, publishing took them a long time.

  3. Task 3 and 4 assumed that the publish above would succeed and started reading from the endOffsets of Task 1 and 2.

  4. Due to the large amount of data in Kafka, Task 3 and 4 soon started to publish their own segments, but failed to update the metadata database because their startMetadata did not match what was stored there. This makes sense, since Task 1 and 2 were still publishing (see the sketch after this list).

  5. Task 3 and 4 failed.

  6. Task 5 and 6 started reading from the same endOffsets as Task 3 and 4, and failed for the same reason.

  7. This repeated for a few rounds.

  8. Task 1 and 2 finally finished publishing and succeeded.

  9. Task 11 and 12 had already started reading from the same endOffsets as Task 3 and 4. Multiple segments were being published at the same time. One of the segments failed with java.util.concurrent.ExecutionException: org.apache.druid.java.util.common.ISE: Failed to publish segments because of [java.lang.RuntimeException: Aborting transaction!], while the segment with the lowest offset succeeded in updating the metadata database.

  10. Task 11 and 12 failed.

  11. From this point on, new tasks repeatedly failed because the overlord still thought the startMetadata should be what Task 1 and 2 left after publishing their segments. It did not take into account that Task 11 and 12 had managed to update the metadata even though the tasks themselves failed.
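To make steps 4 and 9 concrete, here is a toy compare-and-swap style model of the transactional publish. The class and method names are made up (this is not Druid's actual metadata coordinator code); it only shows that a commit goes through when the task's startMetadata matches what is currently stored for the datasource.

++++
import java.util.HashMap;
import java.util.Map;

// Toy model: a publish commits only if startMetadata matches the stored offsets.
public class MetadataCasModel {
  // datasource -> committed Kafka offsets per partition
  private final Map<String, Map<Integer, Long>> committed = new HashMap<>();

  public synchronized boolean publish(String dataSource,
                                      Map<Integer, Long> startOffsets,
                                      Map<Integer, Long> endOffsets) {
    Map<Integer, Long> current = committed.getOrDefault(dataSource, Map.of());
    if (!current.isEmpty() && !current.equals(startOffsets)) {
      return false; // the "Aborting transaction!" case in steps 4 and 9
    }
    committed.put(dataSource, endOffsets);
    return true;
  }

  public static void main(String[] args) {
    MetadataCasModel store = new MetadataCasModel();
    store.publish("ds", Map.of(0, 0L), Map.of(0, 50L));                        // earlier commit: offsets now {0=50}
    System.out.println(store.publish("ds", Map.of(0, 100L), Map.of(0, 150L))); // Task 3/4: false, Task 1/2 not committed yet
    System.out.println(store.publish("ds", Map.of(0, 50L), Map.of(0, 100L)));  // Task 1/2 finally commit: true
    System.out.println(store.publish("ds", Map.of(0, 100L), Map.of(0, 150L))); // a start of 100 now matches: true
  }
}
++++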

Does this make sense? The workaround is to suspend the datasource and resume it, which clears the in-memory state for the startMetadata.

I am wondering whether the overlord should mark the taskGroup to read from the metadata database when the tasks in pendingCompletionTaskGroups succeed but new tasks started after them keep failing. Any suggestions?

Thanks,

Joy

Task 11 and 12 had already started reading from the same endOffsets as Task 3 and 4.

Task 11 and 12 should start from the endOffsets of Task 1 and 2, as those are the tasks that successfully finished. If they are starting from Task 3 and 4's offsets, that means Task 1 and 2 finished before Task 11 and 12 started and Task 3 and 4 were in the publishing state. Otherwise, 11 and 12 should (or would) have started from the endOffsets of Task 1 and 2.

The question to ask here is why Task 1 and 2 are taking so long to publish their segments. Please check whether the deep storage and the metadata store are contributing to some slowness here.

Hi Joy,
In addition to what Muthu said, even though I don't have a concrete solution to the issue you are facing, I am wondering whether we can avoid this scenario by playing around with the following params in the Kafka ingestion spec.

Hi Muthu,

Thanks for your reply!

Task 11 and 12 should start from the endOffsets of Task 1 and 2, as those are the tasks that successfully finished. If they are starting from Task 3 and 4's offsets, that means Task 1 and 2 finished before Task 11 and 12 started and Task 3 and 4 were in the publishing state. Otherwise, 11 and 12 should (or would) have started from the endOffsets of Task 1 and 2.

The question to ask here is why Task 1 and 2 are taking so long to publish their segments. Please check whether the deep storage and the metadata store are contributing to some slowness here.

You are correct, Task 11 and 12 started from the endOffsets of Task 1 and 2. Sorry for making that confusing.

It is a good question why Task 1 and 2 took so long to publish segments. However, should we still handle the case where publishing takes a long time? I agree that this is a corner case, but when we hit it, it took us quite some time to figure out what the problem was, and ingestion basically stopped.

We would like to fix this if possible and would like to hear your advice. Is it reasonable to have the overlord mark the taskGroup to read from the metadata database when the tasks in pendingCompletionTaskGroups succeed but new tasks started after them keep failing?

Thanks,

Joy

Hi Siva,

Thanks for your reply!

In addition to what Muthu said, even though I don't have a concrete solution to the issue you are facing, I am wondering whether we can avoid this scenario by playing around with the following params in the Kafka ingestion spec.
++++
maxRowsPerSegment – default 5 million – do we need to reduce this in your env to speed up segment publishing? You will get smaller segments and you might have to do compaction later to get optimal segment size
taskDuration – default 1 hour – can we reduce this to 30 minutes in your env?
completionTimeout – default 30 minutes – do we need to increase this to 60 minutes in your env?
++++
As I said, even though I don’t have a concrete solution, is it possible for you to experiment a bit by tweaking values?
Of course, this assumes your deep storage (hope S3 is not facing any slowness issues), MySQL metastore (hope you have a dedicated MySQL instance for your metastore), coordinators (hope there is no leadership confusion among coordinators) and historicals (hope they are not too busy balancing or merging segments) are all healthy.
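Thanks for the pointers. If we do experiment, those three knobs would go in different parts of the supervisor spec: taskDuration and completionTimeout under ioConfig, and maxRowsPerSegment under tuningConfig. A quick Jackson sketch with placeholder values only (the 1,000,000 for maxRowsPerSegment is just an example, not a recommendation):

++++
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Assembles the three overrides in the places they live in the supervisor spec.
public class SupervisorSpecOverrides {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    ObjectNode ioConfig = mapper.createObjectNode()
        .put("taskDuration", "PT30M")         // down from the default PT1H
        .put("completionTimeout", "PT60M");   // up from the default PT30M

    ObjectNode tuningConfig = mapper.createObjectNode()
        .put("maxRowsPerSegment", 1_000_000); // placeholder; default is 5,000,000

    ObjectNode spec = mapper.createObjectNode();
    spec.set("ioConfig", ioConfig);
    spec.set("tuningConfig", tuningConfig);

    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(spec));
  }
}
++++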

We are in general very happy with our current ingestion spec. We have only hit this issue twice in about a year of deployment. I will look more into the potential issues you mentioned above. One thing I noticed is that some persists seemed to take a long time:

2019-11-01T20:06:29,877 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - Persisted pending data in 133,860ms.

2019-11-01T20:10:08,940 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - Persisted pending data in 200,562ms.

Thanks,

Joy

Hi Joy,

Sorry, I'm not understanding this question/suggestion:

“Is it reasonable to have the overlord mark the taskGroup to read from the metadata database when the tasks in pendingCompletionTaskGroups succeed but new tasks started after them keep failing?”

Are you saying that, in your scenario, the tasks that are started after the failure of 3 and 4 should read only from the offsets that are stored in the metadata? In other words, they (the tasks started after the 3 and 4 failure) should read only from offsets that have for sure been published and written to the metadata?

Hi Muthu,

“Is it reasonable to have the overlord mark the taskGroup to read from the metadata database when the tasks in pendingCompletionTaskGroups succeed but new tasks started after them keep failing?”
Are you saying that, in your scenario, the tasks that are started after the failure of 3 and 4 should read only from the offsets that are stored in the metadata? In other words, they (the tasks started after the 3 and 4 failure) should read only from offsets that have for sure been published and written to the metadata?

Not quite. Tasks 3 through 10 would still start from the endOffsets of Task 1 and 2, since Task 1 and 2 had not yet succeeded and were still in pendingCompletionTaskGroups. But once Task 1 and 2 succeed, and some tasks started after them fail, the overlord should start the new tasks by reading the offsets stored in the metadata database. Is this possible?

Thanks,

Joy

Hi Joy,

The overall behavior of the system is as follows. The supervisor starts the relevant tasks (1 and 2 in your example) and asks them to start consuming data. As soon as taskDuration or maxRowsPerSegment is reached, the supervisor asks the tasks to stop consuming and start publishing (at this stage nothing is written to the metadata). It also asks each task for its end offsets (again, these end offsets are not in the metadata database). The supervisor then starts another set of tasks, providing the endOffsets it got from the first set of tasks.

What you are asking is whether it is possible to have the second set of tasks get the offset info from the database rather than the offsets handed over by the supervisor; not right away, but maybe after detecting some failure behavior. If that's what you are asking, it's not possible. It used to work that way (I don't remember the version), but it has been changed to the behavior I explained above.
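To illustrate with a simplified sketch (made-up names, not the actual SeekableStreamSupervisor code): the next group is seeded from whatever end offsets the supervisor is holding in memory, and the metadata store is not consulted at that point.

++++
import java.util.HashMap;
import java.util.Map;

// Simplified model of the supervisor's offset handoff (made-up names).
public class HandoffModel {
  private final Map<Integer, Long> inMemoryEndOffsets = new HashMap<>(); // partition -> next offset
  private final Map<Integer, Long> metadataStoreOffsets;                 // what has actually been committed

  HandoffModel(Map<Integer, Long> committed) {
    this.metadataStoreOffsets = new HashMap<>(committed);
  }

  // Called when the running task group hits taskDuration and reports its end offsets.
  void onCheckpoint(Map<Integer, Long> endOffsets) {
    inMemoryEndOffsets.putAll(endOffsets); // note: the publish has NOT happened yet
  }

  // Starting offsets handed to the next task group.
  Map<Integer, Long> startOffsetsForNextGroup() {
    // The in-memory offsets win even if the metadata store still holds older ones,
    // which is why Tasks 3-10 kept starting from Task 1/2's end offsets.
    return inMemoryEndOffsets.isEmpty() ? metadataStoreOffsets : inMemoryEndOffsets;
  }

  public static void main(String[] args) {
    HandoffModel supervisor = new HandoffModel(Map.of(0, 50L)); // DB still says offset 50
    supervisor.onCheckpoint(Map.of(0, 100L));                   // Task 1/2 stop reading at 100
    System.out.println(supervisor.startOffsetsForNextGroup());  // {0=100}, regardless of the DB
  }
}
++++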

Hope this helps.

Regards,

Muthu Lalapet.

Hi Muthu,

The overall behavior of the system is as follows. The supervisor starts the relevant tasks (1 and 2 in your example) and asks them to start consuming data. As soon as taskDuration or maxRowsPerSegment is reached, the supervisor asks the tasks to stop consuming and start publishing (at this stage nothing is written to the metadata). It also asks each task for its end offsets (again, these end offsets are not in the metadata database). The supervisor then starts another set of tasks, providing the endOffsets it got from the first set of tasks.

Okay, sounds good.

What you are asking is whether it is possible to have the second set of tasks get the offset info from the database rather than the offsets handed over by the supervisor; not right away, but maybe after detecting some failure behavior. If that's what you are asking, it's not possible. It used to work that way (I don't remember the version), but it has been changed to the behavior I explained above.

In some failure scenarios, the supervisor sets the taskGroup to getNotSetMarker() in partitionGroups, and the new tasks are then started with the offsets in the database. Is it not possible to do the same for this case? Is getNotSetMarker() the old way of getting the endOffsets?
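To show what I mean with a simplified sketch: getNotSetMarker() and partitionGroups are the real names mentioned above, but the logic below is my simplification, not the actual supervisor code. If a group's entry holds the not-set marker, the next offsets come from the database instead of the stale in-memory value.

++++
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the "not set" fallback (not the real SeekableStreamSupervisor logic).
public class NotSetFallbackModel {
  private static final long NOT_SET = -1L; // stand-in for getNotSetMarker()

  // groupId -> (partition -> offset): a much-simplified partitionGroups
  private final Map<Integer, Map<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();

  void resetGroupToNotSet(int groupId, List<Integer> partitions) {
    Map<Integer, Long> group = partitionGroups.computeIfAbsent(groupId, g -> new ConcurrentHashMap<>());
    for (int p : partitions) {
      group.put(p, NOT_SET);
    }
  }

  long startOffsetFor(int groupId, int partition, Map<Integer, Long> metadataStore) {
    long inMemory = partitionGroups.getOrDefault(groupId, Map.of()).getOrDefault(partition, NOT_SET);
    // When the marker is present, fall back to the offset committed in the database.
    return inMemory == NOT_SET ? metadataStore.getOrDefault(partition, 0L) : inMemory;
  }

  public static void main(String[] args) {
    NotSetFallbackModel supervisor = new NotSetFallbackModel();
    supervisor.resetGroupToNotSet(0, List.of(0));
    // With the marker set, the next tasks start from the database offset (150),
    // not from whatever the supervisor remembered in memory.
    System.out.println(supervisor.startOffsetFor(0, 0, Map.of(0, 150L))); // 150
  }
}
++++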

Thanks,

Joy

Hi Joy,

I don't know the API method-level details. But overall there is no way to go back to the previous behavior, and there is no property to do so either.

Regards,

Muthu Lalapet.

From the comments here SeekableStreamsSupervisor.java:460, it looks like it is possible to read the offsets from the database: