All Kafka ingestion failed

Hello community,

Currently I am working on a Druid cluster that runs on K8s. The cluster has 1 master, 3 MiddleManagers, 3 Historicals, and 40+ Supervisors streaming data from Kafka.
My problem is that all Supervisors are in the UNHEALTHY_TASKS status, and every task runs for a few minutes and then FAILS (the log shows "Triggering JVM shutdown"). When I suspend some Supervisors, the remaining Supervisors become healthy, but as soon as I resume them, all tasks FAIL again.
I wonder if the reason is that there are not enough system resources to support so many Supervisors. Am I right? And are there any suggestions (tuning or adding resources) to support my requirement of 40+ Supervisors?

Some logs are appended below in case they are helpful.

Thank you.
Zhaobo

===========================================================================================
Overlord:
2022-02-08T12:19:38,497 INFO [Coordinator-Exec–0] org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics - Server[10.0.74.30:8083, historical, _default_tier] has 7 left to load, 0 left to drop, 9,118 bytes queued, 27,108 bytes served.
2022-02-08T12:19:38,617 INFO [KafkaSupervisor-iot_chain_130_a35-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-aibhadlk-331, groupId=kafka-supervisor-aibhadlk] Seeking to LATEST offset of partition iot_chain_130-0
2022-02-08T12:19:38,618 INFO [KafkaSupervisor-iot_chain_130_a35-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-aibhadlk-331, groupId=kafka-supervisor-aibhadlk] Resetting offset for partition iot_chain_130-0 to position FetchPosition{offset=54, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.0.74.23:59092 (id: 0 rack: null)], epoch=0}}.
2022-02-08T12:19:38,710 INFO [KafkaSupervisor-iot_chain_130_a48-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-mmlkjhkk-344, groupId=kafka-supervisor-mmlkjhkk] Seeking to LATEST offset of partition iot_chain_130-0
2022-02-08T12:19:38,711 INFO [KafkaSupervisor-iot_chain_130_a48-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-mmlkjhkk-344, groupId=kafka-supervisor-mmlkjhkk] Resetting offset for partition iot_chain_130-0 to position FetchPosition{offset=54, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.0.74.23:59092 (id: 0 rack: null)], epoch=0}}.
2022-02-08T12:19:38,744 WARN [IndexTaskClient-iot_chain_130_10-0] org.apache.druid.indexing.common.IndexTaskClient - Retries exhausted for [http://10.0.74.30:8104/druid/worker/v1/chat/index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd/time/start], last exception:
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_161]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_161]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_161]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_161]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_161]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_161]
at java.net.Socket.connect(Socket.java:538) ~[?:1.8.0_161]
at java.net.Socket.<init>(Socket.java:434) ~[?:1.8.0_161]
at java.net.Socket.<init>(Socket.java:211) ~[?:1.8.0_161]
at org.apache.druid.indexing.common.IndexTaskClient.checkConnection(IndexTaskClient.java:209) ~[druid-indexing-service-0.21.0.jar:0.21.0]
at org.apache.druid.indexing.common.IndexTaskClient.submitRequest(IndexTaskClient.java:348) ~[druid-indexing-service-0.21.0.jar:0.21.0]
at org.apache.druid.indexing.common.IndexTaskClient.submitRequestWithEmptyContent(IndexTaskClient.java:220) ~[druid-indexing-service-0.21.0.jar:0.21.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient.getStartTime(SeekableStreamIndexTaskClient.java:190) ~[druid-indexing-service-0.21.0.jar:0.21.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient.lambda$getStartTimeAsync$3(SeekableStreamIndexTaskClient.java:333) ~[druid-indexing-service-0.21.0.jar:0.21.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
2022-02-08T12:19:38,744 WARN [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Task [index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd] failed to return start time, killing task
2022-02-08T12:19:38,744 INFO [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd] because: [Task [index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd] failed to return start time, killing task]
2022-02-08T12:19:38,746 INFO [IndexTaskClient-iot_chain_130_10-0] org.apache.druid.indexing.common.IndexTaskClient - submitRequest failed for [http://10.0.74.30:8104/druid/worker/v1/chat/index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd/offsets/current], with message [Connection refused (Connection refused)]
2022-02-08T12:19:38,746 INFO [KafkaSupervisor-iot_chain_130_10-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-kjaldhak-253, groupId=kafka-supervisor-kjaldhak] Seeking to LATEST offset of partition iot_chain_130-0
2022-02-08T12:19:38,747 INFO [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.overlord.RemoteTaskRunner - Sent shutdown message to worker: 10.0.74.30:8091, status 200 OK, response: {"task":"index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd"}
2022-02-08T12:19:38,747 INFO [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd] from activeTasks
2022-02-08T12:19:38,747 INFO [KafkaSupervisor-iot_chain_130_10-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-kjaldhak-253, groupId=kafka-supervisor-kjaldhak] Resetting offset for partition iot_chain_130-0 to position FetchPosition{offset=54, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.0.74.23:59092 (id: 0 rack: null)], epoch=0}}.
2022-02-08T12:19:38,748 INFO [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd to status: TaskStatus{id=index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd, status=FAILED, duration=-1, errorMsg=null}
2022-02-08T12:19:38,749 INFO [IndexTaskClient-iot_chain_130_10-0] org.apache.druid.indexing.common.IndexTaskClient - submitRequest failed for [http://10.0.74.30:8104/druid/worker/v1/chat/index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd/offsets/current], with message [Connection refused (Connection refused)]
2022-02-08T12:19:38,749 INFO [KafkaSupervisor-iot_chain_130_10-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-kjaldhak-253, groupId=kafka-supervisor-kjaldhak] Seeking to LATEST offset of partition iot_chain_130-0
2022-02-08T12:19:38,750 INFO [KafkaSupervisor-iot_chain_130_10-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-kafka-supervisor-kjaldhak-253, groupId=kafka-supervisor-kjaldhak] Resetting offset for partition iot_chain_130-0 to position FetchPosition{offset=54, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.0.74.23:59092 (id: 0 rack: null)], epoch=0}}.
2022-02-08T12:19:38,753 INFO [KafkaSupervisor-iot_chain_130_10] org.apache.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_kafka_iot_chain_130_10_4da0d6f277d9d91_pnahjhnd', groupId='index_kafka_iot_chain_130_10', taskResource=TaskResource{availabilityGroup='index_kafka_iot_chain_130_10_4da0d6f277d9d91', requiredCapacity=1}, dataSource='iot_chain_130_10', context={forceTimeChunkLock=true, checkpoints={"0":{"0":54}}, IS_INCREMENTAL_HANDOFF_SUPPORTED=true}}
2022-02-08T12:19:38,753 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Assigned a task[index_kafka_iot_chain_130_32_e9ac5b58d0eb055_hpjdbneh] that is already pending!
2022-02-08T12:19:38,753 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Assigned a task[index_kafka_iot_chain_130_31_2c1d906e9d8e2bb_jegfianp] that is already pending!
2022-02-08T12:19:38,753 INFO [TaskQueue-Manager] org.apache.druid.indexing.overlord.RemoteTaskRunner - Assigned a task[index_kafka_iot_chain_130_34_e7850ba14213483_dnlhlaah] that is already pending!

====================================
Task:
2022-02-07T02:23:10,708 INFO [main] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Attempting to lock file[var/druid/task/index_kafka_iot_27_e9d9078ca07b323_dadjljdk/lock].
2022-02-07T02:23:10,714 INFO [main] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Acquired lock file[var/druid/task/index_kafka_iot_27_e9d9078ca07b323_dadjljdk/lock] in 6ms.
2022-02-07T02:23:10,715 INFO [parent-monitor-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Triggering JVM shutdown.
2022-02-07T02:23:10,722 INFO [task-runner-0-priority-0] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Running task: index_kafka_iot_27_e9d9078ca07b323_dadjljdk
2022-02-07T02:23:10,724 INFO [main] org.apache.druid.server.coordination.SegmentLoadDropHandler - Starting…
2022-02-07T02:23:10,724 INFO [main] org.apache.druid.server.coordination.SegmentLoadDropHandler - Started.
2022-02-07T02:23:10,724 INFO [main] org.apache.druid.server.coordination.ZkCoordinator - Starting zkCoordinator for server[10.244.1.199:8108]
2022-02-07T02:23:10,734 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [SERVER]
2022-02-07T02:23:10,739 INFO [main] org.eclipse.jetty.server.Server - jetty-9.4.30.v20200611; built: 2020-06-11T12:34:51.929Z; git: 271836e4c1f4612f12b7bb13ef5a92a927634b0d; jvm 1.8.0_265-8u265-b01-0+deb9u1-b01
2022-02-07T02:23:10,772 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Starting with sequences: [SequenceMetadata{sequenceId=0, sequenceName='index_kafka_iot_27_e9d9078ca07b323_0', assignments=[0], startOffsets={0=432119}, exclusiveStartPartitions=, endOffsets={0=9223372036854775807}, sentinel=false, checkpointed=false}]
2022-02-07T02:23:10,788 INFO [main] org.eclipse.jetty.server.session - DefaultSessionIdManager workerName=node0
2022-02-07T02:23:10,788 INFO [main] org.eclipse.jetty.server.session - No SessionScavenger set, using defaults
2022-02-07T02:23:10,789 INFO [main] org.eclipse.jetty.server.session - node0 Scavenging every 600000ms
2022-02-07T02:23:10,875 INFO [main] com.sun.jersey.server.impl.application.WebApplicationImpl - Initiating Jersey application, version 'Jersey: 1.19.3 10/24/2016 03:43 PM'
2022-02-07T02:23:11,386 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@61b8c9de{/,null,AVAILABLE}
2022-02-07T02:23:11,405 INFO [main] org.eclipse.jetty.server.AbstractConnector - Started ServerConnector@75c33608{HTTP/1.1, (http/1.1)}{0.0.0.0:8108}
2022-02-07T02:23:11,405 INFO [main] org.eclipse.jetty.server.Server - Started @514650ms
2022-02-07T02:23:11,405 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [ANNOUNCEMENTS]
2022-02-07T02:23:11,405 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Successfully started lifecycle [module]
Error!
java.lang.IllegalStateException: Shutdown in progress
at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.druid.cli.CliPeon.run(CliPeon.java:294)
at org.apache.druid.cli.Main.main(Main.java:113)

It does sound like resource contention.
How many CPUs do you have per MiddleManager?
How many workers configured on each?
How many tasks are you using for each of those 40 supervisors?
You’ll need as many worker slots as tasks if you don’t want them to queue up.

There are other considerations for tuning. Here’s some more on that.

This capacity planning section of the docs should help you set this up for the capacity you need.
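
To check the math, each supervisor's spec has an ioConfig that determines how many tasks it spawns. A rough sketch with illustrative values (only the topic name is taken from your logs; the rest are placeholders, not a recommendation):

"ioConfig": {
  "topic": "iot_chain_130",
  "taskCount": 1,
  "replicas": 1,
  "taskDuration": "PT1H"
}

The number of worker slots you need is roughly the sum of taskCount * replicas across all supervisors, plus some headroom for tasks that are still publishing segments while their replacements spin up.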

Thanks for your reply!
I have 5 virtual machines running a Docker environment, and my Druid cluster runs on them. Every VM has 8 CPU cores and 64 GB RAM, so I think every MiddleManager has at most 8 CPU cores?
Every Supervisor has one task streaming data from Kafka, so I think there should be about 40 tasks running in the Tasks list of the Ingestion page. (But right now every task can only run for a few minutes before it fails.)
I configured 128 slots (druid.worker.capacity) for every MiddleManager; is that OK? Or should it not exceed the CPU core count?
I read <Basic cluster tuning> and <Capacity Planning>, but could not find how to solve my problem.
Thanks again for your attention!

The default and recommended value for druid.worker.capacity is the number of available processors (vCPUs) minus 1 (to provide room for other overhead work on the middle manager).

In your case that would be 7. Setting it to 128 "accommodates" the supervisor tasks you are asking it to run, but the reality is that on a busy system it will cause severe CPU contention. Since you get one Peon JVM per task, a new JVM is created for each task you are trying to run. :) So, yes, I'm pretty sure this is CPU, and likely memory, contention.

So if your total task count across all concurrent ingestions is 40, you will likely need a total of 6 MiddleManagers of that size to address this. Another option may be to increase the CPU/memory resources of the MiddleManagers and adjust druid.worker.capacity accordingly.
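
As a rough illustration of that sizing, a middleManager/runtime.properties for one of your 8-vCPU nodes might look like the sketch below. The peon heap and direct-memory values are assumptions for illustration only; you would tune them to your actual data and query load:

druid.worker.capacity=7
# One Peon JVM is launched per occupied slot, so per-task memory must fit 7 times into the node
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8"]
druid.indexer.task.baseTaskDir=var/druid/task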

Thank you! I will try increasing the resources and will reply when I have results.

Hi, I am back :( . I tried reducing the number of tasks, and the system ran well for a few weeks. But now it has gotten worse: I cannot run even ONE task. A task runs for a few minutes and then FAILS, and then the system starts a new one.
In the log, I notice that the task halts at "org.apache.druid.cli.CliPeon - * user.timezone: UTC" for more than 10 minutes before logging "org.apache.druid.guice.StorageNodeModule - Segment cache not configured on…", but by then the Overlord seems to have already decided to kill it and start another one.
I wonder what the task was doing during that time; I guess that is the key to solving my problem.

Some logs are appended below in case they are helpful.
Thank you again.
Zhaobo

================================================


2022-03-22T12:31:52,112 INFO [main] org.apache.druid.cli.CliPeon - * sun.boot.class.path: /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/rt.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jfr.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/classes
2022-03-22T12:31:52,112 INFO [main] org.apache.druid.cli.CliPeon - * sun.boot.library.path: /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64
2022-03-22T12:31:52,112 INFO [main] org.apache.druid.cli.CliPeon - * sun.cpu.endian: little
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.cpu.isalist:
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.io.unicode.encoding: UnicodeLittle
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.java.command: org.apache.druid.cli.Main internal peon var/druid/task/index_kafka_iot_18_c283a22aacd759c_hagehpld/task.json var/druid/task/index_kafka_iot_18_c283a22aacd759c_hagehpld/4fcd6c59-df2c-4d80-97aa-bb00551f2e2c/status.json var/druid/task/index_kafka_iot_18_c283a22aacd759c_hagehpld/4fcd6c59-df2c-4d80-97aa-bb00551f2e2c/report.json --loadBroadcastSegments true
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.java.launcher: SUN_STANDARD
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.jnu.encoding: UTF-8
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.management.compiler: HotSpot 64-Bit Tiered Compilers
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * sun.os.patch.level: unknown
2022-03-22T12:31:52,113 INFO [main] org.apache.druid.cli.CliPeon - * user.dir: /opt/apache-druid-0.20.0
2022-03-22T12:31:52,114 INFO [main] org.apache.druid.cli.CliPeon - * user.home: /opt/druid
2022-03-22T12:31:52,114 INFO [main] org.apache.druid.cli.CliPeon - * user.language: en
2022-03-22T12:31:52,114 INFO [main] org.apache.druid.cli.CliPeon - * user.name: druid
2022-03-22T12:31:52,114 INFO [main] org.apache.druid.cli.CliPeon - * user.timezone: UTC
2022-03-22T12:47:23,279 INFO [main] org.apache.druid.guice.StorageNodeModule - Segment cache not configured on ServerType [indexer-executor]. It will not be assignable for segment placement
2022-03-22T12:47:23,345 INFO [main] org.eclipse.jetty.util.log - Logging initialized @937187ms to org.eclipse.jetty.util.log.Slf4jLog
2022-03-22T12:47:23,366 INFO [main] org.apache.druid.server.initialization.jetty.JettyServerModule - Creating http connector with port [8101]
2022-03-22T12:47:23,509 WARN [main] org.eclipse.jetty.server.handler.gzip.GzipHandler - minGzipSize of 0 is inefficient for short content, break even is size 23
2022-03-22T12:47:23,541 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new intermediate processing buffer[0] of size[104,857,600]
2022-03-22T12:47:23,658 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[0] of size[104,857,600]
2022-03-22T12:47:23,725 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[1] of size[104,857,600]
2022-03-22T12:47:23,805 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [INIT]
2022-03-22T12:47:23,805 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [NORMAL]
2022-03-22T12:47:23,807 INFO [main] org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting

Hi Zhaobo,

From the looks of it, the parameter druid.segmentCache.locations may not be set correctly, or the folder it points to may have permission issues. Please make sure it is set to an accessible file location and try the ingestion again.
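
For illustration, that property takes a JSON array of cache locations; the path and size below are placeholder values, not settings from your cluster:

druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":10000000000}]

Whatever path you use must exist and be readable and writable by the user the Druid process runs as.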

It's fixed, thanks to your help. Thank you!
I use a NAS as my deep storage, with storage type "local". I found that the NAS folder may have had a problem: I could not execute commands like "ls" or "du" in the Druid storage directory, although other NAS folders were fine. I changed deep storage to another NFS folder and the Druid system recovered.
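For reference, the settings I changed were the "local" deep storage ones in common.runtime.properties (the mount path below is just an example, not my real path):

druid.storage.type=local
# This directory must be a shared mount that every MiddleManager/Peon and Historical can read and write
druid.storage.storageDirectory=/mnt/nfs/druid/segments
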
Is there any incompatibility problem between Druid and NAS?

Hi zhaobao, we don’t generally recommend NAS as this adds latency to queries (unless your cluster is running entirely in memory). We prefer JBOD and then RAID: