Kafka Avro Tranquility failure

Hi,

Been having a heck of a time getting druid to ingest messages off a kafka feed. I trying to ingest an avro serialized message using latest druid 0.9.2 and tranquility but getting the following exception :

2017-01-09 21:55:47,494 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed 0 pending messages in 0ms and committed offsets in 9ms.

``
2017-01-09 21:55:47,506 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:

java.lang.NoClassDefFoundError: io/druid/indexing/common/task/AbstractTask

    at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_45]

    at java.lang.ClassLoader.defineClass(ClassLoader.java:760) ~[na:1.8.0_45]

    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_45]

    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_45]

    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_45]

    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_45]

    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_45]

    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_45]

    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_45]

    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_45]

    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_45]

    at io.druid.indexing.kafka.KafkaIndexTaskModule.getJacksonModules(KafkaIndexTaskModule.java:39) ~[na:na]

    at com.metamx.tranquility.druid.DruidGuicer.registerWithJackson$1(DruidGuicer.scala:80) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidGuicer.com$metamx$tranquility$druid$DruidGuicer$$toGuiceModule$1(DruidGuicer.scala:89) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidGuicer$$anonfun$3.apply(DruidGuicer.scala:127) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidGuicer$$anonfun$3.apply(DruidGuicer.scala:127) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.generic.Growable$class.loop$1(Growable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.TraversableLike$class.$plus$plus(TraversableLike.scala:158) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.immutable.List.$plus$plus(List.scala:196) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at com.metamx.tranquility.druid.DruidGuicer.<init>(DruidGuicer.scala:128) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidGuicer$.<init>(DruidGuicer.scala:138) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidGuicer$.<clinit>(DruidGuicer.scala) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidBeams$.makeFireDepartment(DruidBeams.scala:433) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:299) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]

    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]

    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

Caused by: java.lang.ClassNotFoundException: io.druid.indexing.common.task.AbstractTask

    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_45]

    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_45]

    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_45]

    ... 43 common frames omitted

2017-01-09 21:55:47,530 [KafkaConsumer-1] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets

2017-01-09 21:55:47,531 [Curator-Framework-0] INFO o.a.c.f.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting

2017-01-09 21:55:47,541 [KafkaConsumer-1] INFO org.apache.zookeeper.ZooKeeper - Session: 0x159853a76f10000 closed

2017-01-09 21:55:47,541 [KafkaConsumer-1-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x159853a76f10000

2017-01-09 21:55:47,544 [KafkaConsumer-1] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka4_a502d875b05e-1483998924132-c4c44796], ZKConsumerConnector shutting down

AbstractTask appears to just use jackson to do some mapping but the NoClassFound bit seems odd.

json is pretty simple:

{

“dataSources”: [

{

“spec”: {

“dataSchema”: {

“dataSource”: “LogEvents”,

“parser”: {

“type”: “avro_stream”,

“avroBytesDecoder”: {

“type”: “schema_repo”,

“subjectAndIdConverter”: {

“type”: “avro_1124”,

“topic”: “logEvents”

}

},

“schemaRepository”: {

“type”: “avro_1124_rest_client”,

“url”: “kafka-schema:8081”

},

“parseSpec”: {

“format”: “timeAndDims”,

“timestampSpec”: {

“column”: “time”,

“format”: “millis”

},

“dimensionsSpec”: {

“dimensions”: [

“threadname”,

“requestMarker”,

“server”,

“service”,

“logLevel”,

“logger”,

“message”

]

}

}

},

“metricsSpec”: [

{

“type”: “count”,

“name”: “time”

}

],

“granularitySpec”: {

“type”: “uniform”,

“segmentGranularity”: “hour”,

“queryGranularity”: “none”

}

},

“tuningConfig”: {

“type”: “realtime”,

“maxRowsInMemory”: 100000,

“intermediatePersistPeriod”: “PT200M”,

“windowPeriod”: “PT200M”

}

},

“properties”: {

“topicPattern”: “logEvents”,

“topicPattern.priority”: 1

}

}

],

“properties”: {

“zookeeper.connect”: “localhost:2181”,

“zookeeper.timeout”: “PT20S”,

“druid.zk.paths.base”: “/druid”,

“druid.discovery.curator.path”: “druid/discovery”,

“druid.selectors.indexing.serviceName”: “druid:overlord”,

“kafka.zookeeper.connect”: “zk000:2181,zk001:2181,zk002:2181”,

“kafka.group.id”: “tranquility-kafka4”,

“consumer.numThreads”: 2,

“commit.periodMillis”: 15000,

“reportDropsAsExceptions”: false

}

}

``

Hey Colin,

How are you running tranquility? What command & classpath?

A couple things jump out:

  1. There’s no reason for Tranquility to be loading the KafkaIndexTaskModule (from the druid-kafka-indexing-service extension), it doesn’t use that.

  2. io.druid.indexing.common.task.AbstractTask should be available on the classpath when Tranquility is run normally.

Both of those point to possible classpath or config issues.

Pretty simply just calling tranquility like

/tranquility/bin/tranquility kafka -configFile /tranquility/conf/kafka.json

``

though a supervisor process

Any idea how that module from druid-kafka-indexing-service got loaded? Is that specified in your tranquility config or is it on the classpath?

Hi Gian,

Thanks for the helpful hint.

So after some further experimentation it turns out that by leaving out the -Ddruid.extensions.loadList option off from the tranquility startup params did indeed take the default druid behavior of loading all extensions which explains the AbstractTask issue. The abstractTask error was resolved by adding the -Ddruid.extensions.loadList= to the command. This brings up other issues but I will visit that in another thread