Tasks - "errorMsg": "Canceled for worker cleanup"

Hi.
We have Druid deployed on AWS EKS via the Druid Operator, with 6 MiddleManagers, each with 2 workers.
Some tasks are going to the FAILED state, saying:

"errorMsg": "Canceled for worker cleanup. See overlord logs for more details."

I can’t see any related errors in the Coordinator logs (it also acts as the Overlord).
Googling it returns only one result, which points to Druid’s source code (RemoteTaskRunner.java).

Any ideas about what it means?
Memory issues? Too low? CPU? Something else?

Thanks in advance

Welcome @setevoy! Did the job complete? Are you able to share the coordinator-overlord logs so we can all take a look?

Hi, @Mark_Herrera!
Not sure how to answer “did it complete” when its status is FAILED?
For this Datasource and Supervisor, I see mostly SUCCESS, 4 tasks with FAILED status, and one RUNNING.

Regarding logs, I’ve found some records, and now it seems I know what happened:

2022-11-02T14:37:54,947 INFO [IndexTaskClient-###Session-0] org.apache.druid.indexing.common.IndexTaskClient - submitRequest failed for [http://10.0.112.122:8100/druid/worker/v1/chat/index_kinesis###_Session_502361959647783_mnjkkknh/offsets/current], with message [No route to host (Host unreachable)]

“No route to host” can mean that the K8s Pod was killed, and the Task wasn’t able to connect (but to what?).
Unfortunately, I can’t post the full logs, as I’d have to redact too much to add them here.

Am I understanding correctly that the Task was running on a Pod with IP 10.0.112.122, and at some moment this Pod was terminated, so the Coordinator logged the “No route to host” record?
If so, the error message “Canceled for worker cleanup” is a bit misleading.
And if so, are such FAILED tasks OK? Since Pods are created and terminated by the HorizontalPodAutoscaler.

Hi @setevoy,
That could be a reason. “No route to host” would certainly indicate that.

Another reason might be that the individual task itself failed: when it terminates, it stops listening for HTTP requests on that port, which would also produce this error. So it doesn’t need to be the whole pod that fails; it might be just the task. Do you have the log for that task? It would be interesting to inspect it for the root cause.

Good morning, all)
Thanks for the replies.

First, since we are going a bit deeper into the issue, let me describe more of the setup I’m trying to implement.

So, we have AWS Kinesis Streams with 1-5 shards each.
Druid is running in EKS with 6 MiddleManagers, one per Kinesis Stream; that’s the idea.
Each MM has druid.worker.capacity=2 and -Xms1G -Xmx4G -XX:MaxDirectMemorySize=1G JVM parameters (see the full config below).
The idea is to have 1 MM per Stream with two concurrent Tasks: one for ingestion and one for publishing (and whatever else they do there).

*I’ve read a lot of docs about the ingestion process, but I’m still not sure I understand all the steps and operations performed during Ingest/Publish.*
Also, I’m still not sure about the Peon processes: they are present on the Services page, but are they used?

Next: a Task runs in the RUNNING state for 1 hour, but sometimes it ends up in the FAILED state.
Here is what I’ve found in the logs for one such Task:

2022-11-02T15:47:23,523 INFO [IndexTaskClient-staging_re_rollup_bot_Api-0] org.apache.druid.indexing.common.IndexTaskClient - submitRequest failed for [http://10.0.124.199:8101/druid/worker/v1/chat/index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo/offsets/current], with message [No route to host (Host unreachable)]

2022-11-02T15:47:31,460 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.RemoteTaskRunner - Running scheduled cleanup for Worker[10.0.124.199:8080]

2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.RemoteTaskRunner - Failing task[index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo]
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo] because: [notified status change from task]
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.RemoteTaskRunner - Can't shutdown! No worker running task index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo] from activeTasks
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo] from TaskLock[TimeChunkLock{type=EXCLUSIVE, groupId='index_kinesis_staging_re_rollup_bot_Api', dataSource='staging_re_rollup_bot_Api', interval=2022-11-01T00:00:00.000Z/2022-12-01T00:00:00.000Z, version='2022-11-02T13:25:54.529Z', priority=75, revoked=false}]
2022-11-02T15:47:31,462 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskLockbox - TaskLock is now empty: TimeChunkLock{type=EXCLUSIVE, groupId='index_kinesis_staging_re_rollup_bot_Api', dataSource='staging_re_rollup_bot_Api', interval=2022-11-01T00:00:00.000Z/2022-12-01T00:00:00.000Z, version='2022-11-02T13:25:54.529Z', priority=75, revoked=false}
2022-11-02T15:47:31,464 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.MetadataTaskStorage - Deleting TaskLock with id[17876]: TimeChunkLock{type=EXCLUSIVE, groupId='index_kinesis_staging_re_rollup_bot_Api', dataSource='staging_re_rollup_bot_Api', interval=2022-11-01T00:00:00.000Z/2022-12-01T00:00:00.000Z, version='2022-11-02T13:25:54.529Z', priority=75, revoked=false}
2022-11-02T15:47:31,468 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo to status: TaskStatus{id=index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo, status=FAILED, duration=-1, errorMsg=Canceled for worker cleanup. See overlord logs for more details.}
2022-11-02T15:47:31,470 INFO [KinesisSupervisor-staging_re_rollup_provider_Telephony_text] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - [staging_re_rollup_provider_Telephony_text] supervisor is running.
2022-11-02T15:47:31,470 INFO [KinesisSupervisor-staging_re_rollup_provider_Telephony_text] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - {id='staging_re_rollup_provider_Telephony_text', generationTime=2022-11-02T15:47:31.470Z, payload=KinesisSupervisorReportPayload{dataSource='staging_re_rollup_provider_Telephony_text', stream='staging_re_provider_Telephony_text', partitions=1, replicas=1, durationSeconds=3600, active=[{id='index_kinesis_staging_re_rollup_provider_Telephony_text_1b477c8d4b27997_dmoecdbe', startTime=2022-11-02T15:26:05.910Z, remainingSeconds=2314}], publishing=[], suspended=false, healthy=true, state=RUNNING, detailedState=RUNNING, recentErrors=[]}}
2022-11-02T15:47:31,471 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo', groupId='index_kinesis_staging_re_rollup_bot_Api', taskResource=TaskResource{availabilityGroup='index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48', requiredCapacity=1}, dataSource='staging_re_rollup_bot_Api', context={forceTimeChunkLock=true, checkpoints={"0":{"shardId-000000000006":"49634530849357554903334591049073919772814899022653292642"}}, useLineageBasedSegmentAllocation=true}}
2022-11-02T15:47:31,476 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] org.apache.druid.indexing.overlord.TaskQueue - Task FAILED: AbstractTask{id='index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48_khojoigo', groupId='index_kinesis_staging_re_rollup_bot_Api', taskResource=TaskResource{availabilityGroup='index_kinesis_staging_re_rollup_bot_Api_8b5c4cf96f9bd48', requiredCapacity=1}, dataSource='staging_re_rollup_bot_Api', context={forceTimeChunkLock=true, checkpoints={"0":{"shardId-000000000006":"49634530849357554903334591049073919772814899022653292642"}}, useLineageBasedSegmentAllocation=true}} (-1 run duration)

Also, I had a HorizontalPodAutoscaler, and that was more or less fine on the QA env, but I got many more FAILED tasks on Staging (that’s when I came here, to the forum).

I suspect that while a Task was running, a JVM in the Pod was killed/restarted, although I can’t find a way to confirm that with any logs or Druid metrics (we have Prometheus/Grafana for observability).
Once a JVM process is (presumably) killed/restarted, the Task logs the “No route to host” message and goes to the FAILED state.

I also had a suspicion that the related Kubernetes Pod was moved between EKS WorkerNodes, but that doesn’t seem to be true, as they all show the same AGE since creation.
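
On the Kubernetes side, I guess something like this should show whether a MiddleManager container was OOMKilled or restarted, and which node each Pod is on (the namespace and Pod name below are just placeholders for our real ones):

    # restart counts and the node each Pod is scheduled on
    kubectl -n druid get pods -o wide

    # reason for the last container termination (e.g. OOMKilled), if any
    kubectl -n druid get pod druid-middlemanagers-0 \
      -o jsonpath='{.status.containerStatuses[*].lastState.terminated.reason}'

    # recent namespace events (evictions, OOM kills, rescheduling), sorted by time
    kubectl -n druid get events --sort-by=.lastTimestamp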

Here is the latest MM config applied (it was changed a bit since yesterday; I’ll see in an hour or two whether it has any effect):

    middlemanagers:
      druid.port: 8080
      extra.jvm.options: |-
        -Xmx4G
        -Xms1G
      kind: StatefulSet
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      nodeType: middleManager
      replicas: 6
      resources:
        limits:
          cpu: 2000m
          memory: 6Gi
        requests:
          cpu: 2000m
          memory: 6Gi
      runtime.properties: |
        druid.service=druid/middleManager
        druid.worker.capacity=2
        druid.indexer.runner.javaOpts=-server -Xms1G -Xmx4G -XX:MaxDirectMemorySize=1G -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
        druid.indexer.task.baseTaskDir=var/druid/task
        # HTTP server threads
        druid.server.http.numThreads=60
        # Processing threads and buffers on Peons
        druid.indexer.fork.property.druid.processing.numMergeBuffers=2
        druid.indexer.fork.property.druid.processing.buffer.sizeBytes=200MiB
        druid.indexer.fork.property.druid.processing.numThreads=3

A few things I notice:
You don’t need to think of streaming ingestion as an MM per stream. Streaming ingestion can be parallelized using multiple tasks (running on peons) across multiple MMs. A few points to think about:

  • MMs provide worker slots
  • Each worker slot is another JVM called a Peon; druid.indexer.runner.javaOpts (or javaOptsArray) specifies their JVM settings
  • The MM JVM coordinates the work of its local Peons
  • Each MM will consume a CPU for its coordination work plus one CPU per worker (druid.worker.capacity)
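
For reference, the Peon JVM settings can also be given in array form; this is just the equivalent of the javaOpts line already in your config above, not a change I’m recommending:

    # equivalent array form of the existing druid.indexer.runner.javaOpts line
    druid.indexer.runner.javaOptsArray=["-server","-Xms1G","-Xmx4G","-XX:MaxDirectMemorySize=1G","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError"]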

In your current setup you are overtaxing the resources:

  • MM pods have 2 CPUs; you’ll need 3 if you are using 2 workers. The CPU limit should be druid.worker.capacity + 1
  • The total memory needed in the pod will be druid.worker.capacity * (Peon heap + Peon direct memory) + MM JVM heap

In your current setup you are asking for 2 Peons with 5GB each (4GB heap + 1GB direct memory) = 10GB, which exceeds the total memory in the pod; this is likely causing the task failures when the pod runs out of memory.
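
Purely to illustrate that formula with your current per-Peon settings (4G heap + 1G direct memory), the pod resources would need to look something like this; the numbers are assumptions that just show the math, not a tuned recommendation:

    resources:
      limits:
        cpu: 3000m       # druid.worker.capacity (2) + 1 for the MM JVM
        memory: 14Gi     # 2 * (4G heap + 1G direct) per Peon + 4G MM heap (current extra.jvm.options)
      requests:
        cpu: 3000m
        memory: 14Gi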

Streaming ingestion needs will be determined by the throughput of each stream. Streams with a higher message rate may need more ingestion tasks (ioConfig.taskCount in the ingestion spec).
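
For example, the relevant fragment of a Kinesis supervisor spec might look like this (the stream name is taken from your logs above; taskCount=2 is only an illustration, the right value depends on throughput):

    {
      "type": "kinesis",
      "ioConfig": {
        "stream": "staging_re_provider_Telephony_text",
        "taskCount": 2,
        "replicas": 1,
        "taskDuration": "PT1H"
      }
    }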

What instance types are you running on? That’ll determine the biggest pod you can deploy. It will likely be better (less MM overhead) to have fewer, bigger MMs, with more CPU, more memory, and more worker capacity.

Hope this helps. Let us know how it goes.


Many thanks, @Sergio_Ferragut!
Unfortunately, I was without electricity all day today. I’ll check it tomorrow morning and come back here with some results.

Hi, @Sergio_Ferragut

Answering your question “What instance types are you running on?”: currently we are using AWS EC2 t3.large instances with 4 CPU and 16GB RAM.

Also, please correct me if I’m wrong, but after reading the docs (again), here is my understanding of a possible configuration for 1 MiddleManager:

Let 1 MM run 2 Tasks:
druid.worker.capacity=2

CPU
Each Task would run 1 thread:
druid.indexer.fork.property.druid.processing.numThreads=1

Total CPU: I guess 1 processing thread per Task will use 1 CPU core, so 2 Tasks will use 2 threads (2 cores). Am I right here?

RAM
Each Task would use 3 GB RAM for 1 Thread:

  • 1GB heap (why 1GB? Task heap sizing)
  • 1GB for lookups (not sure how much ours will use; let it be 1GB)
  • 1GB DirectMemory ((1 numThreads + 2 numMergeBuffers + 1) * 200MB sizeBytes == 800MB)

Total RAM: 2 Tasks at 3GB each == 6GB RAM used. Am I right here?

Total CPU/RAM per Pod:

  • 3 CPU in total: 2 Tasks will use 2 CPU + 1 CPU for MM’s JVM
  • 6GB RAM in total: 2 Tasks will use 6GB RAM + 128MB for MM’s JVM (MiddleManager heap sizing)

Thus, a Pod’s config could be 4 CPU and 7GB RAM (an additional 1 CPU and 1GB RAM for the MM’s 128MB heap and other OS-level processes).

Does it make any sense? :slight_smile:

If the calculations above are more or less correct, then a t3.large instance with 4 CPU and 16GB RAM running 1 K8s Pod with 1 MiddleManager will have about 8GB RAM to spare.
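
To make it concrete, here is a sketch of what that sizing would look like in the operator’s values (the numbers just mirror my calculation above and are not tested yet):

    middlemanagers:
      replicas: 6
      resources:
        requests:
          cpu: 4000m       # 2 Peons + 1 MM JVM + 1 spare for OS-level processes
          memory: 7Gi      # 2 * 3GB per Peon + ~1GB for the MM JVM and overhead
        limits:
          cpu: 4000m
          memory: 7Gi
      runtime.properties: |
        druid.worker.capacity=2
        druid.indexer.runner.javaOpts=-server -Xms1G -Xmx1G -XX:MaxDirectMemorySize=1G -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError
        druid.indexer.fork.property.druid.processing.numThreads=1
        druid.indexer.fork.property.druid.processing.numMergeBuffers=2
        druid.indexer.fork.property.druid.processing.buffer.sizeBytes=200MiB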

@Sergio_Ferragut @Mark_Herrera Sorry guys, any chance to assist with that?

Hey @setevoy, sorry for the delay.
I think your calculations are almost there. You could keep the instance type and add another worker (druid.worker.capacity=3). You can also make use of more heap (it helps process faster by avoiding more disk I/O). So you could set the heap for each task at 4GB with 1GB for direct memory, giving them each 5GB total, and still have plenty left for the MM JVM.
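
A rough sketch of those changes in the operator values, just to show the shape of it (the resource numbers are my assumptions based on the formula above; check them against the node’s allocatable memory and keep the MM’s own heap small):

    middlemanagers:
      resources:
        requests:
          cpu: 4000m       # druid.worker.capacity (3) + 1 for the MM JVM
          memory: 15Gi     # ~3 * (4G heap + 1G direct) per Peon, plus the MM JVM
        limits:
          cpu: 4000m
          memory: 15Gi
      runtime.properties: |
        druid.worker.capacity=3
        druid.indexer.runner.javaOpts=-server -Xms4G -Xmx4G -XX:MaxDirectMemorySize=1G -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager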

Let us know how it goes.
