Index kafka task fails with InterruptedException

Hello,
I am getting occasional Kafka ingestion task failures with the following logs:

2022-02-28T17:47:33,430 INFO [Thread-96] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip].
2022-02-28T17:47:33,430 INFO [Thread-96] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping forcefully (status: [READING])
2022-02-28T17:47:33,433 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:126) ~[?:?]
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:99) ~[?:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:604) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:268) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:451) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:423) [druid-indexing-service-0.21.1.jar:0.21.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.InterruptedException
	... 17 more
2022-02-28T17:47:33,465 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
2022-02-28T17:47:33,466 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-02-28T17:47:33,467 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
2022-02-28T17:47:33,467 INFO [Thread-96] org.apache.druid.security.basic.authorization.db.cache.CoordinatorPollingBasicAuthorizerCacheManager - CoordinatorPollingBasicAuthorizerCacheManager is stopping.
2022-02-28T17:47:33,468 INFO [Thread-96] org.apache.druid.security.basic.authorization.db.cache.CoordinatorPollingBasicAuthorizerCacheManager - CoordinatorPollingBasicAuthorizerCacheManager is stopped.
2022-02-28T17:47:33,468 INFO [Thread-96] org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager - CoordinatorPollingBasicAuthenticatorCacheManager is stopping.
2022-02-28T17:47:33,468 INFO [Thread-96] org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager - CoordinatorPollingBasicAuthenticatorCacheManager is stopped.
2022-02-28T17:47:33,468 INFO [LookupExtractorFactoryContainerProvider-MainThread] org.apache.druid.query.lookup.LookupReferencesManager - Lookup Management loop exited. Lookup notices are not handled anymore.
2022-02-28T17:47:33,471 INFO [Curator-Framework-0] org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2022-02-28T17:47:33,475 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-kafka-supervisor-lmbdeoeg-1 unregistered
2022-02-28T17:47:33,478 INFO [Thread-96] org.apache.zookeeper.ZooKeeper - Session: 0x20c78557c6c000e closed
2022-02-28T17:47:33,478 INFO [main-EventThread] org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x20c78557c6c000e
2022-02-28T17:47:33,482 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[imp_kafka_2022-02-28T17:00:00.000Z_2022-02-28T18:00:00.000Z_2022-02-28T17:02:57.270Z_1] at path[/druiduat/segments/dod0045.atl1.turn.com:8102/dod0045.atl1.turn.com:8102_indexer-executor__default_tier_2022-02-28T17:18:29.819Z_af501e4e925d45769af3475bb264c1400]
2022-02-28T17:47:33,482 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[imp_kafka_2022-02-28T16:00:00.000Z_2022-02-28T17:00:00.000Z_2022-02-28T16:00:32.175Z_1] at path[/druiduat/segments/dod0045.atl1.turn.com:8102/dod0045.atl1.turn.com:8102_indexer-executor__default_tier_2022-02-28T17:18:29.819Z_af501e4e925d45769af3475bb264c1400]
2022-02-28T17:47:33,519 ERROR [[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Incremental persist failed: {class=org.apache.druid.segment.realtime.appenderator.AppenderatorImpl, segment=imp_kafka_2022-02-28T17:00:00.000Z_2022-02-28T18:00:00.000Z_2022-02-28T17:02:57.270Z_1, dataSource=imp_kafka, count=2}
2022-02-28T17:47:33,522 INFO [task-runner-0-priority-0] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self [{"druidNode":{"service":"druid/middleManager","host":"dod0045.atl1.turn.com","bindOnHost":false,"plaintextPort":8102,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__null"}}}].
2022-02-28T17:47:33,522 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:126) ~[?:?]
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:99) ~[?:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:604) ~[druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:268) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:451) [druid-indexing-service-0.21.1.jar:0.21.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:423) [druid-indexing-service-0.21.1.jar:0.21.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
	Suppressed: java.lang.InterruptedException
		at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_121]
		at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_121]
		at org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver.persist(StreamAppenderatorDriver.java:231) ~[druid-server-0.21.1.jar:0.21.1]
		at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:756) ~[druid-indexing-service-0.21.1.jar:0.21.1]
		at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:268) [druid-indexing-service-0.21.1.jar:0.21.1]
		at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.21.1.jar:0.21.1]
		at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:451) [druid-indexing-service-0.21.1.jar:0.21.1]
		at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:423) [druid-indexing-service-0.21.1.jar:0.21.1]
		at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
		at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.InterruptedException
	... 17 more
2022-02-28T17:47:33,540 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip",
  "status" : "FAILED",
  "duration" : 1812414,
  "errorMsg" : "org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException\n\tat org.apache.kaf...",
  "location" : {
    "host" : null,
    "port" : -1,
    "tlsPort" : -1
  }
}

Can someone please help with this error?

Thanks,
Ling Tu

Hi Ling,

It looks like the Overlord issued a kill command to this task. I’d investigate the Overlord logs and grep for this task ID to see what the issue might be.

Thanks!

@Vijeth_Sagar,
thanks for the pointer on the Overlord. I looked into the Overlord log and grepped for the task. The task went from the RUNNING state to shutdown “because: [No task in the corresponding pending completion taskGroup[0] succeeded before completion timeout elapsed]”.
What could have caused this?

Here is the Overlord log filtered by the task ID.

2022-02-28T17:47:03,328 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - {id='imp_kafka', generationTime=2022-02-28T17:47:03.328Z, payload=KafkaSupervisorReportPayload{dataSource='imp_kafka', topic='ILR_topic', partitions=1, replicas=1, durationSeconds=3600, active=[{id='index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip', startTime=2022-02-28T17:17:21.311Z, remainingSeconds=1817}], publishing=[{id='index_kafka_imp_kafka_e02d80f8e3905d7_ddmleefo', startTime=2022-02-28T16:17:12.140Z, remainingSeconds=9}], suspended=false, healthy=true, state=RUNNING, detailedState=RUNNING, recentErrors=[]}}
2022-02-28T17:47:33,358 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] because: [No task in the corresponding pending completion taskGroup[0] succeeded before completion timeout elapsed]
2022-02-28T17:47:33,362 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.RemoteTaskRunner - Sent shutdown message to worker: dod0045.atl1.turn.com:8091, status 200 OK, response: {"task":"index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip"}
2022-02-28T17:47:33,362 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] from activeTasks
2022-02-28T17:47:33,362 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] from TaskLock[TimeChunkLock{type=EXCLUSIVE, groupId='index_kafka_imp_kafka', dataSource='imp_kafka', interval=2022-02-28T16:00:00.000Z/2022-02-28T17:00:00.000Z, version='2022-02-28T16:18:23.771Z', priority=75, revoked=false}]
2022-02-28T17:47:33,363 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] from TaskLock[TimeChunkLock{type=EXCLUSIVE, groupId='index_kafka_imp_kafka', dataSource='imp_kafka', interval=2022-02-28T17:00:00.000Z/2022-02-28T18:00:00.000Z, version='2022-02-28T17:02:57.270Z', priority=75, revoked=false}]
2022-02-28T17:47:33,364 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip to status: TaskStatus{id=index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip, status=FAILED, duration=-1, errorMsg=null}
2022-02-28T17:47:33,366 INFO [KafkaSupervisor-imp_kafka] org.apache.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip', groupId='index_kafka_imp_kafka', taskResource=TaskResource{availabilityGroup='index_kafka_imp_kafka_32fd87f120005b1', requiredCapacity=1}, dataSource='imp_kafka', context={forceTimeChunkLock=true, checkpoints={"0":{"0":847367134}}, IS_INCREMENTAL_HANDOFF_SUPPORTED=true}}
2022-02-28T17:47:33,370 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] because: [task is not in knownTaskIds[[index_hadoop_dsp_media_and_bids_s_lniabmmg_2022-02-28T17:02:37.850Z, index_hadoop_dsp_media_and_bids_hndikijj_2022-02-28T16:40:36.483Z, index_hadoop_dsp_audience_hkfghfcj_2022-02-28T17:11:00.687Z]]]
2022-02-28T17:47:33,374 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Sent shutdown message to worker: dod0045.atl1.turn.com:8091, status 200 OK, response: {"task":"index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip"}
2022-02-28T17:47:33,380 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] because: [task is not in knownTaskIds[[index_hadoop_dsp_media_and_bids_s_lniabmmg_2022-02-28T17:02:37.850Z, index_hadoop_dsp_media_and_bids_hndikijj_2022-02-28T16:40:36.483Z, index_hadoop_dsp_audience_hkfghfcj_2022-02-28T17:11:00.687Z, index_kafka_imp_kafka_32fd87f120005b1_amfjmhoc]]]
2022-02-28T17:47:33,385 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Sent shutdown message to worker: dod0045.atl1.turn.com:8091, status 200 OK, response: {"task":"index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip"}
2022-02-28T17:47:38,247 INFO [Curator-PathChildrenCache-5] org.apache.druid.indexing.overlord.RemoteTaskRunner - Worker[dod0045.atl1.turn.com:8091] wrote FAILED status for task [index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] on [TaskLocation{host='dod0045.atl1.turn.com', port=8102, tlsPort=-1}]
2022-02-28T17:47:38,247 INFO [Curator-PathChildrenCache-5] org.apache.druid.indexing.overlord.RemoteTaskRunner - Worker[dod0045.atl1.turn.com:8091] completed task[index_kafka_imp_kafka_32fd87f120005b1_kbjcbaip] with status[FAILED]

Thanks,
Ling

Hi Ling,

Thanks for sending this information. What is your completionTimeout set to? We can test by increasing it to a larger duration. The error above is telling us that another task in that group failed, and therefore all the tasks in that group are being killed.

You could try enabling DEBUG logging and isolating the issue to figure out which task failed and why.

Also, are you expecting any future-dated data? Do you set earlyMessageRejectionPeriod?

Thanks!

Hello Vijeth,
Thanks for the suggestion.
completionTimeout was not set and defaults to PT30M. taskDuration is also at its default of PT1H. I will try extending completionTimeout to 1 hour and see if that improves things.
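For reference, this is a minimal sketch of just the ioConfig section of our Kafka supervisor spec with completionTimeout extended (dataSchema and tuningConfig are omitted, the broker address is a placeholder, and the topic and counts are taken from the logs above):

"ioConfig": {
  "type": "kafka",
  "topic": "ILR_topic",
  "consumerProperties": { "bootstrap.servers": "YOUR_KAFKA_BROKERS:9092" },
  "replicas": 1,
  "taskCount": 1,
  "taskDuration": "PT1H",
  "completionTimeout": "PT1H"
}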

We are not expecting future-dated data, so earlyMessageRejectionPeriod is not set.

I will try enabling DEBUG logs on the Overlord to observe the error.

Thanks,
Ling

Hi Ling,

I wish I could be more helpful, but it is difficult to debug a situation without having access to the system and logs.

You’ll probably have to look through the individual task logs to figure out what’s going on with the failed task. Feel free to report back once you find it if you need more help.

Thanks!

Hi @Ling_Tu, did you find the root cause of this problem? We are seeing something similar, where the coordinator starts the ingestion tasks and then kills them within a minute or two with a “because: [task is not in knownTaskIds” error. Any pointers would be super helpful.

Thank you.