Poison messages & Kafka Indexing Service

Hi all,

We had an accident in our staging environment recently where a badly formed message made its way into the Kafka topic being monitored by the Kafka indexing service: the message's timestamp field was not filled in. This turned out to be fairly dire for Druid, because it does not appear that it will ever drop the poison message(s), so the indexing task fails and is retried in a loop forever.

2016-09-25T19:52:16,495 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down…
2016-09-25T19:52:16,502 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_sor_shopkick_app_reviews_9eede2d4cc60df9_aaaghmac, type=index_kafka, dataSource=sor_shopkick_app_reviews}]
java.lang.NullPointerException: timestamp
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.actions.SegmentAllocateAction.<init>(SegmentAllocateAction.java:104) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.appenderator.ActionBasedSegmentAllocator.allocate(ActionBasedSegmentAllocator.java:57) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.getSegment(FiniteAppenderatorDriver.java:341) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.add(FiniteAppenderatorDriver.java:195) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:417) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
2016-09-25T19:52:16,510 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_sor_shopkick_app_reviews_9eede2d4cc60df9_aaaghmac] status changed to [FAILED].
2016-09-25T19:52:16,515 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_sor_shopkick_app_reviews_9eede2d4cc60df9_aaaghmac",
  "status" : "FAILED",
  "duration" : 713
}

What should one do to recover from this situation?

As this is in staging at the moment, I can just wipe the topic and start over, but if this ever happened in production I would need a way to recover without losing data.

Thanks,

What parser are you using?

The string parser, at least, will throw parse exceptions on null timestamps. The Kafka index task (and probably other things too) depends on this behavior. Based on the exception you’re getting, I’d guess you are using a parser that returns null timestamps rather than throwing a parse exception.
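To illustrate the distinction, here is a minimal, self-contained sketch (not Druid's actual parser code; the class and method names are made up for illustration) of the behavior the Kafka index task depends on: a missing timestamp surfaces as a ParseException at parse time, which the task can catch and skip, rather than a null that blows up later during segment allocation.

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampCheck {
    // Hypothetical stand-in for Druid's ParseException.
    static class ParseException extends RuntimeException {
        ParseException(String msg) { super(msg); }
    }

    // Behaves like the string parser: a null timestamp is a parse error,
    // thrown immediately so the caller can treat the row as unparseable.
    static long parseTimestampOrThrow(Map<String, Object> row) {
        Object ts = row.get("timestamp");
        if (ts == null) {
            throw new ParseException("null timestamp in row: " + row);
        }
        return ((Number) ts).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> bad = new HashMap<>();
        bad.put("value", 42);
        try {
            parseTimestampOrThrow(bad);
            System.out.println("no exception");
        } catch (ParseException e) {
            // With reportParseExceptions=false, the task would count and
            // skip this row instead of failing.
            System.out.println("dropped: " + e.getMessage());
        }
    }
}
```

A parser that instead returns a row with a null timestamp pushes the failure downstream to SegmentAllocateAction's null check, which is the NullPointerException in the log above.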

I am using the Avro extension. So if the Avro parser throws a ParseException, the Kafka Indexing Service will just drop the message?

It will, as long as "reportParseExceptions": false is set in the supervisor spec (which is the default).
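For reference, a skeleton of where that flag lives in a Kafka supervisor spec — this is an illustrative fragment, not a complete spec; the elided sections depend on your schema and cluster:

```json
{
  "type": "kafka",
  "dataSchema": { "...": "parseSpec, granularity, etc." },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": { "...": "topic, consumerProperties, etc." }
}
```

With reportParseExceptions set to true, the task fails fast on the first unparseable row instead of silently skipping it, which is useful in staging when you want to catch producers emitting bad data.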