Druid Kafka Ingestion Failure

Hi Team,

Can I get help understanding why the Kafka indexing task fails with the error below? The supervisor is running, but indexing fails. From the logs I can see that Druid is able to read data from Kafka:

2020-01-27T12:47:30,581 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - New segment[data_dummy_2019-10-25T00:00:00.000Z_2019-10-26T00:00:00.000Z_2020-01-27T11:57:01.987Z] for row[MapBasedInputRow{timestamp=2019-10-25T02:57:32.000Z, event={comment=Financial poor investment free star choose will environment., end_date=2019-11-26 17:15:10, ip=1.221.89.86, company=Newton, Richard and Harris, lattittude=47.172694, index=9, lpIDHash=2fce5a01-6993-4dea-861d-a8e3f0b19eaa, url=https://nash-dodson.com/, country=MK, zipcode=50752, longitude=23.502110, location=89772 David Mill Apt. 491, created_date=2019-10-25 02:57:32, user_name=Sonia Ford, email=emailid}, dimensions=[comment, end_date, ip, company, lattittude, index, lpIDHash, url, country, zipcode, longitude, location, created_date, user_name, email]}] sequenceName[index_kafka_data_dummy_5e7a3d8edc595b6_0].

2020-01-27T12:47:30,649 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[data_dummy_2019-10-25T00:00:00.000Z_2019-10-26T00:00:00.000Z_2020-01-27T11:57:01.987Z] at new path[/druid/segments/:8101/:8101_indexer-executor__default_tier_2020-01-27T12:47:30.646Z_7efbbaeeaf564f148c43e9a25ecd276f0]

2020-01-27T12:47:30,692 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm]: SegmentAllocateAction{dataSource='data_dummy', timestamp=2019-11-14T06:38:41.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity={type=period, period=P1D, timeZone=UTC, origin=null}, sequenceName='index_kafka_data_dummy_5e7a3d8edc595b6_0', previousSegmentId='data_dummy_2019-10-25T00:00:00.000Z_2019-10-26T00:00:00.000Z_2020-01-27T11:57:01.987Z', skipSegmentLineageCheck=true, shardSpecFactory=org.apache.druid.timeline.partition.NumberedShardSpecFactory@5af72e7c, lockGranularity=TIME_CHUNK}

2020-01-27T12:47:30,694 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm] to overlord: [SegmentAllocateAction{dataSource='data_dummy', timestamp=2019-11-14T06:38:41.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity={type=period, period=P1D, timeZone=UTC, origin=null}, sequenceName='index_kafka_data_dummy_5e7a3d8edc595b6_0', previousSegmentId='data_dummy_2019-10-25T00:00:00.000Z_2019-10-26T00:00:00.000Z_2020-01-27T11:57:01.987Z', skipSegmentLineageCheck=true, shardSpecFactory=org.apache.druid.timeline.partition.NumberedShardSpecFactory@5af72e7c, lockGranularity=TIME_CHUNK}].

2020-01-27T12:47:30,708 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - New segment[data_dummy_2019-11-14T00:00:00.000Z_2019-11-15T00:00:00.000Z_2020-01-27T11:57:02.429Z] for row[MapBasedInputRow{timestamp=2019-11-14T06:38:41.000Z, event={comment=Wide alone project market trouble., end_date=2019-11-29 21:44:21, ip=203.0.112.156, company=Thornton-Jones, lattittude=-43.0198165, index=4, lpIDHash=f12a4108-41ff-4b1b-89e6-ff5be3ad4411, url=https://www.owens.org/, country=DZ, zipcode=64679, longitude=-12.292750, location=36025 Camacho Ports, created_date=2019-11-14 06:38:41, user_name=David Hopkins, email=emailid}, dimensions=[comment, end_date, ip, company, lattittude, index, lpIDHash, url, country, zipcode, longitude, location, created_date, user_name, email]}] sequenceName[index_kafka_data_dummy_5e7a3d8edc595b6_0].

2020-01-27T12:47:30,718 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[data_dummy_2019-11-14T00:00:00.000Z_2019-11-15T00:00:00.000Z_2020-01-27T11:57:02.429Z] at existing path[/druid/segments/:8101/:8101_indexer-executor__default_tier_2020-01-27T12:47:30.646Z_7efbbaeeaf564f148c43e9a25ecd276f0]

2020-01-27T12:48:26,596 INFO [parent-monitor-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Triggering JVM shutdown.

2020-01-27T12:48:26,597 INFO [Thread-25] org.apache.druid.cli.CliPeon - Running shutdown hook

2020-01-27T12:48:26,602 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle [module] stage [ANNOUNCEMENTS]

2020-01-27T12:48:26,606 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.curator.announcement.Announcer.stop()] on object[org.apache.druid.curator.announcement.Announcer@5e541ef9].

2020-01-27T12:48:26,606 INFO [Thread-25] org.apache.druid.curator.announcement.Announcer - Stopping announcer

2020-01-27T12:48:26,607 INFO [Thread-25] org.apache.druid.curator.announcement.Announcer - unannouncing [/druid/announcements/:8101]

2020-01-27T12:48:26,659 INFO [Thread-25] org.apache.druid.curator.announcement.Announcer - unannouncing [/druid/listeners/lookups/__default/http::8101]

2020-01-27T12:48:26,679 INFO [Thread-25] org.apache.druid.curator.announcement.Announcer - unannouncing [/druid/segments/:8101/:8101_indexer-executor__default_tier_2020-01-27T12:47:30.646Z_7efbbaeeaf564f148c43e9a25ecd276f0]

2020-01-27T12:48:26,681 INFO [Thread-25] org.apache.druid.curator.announcement.Announcer - unannouncing [/druid/internal-discovery/PEON/:8101]

2020-01-27T12:48:26,683 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle [module] stage [SERVER]

2020-01-27T12:48:26,683 INFO [Thread-25] org.apache.druid.server.initialization.jetty.JettyServerModule - Stopping Jetty Server...

2020-01-27T12:48:26,688 INFO [Thread-25] org.eclipse.jetty.server.AbstractConnector - Stopped ServerConnector@50b4364{HTTP/1.1,[http/1.1]}{0.0.0.0:8101}

2020-01-27T12:48:26,688 INFO [Thread-25] org.eclipse.jetty.server.session - node0 Stopped scavenging

2020-01-27T12:48:26,692 INFO [Thread-25] org.eclipse.jetty.server.handler.ContextHandler - Stopped o.e.j.s.ServletContextHandler@4a336377{/,null,UNAVAILABLE}

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle [module] stage [NORMAL]

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer.stop()] on object[org.apache.druid.query.lookup.LookupResourceListenerAnnouncer@3af236d0].

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer - Unannouncing start time on [/druid/listeners/lookups/__default/http::8101]

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.query.lookup.LookupReferencesManager.stop()] on object[org.apache.druid.query.lookup.LookupReferencesManager@5514579e].

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.query.lookup.LookupReferencesManager - LookupExtractorFactoryContainerProvider is stopping.

2020-01-27T12:48:26,698 INFO [LookupExtractorFactoryContainerProvider-MainThread] org.apache.druid.query.lookup.LookupReferencesManager - Lookup Management loop exited, Lookup notices are not handled anymore.

2020-01-27T12:48:26,698 INFO [Thread-25] org.apache.druid.query.lookup.LookupReferencesManager - LookupExtractorFactoryContainerProvider is stopped.

2020-01-27T12:48:26,699 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.indexing.worker.executor.ExecutorLifecycle.stop() throws java.lang.Exception] on object[org.apache.druid.indexing.worker.executor.ExecutorLifecycle@164d01ba].

2020-01-27T12:48:26,699 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner.stop()] on object[org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner@401e02b4].

2020-01-27T12:48:26,699 INFO [Thread-25] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm].

2020-01-27T12:48:26,700 INFO [Thread-25] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping forcefully (status: [READING])

2020-01-27T12:48:26,700 INFO [Thread-25] org.apache.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm] status changed to [FAILED].

2020-01-27T12:48:26,702 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.indexing.worker.IntermediaryDataManager.stop() throws java.lang.InterruptedException] on object[org.apache.druid.indexing.worker.IntermediaryDataManager@7f2b39a].

2020-01-27T12:48:26,703 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.client.cache.CaffeineCache.close()] on object[org.apache.druid.client.cache.CaffeineCache@7f5cde6e].

2020-01-27T12:48:26,705 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.discovery.DruidLeaderClient.stop()] on object[org.apache.druid.discovery.DruidLeaderClient@2450256f].

2020-01-27T12:48:26,705 INFO [Thread-25] org.apache.druid.discovery.DruidLeaderClient - Stopped.

2020-01-27T12:48:26,705 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.curator.discovery.ServerDiscoverySelector.stop() throws java.io.IOException] on object[org.apache.druid.curator.discovery.ServerDiscoverySelector@63917fe1].

2020-01-27T12:48:26,707 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.discovery.DruidLeaderClient.stop()] on object[org.apache.druid.discovery.DruidLeaderClient@18cf5c52].

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.discovery.DruidLeaderClient - Stopped.

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.curator.discovery.ServerDiscoverySelector.stop() throws java.io.IOException] on object[org.apache.druid.curator.discovery.ServerDiscoverySelector@4f811029].

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.curator.discovery.CuratorDruidNodeDiscoveryProvider.stop()] on object[org.apache.druid.curator.discovery.CuratorDruidNodeDiscoveryProvider@3f68a7f8].

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.curator.discovery.CuratorDruidNodeDiscoveryProvider - stopping

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.curator.discovery.CuratorDruidNodeDiscoveryProvider - stopped

2020-01-27T12:48:26,708 INFO [Thread-25] org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void org.apache.druid.java.util.http.client.NettyHttpClient.stop()] on object[org.apache.druid.java.util.http.client.NettyHttpClient@9785903].

2020-01-27T12:48:26,700 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.

org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException

at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:493) ~[?:?]

at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[?:?]

at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[?:?]

at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243) ~[?:?]

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) ~[?:?]

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) ~[?:?]

at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:119) ~[?:?]

at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:111) ~[?:?]

at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:575) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]

at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:259) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]

at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:177) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]

at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]

at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]

at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]

Caused by: java.lang.InterruptedException

... 17 more

2020-01-27T12:48:26,714 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persisting all pending data

2020-01-27T12:48:26,718 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - Persisting data.

Thanks

Soumya

Hi Soumya,
Do you suspect any pause or suspension in reading from the topic?

The reason I ask: if there is a gap in reading from the Kafka topic, then when the task resumes, Druid tries to read from the offset where it left off before the suspension. If that data has since been dropped from the topic under Kafka's retention rules (the broker default is 7 days, log.retention.hours=168, but your topic may be configured with a shorter retention), the job might fail.
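To make the retention scenario concrete, here is a minimal sketch in Python. It assumes you have already looked up two numbers per partition by whatever means you prefer: the offset the task wants to resume from, and the earliest offset Kafka still retains. The function name and the sample offsets are hypothetical and are not part of Druid or Kafka.

```python
from typing import Dict, List


def partitions_with_expired_offsets(
    resume_offsets: Dict[int, int],
    earliest_offsets: Dict[int, int],
) -> List[int]:
    """Return the partitions whose resume offset is older than the
    earliest offset still retained in the topic."""
    expired = []
    for partition, resume_at in sorted(resume_offsets.items()):
        earliest = earliest_offsets.get(partition)
        if earliest is None:
            # No retention information for this partition; skip it.
            continue
        if resume_at < earliest:
            # The data the task wants to read has already been deleted.
            expired.append(partition)
    return expired


# Hypothetical sample values, for illustration only.
resume = {0: 1500, 1: 200}
earliest = {0: 1000, 1: 950}
print(partitions_with_expired_offsets(resume, earliest))  # prints [1]
```

If this returns an empty list for your real offsets, retention is unlikely to be the cause.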

If this is not the case, you might have to dig into your overlord and middleManager logs for additional clues.
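When going through those logs, a small filter can help surface the relevant lines. The sketch below is only an illustration: it assumes the other services log in the same layout as the task log in the question (timestamp, level, thread in brackets, logger, " - ", message), and the helper name `find_clues` is made up. The sample lines are copied from the task log above.

```python
import re
from typing import Iterable, List

# Layout assumed from the task log in the question:
# <timestamp> <LEVEL> [<thread>] <logger> - <message>
LOG_LINE = re.compile(r"^(\S+) (INFO|WARN|ERROR) \[([^\]]+)\] (\S+) - (.*)$")


def find_clues(lines: Iterable[str], task_id: str) -> List[str]:
    """Keep WARN/ERROR lines and any line whose message mentions the task."""
    clues = []
    for raw in lines:
        line = raw.strip()
        match = LOG_LINE.match(line)
        if match is None:
            # Stack-trace frames and blank lines do not match the layout.
            continue
        level, message = match.group(2), match.group(5)
        if level in ("WARN", "ERROR") or task_id in message:
            clues.append(line)
    return clues


sample = [
    "2020-01-27T12:48:26,596 INFO [parent-monitor-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Triggering JVM shutdown.",
    "2020-01-27T12:48:26,700 INFO [Thread-25] org.apache.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm] status changed to [FAILED].",
    "2020-01-27T12:48:26,700 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.",
    "at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) ~[?:?]",
]
for clue in find_clues(sample, "index_kafka_data_dummy_5e7a3d8edc595b6_dhmnaldm"):
    print(clue)
```

To use it on a real file, pass the open file object as `lines` and the failing task's ID as `task_id`.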

Thank you,

–siva