KafkaSupervisor killed a task too early

I'm investigating some failed Kafka ingestion tasks right now, digging
into the KafkaSupervisor code. (Note that we are running an unreleased
version of Druid, specifically from commit dabaf4caf.)

Our taskDuration is one hour and our completionTimeout is the default
30 minutes.
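
For reference, the relevant part of our supervisor spec's ioConfig
looks roughly like this (just the two timing fields; a sketch, not the
full spec):

    "ioConfig": {
      "type": "kafka",
      "taskDuration": "PT1H",
      "completionTimeout": "PT30M"
    }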

What I'm seeing is that under some circumstances, KafkaSupervisor in
overlord logs the error:

org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - No task
in [[index_kafka_query_stats_e725e3bf99e0e53_d]] for taskGroup [0]
succeeded before the completion timeout elapsed [PT1800S]!:
{class=org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor}

and kills the task... only half an hour after it started! Not 1.5
hours! I.e., it looks like the supervisor somehow declared the task to
be completing as soon as it started, for some reason.

(Looking at the task's own logs, it's still chugging away doing
normal ingestion; it hasn't switched to publishing, or even done an
incremental handoff, as far as I can tell from poking at the logs.)

Has anyone else seen this?

Am currently auditing KafkaSupervisor.java to figure out if maybe
there's a race condition that can get a new task accidentally stuck
into a TaskGroup in pendingCompletionTaskGroups or something.

--dave

Hi David,

The completionTimeout is checked after taskDuration elapses. Here is the relevant quote from the Druid docs (http://druid.io/docs/latest/development/extensions-core/kafka-ingestion#kafkasupervisorioconfig):

The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after taskDuration elapses.

So, I guess it’s the normal behavior.

(Looking at the task's own logs, it's still chugging away doing normal ingestion; it hasn't switched to publishing, or even done an incremental handoff, as far as I can tell from poking at the logs.)

What were your maxRowsPerSegment and handoffConditionTimeout? Both configurations can trigger incremental handoff.
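
For example, a tuningConfig along these lines is what I mean (values purely illustrative; handoffConditionTimeout is in milliseconds, if I remember correctly):

    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000,
      "handoffConditionTimeout": 900000
    }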

Jihoon

I think I see the problem. It's this comment in KafkaSupervisor.java:

* Monitors [pendingCompletionTaskGroups] for tasks that have completed. If any task in a task group has completed, we
* can safely stop the rest of the tasks in that group. If a task group has exceeded its publishing timeout, then
* we need to stop all tasks in not only that task group but also 1) any subsequent task group that is also pending
* completion and 2) the current task group that is running, because the assumption that we have handled up to the
* starting offset for subsequent task groups is no longer valid, and subsequent tasks would fail as soon as they
* attempted to publish because of the contiguous range consistency check.

Specifically, it's (2) there. This was a cascading failure from
another task (in the previous task group) that actually was killed
after the full 1.5 hours. So it's not a bug in Druid, just a
completionTimeout that's too low for the **PREVIOUS** task.

--dave

One note — these issues would be easier to debug if Druid saved end
times for tasks and displayed them in the overlord console.

Aha! The bad tasks that are overrunning their completionTimeout are
blocking on segment handoff, specifically handoff of a segment for an
interval that's a couple of weeks in the future! (We don't set an
earlyMessageRejectionPeriod.)
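
If we were to set it, I believe it goes in the supervisor ioConfig as
an ISO-8601 period, something like this (a sketch; the P2W value is
just an example):

    "ioConfig": {
      "type": "kafka",
      "earlyMessageRejectionPeriod": "P2W"
    }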

Could this be a case where, if we don't set an
earlyMessageRejectionPeriod on the Kafka ingestion side but the
coordinator isn't configured to accept early (future) segments (I
don't understand much about coordinator config yet), things can break?

Aha, looks like



Oh yes, future data can block kafkaIndexTasks from completing.
https://github.com/apache/incubator-druid/pull/6415 and https://github.com/apache/incubator-druid/pull/6414 will be included in 0.13.1, which will allow load rules to match future data.
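
After those changes, a period load rule should be able to opt in to future intervals, roughly like this (a sketch; please check the 0.13.1 docs for the exact field name):

    {
      "type": "loadByPeriod",
      "period": "P3M",
      "includeFuture": true,
      "tieredReplicants": { "_default_tier": 2 }
    }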

Jihoon

Thanks! There's a good workaround documented in #5869 too.

Now I just need to understand why this eventually *did* fix itself —
the future data was from a date that still hasn't happened (in
November), so I wonder why retrying the same offsets didn't end up
with the same error...

Hmm and now I'm more confused. My cluster's default retention rules are:

- load P3M: 2 in _default_tier
- drop P6M

and there are no datasource-specific rules. This seems to be basically
the situation from the workaround documented in #5869: it should
have loaded the future segment and succeeded in handoff, right?
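
For clarity, I believe those two rules correspond to roughly this JSON
in the coordinator rule editor (a sketch from memory):

    [
      {
        "type": "loadByPeriod",
        "period": "P3M",
        "tieredReplicants": { "_default_tier": 2 }
      },
      { "type": "dropByPeriod", "period": "P6M" }
    ]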

--dave

Oh, I think I'm misinterpreting how default rules work. It's not that
Druid itself has a default "loadForever" that comes after the
datasource rules and the cluster rules. It's that the cluster has a
default rule chain, and in my cluster it got edited to the above. And
I guess if no rules match, then it defaults to drop?

Hi David,

Yes, I think you're correct. The default rule chain does exactly what it says by itself, and no loadForever rule comes after it.

And if no rule matches the interval of a given segment, Druid does nothing for that segment except log how many segments did not match any rule.
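
For reference, I believe the out-of-the-box cluster default chain is just a single loadForever rule, something like the below; in your cluster that default chain was edited to the load/drop pair you listed.

    [
      { "type": "loadForever", "tieredReplicants": { "_default_tier": 2 } }
    ]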

Jihoon