Tranquility throwing KafkaConsumer NullPointerException "randomly"

I have a problem that is driving me nuts. I’m reading from Kafka, but Tranquility throws a NullPointerException after what seems like a random amount of time, usually 15-45 minutes after starting. Until it breaks, everything works fine and data pours into Druid.

Once the error appears, Tranquility tries to restart, but the same error repeats over and over. If I leave it for a long time, maybe a couple of hours, it sometimes starts working again, and then the cycle repeats after a while. When the error occurs, I can sometimes get it going again by restarting Tranquility after either changing the name of the consumer group or clearing out all Kafka/Tranquility data from ZooKeeper.

The Kafka and ZooKeeper logs give me nothing to go on. No errors, nothing out of the ordinary. I’m out of ideas.

This is what shows up in the Tranquility log:

2016-10-20 09:00:13,678 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {bodil-snmp-parser-out={receivedCount=1, sentCount=1, droppedCount=0, unparseableCount=0}} pending messages in 9229ms and committed offsets in 55ms.

2016-10-20 09:00:32,725 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:

java.lang.NullPointerException: null

    at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) ~[na:1.7.0_91]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:181) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:180) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:198) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:198) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$3.apply(Tranquilizer.scala:116) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
    at com.metamx.common.scala.concurrent.package$$anon$1.run(package.scala:37) ~[com.metamx.scala-util_2.11-1.11.6.jar:1.11.6]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]

2016-10-20 09:00:32,725 [KafkaConsumer-0] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets

2016-10-20 09:00:32,726 [KafkaConsumer-0] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac], ZKConsumerConnector shutting down

2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.

2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1476953842862] Stopping leader finder thread

2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Shutting down

2016-10-20 09:00:32,738 [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Stopped

2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Shutdown completed

2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1476953842862] Stopping all fetchers

2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Shutting down

2016-10-20 09:00:32,739 [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Stopped

2016-10-20 09:00:32,739 [KafkaConsumer-0] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Shutdown completed

I’m fairly new to Kafka/Tranquility/Druid, so I’m having a hard time figuring out what’s happening here. Why does KafkaConsumer suddenly stop working? Why does the problem persist once it appears? I can see in ZooKeeper that the offset is reset quite a bit when this happens. Is Tranquility trying to read something that isn’t there anymore, and is that why it keeps failing?

Any clues to what might be going on here? This is my first time asking for help in one of these groups, so I apologize in advance if I haven’t provided enough info. If so, please just let me know.

Any help is appreciated.

Hi,

That exception is being thrown because your Kafka topic has null messages in it. Is it possible to have your Kafka producers not generate null events?
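
To illustrate, here is a minimal sketch in plain Java (no Kafka or Tranquility dependencies). It shows that `java.nio.ByteBuffer.wrap` fails on a null byte array, which is the top frame of your stack trace, and how a producer could skip null payloads before sending. The class `NullPayloadGuard` and its methods are hypothetical names made up for this example, not part of any library, and I'm assuming your producer handles payloads as `byte[]`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or Tranquility.
class NullPayloadGuard {

    // Returns true if ByteBuffer.wrap rejects the payload with a
    // NullPointerException, the same failure seen at the top of the stack trace.
    static boolean wrapThrowsNpe(byte[] payload) {
        try {
            ByteBuffer.wrap(payload);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Producer-side filter: keep only the payloads that are not null,
    // so that null messages never reach the topic.
    static List<byte[]> dropNulls(List<byte[]> payloads) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] payload : payloads) {
            if (payload != null) {
                kept.add(payload);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        byte[] event = "{\"metric\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println("wrap(event) throws NPE: " + wrapThrowsNpe(event));
        System.out.println("wrap(null) throws NPE: " + wrapThrowsNpe(null));

        // Assumed batch of outgoing payloads, one of which is null.
        List<byte[]> batch = Arrays.asList(event, null, event);
        System.out.println("payloads kept: " + dropNulls(batch).size());
    }
}
```

In a real producer the equivalent check would sit right before the send call, wherever the payload is built. That would also explain why the error keeps repeating after a restart: as long as the consumer resumes at or before the offset of the null message, it will read it again.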

If you’re just starting to ingest from Kafka into Druid, I’d recommend taking a look at the Kafka indexing service to see if it fits your use case. It’s still considered experimental, but it is getting more usage and so far looks reasonably stable. It doesn’t require a separate tranquility-kafka process, provides an exactly-once ingestion guarantee, and can ingest historical data as well as recent data. These things should give you a better ingestion experience.

There’s documentation here if you’re interested: http://druid.io/docs/0.9.1.1/development/extensions-core/kafka-ingestion.html