Druid avro extension resulting in UnsupportedOperationException from Tranquility

Hi All,

I am trying to ingest a Kafka topic containing Avro messages, following the instructions here: http://druid.io/docs/latest/development/extensions-core/avro.html, but I am running into a problem. First I discovered that the example is incorrect: the parseSpec has a "type" child set to timeAndDims, but that key should really be "format", not "type". After making that change I now get the following error:

2016-06-24 22:14:53,403 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.UnsupportedOperationException: not supported
    at io.druid.data.input.impl.TimeAndDimsParseSpec$1.parse(TimeAndDimsParseSpec.java:52) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
    at io.druid.data.input.impl.StringInputRowParser.parseString(StringInputRowParser.java:126) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
    at io.druid.data.input.impl.StringInputRowParser.buildStringKeyMap(StringInputRowParser.java:113) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
    at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:74) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
    at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:37) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at scala.Option.foreach(Option.scala:257) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:76) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_101]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_101]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_101]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_101]

Looking at the code, it is unclear to me how TimeAndDimsParseSpec is supposed to be used, since the Parser returned by makeParser always throws UnsupportedOperationException.
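To illustrate what the stack trace shows (a rough sketch with simplified names, not the actual Druid source): timeAndDims carries no string format, so the parser it hands back for string input unconditionally throws. It only makes sense when a wrapping parser, such as avro_stream, supplies a Map of fields directly, which is why hitting this path means the string parser was used instead.

```java
import java.util.Map;

public class TimeAndDimsSketch {
    // Simplified stand-in for Druid's Parser<String, Object>.
    interface Parser {
        Map<String, Object> parse(String input);
    }

    // Sketch of TimeAndDimsParseSpec.makeParser(): the returned parser
    // cannot handle raw strings, so it always throws.
    public static Parser makeParser() {
        return input -> {
            throw new UnsupportedOperationException("not supported");
        };
    }

    public static void main(String[] args) {
        try {
            makeParser().parse("{\"event_time\": \"2016-06-24\"}");
        } catch (UnsupportedOperationException e) {
            // Same message as the stack trace above.
            System.out.println(e.getMessage()); // prints "not supported"
        }
    }
}
```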

My tranquility ingestion spec is:

{
  "dataSources": {
    "business_events_test": {
      "spec": {
        "dataSchema": {
          "dataSource": "business_events_test",
          "parser": {
            "type": "avro_stream",
            "avroBytesDecoder": {
              "type": "schema_repo",
              "subjectAndIdConverter": {
                "type": "avro_1124",
                "topic": "raw_shopkick_pylons_weblog_avro_v2"
              },
              "schemaRepository": {
                "type": "avro_1124_rest_client",
                "url": "http://data-catalog001:8081"
              }
            },
            "parseSpec": {
              "format": "timeAndDims",
              "timestampSpec": {
                "column": "event_time",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": []
              }
            }
          },
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "minute"
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            }
          ]
        },
        "ioConfig": {
          "type": "realtime"
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": "100000",
          "intermediatePersistPeriod": "PT10M",
          "windowPeriod": "PT10M",
          "partitionsSpec": {
            "type": "hashed",
            "targetPartitionSize": 5000000
          }
        }
      },
      "properties": {
        "task.partitions": "1",
        "task.replicants": "1",
        "topicPattern": "raw_shopkick_pylons_weblog_avro_v1"
      }
    }
  },
  "properties": {
    "zookeeper.connect": "kafka001",
    "druid.discovery.curator.path": "/druid/discovery",
    "druid.selectors.indexing.serviceName": "druid/overlord",
    "commit.periodMillis": "15000",
    "consumer.numThreads": "2",
    "kafka.zookeeper.connect": "kafka001",
    "kafka.group.id": "tranquility-kafka"
  }
}

Help please!

Thanks,

-Ben

Hey Ben,

I’m pretty sure this means that the avro extension was not actually loaded by Tranquility, and it’s defaulting to the string parser. See https://github.com/druid-io/tranquility/blob/master/docs/configuration.md#loading-druid-extensions for docs on how to load extensions. In particular, something like this line should get it loaded (this uses paths from the Imply distro but the idea is the same no matter how you run things):

bin/tranquility -Ddruid.extensions.directory=dist/druid/extensions -Ddruid.extensions.loadList='["druid-avro-extensions"]' kafka -configFile conf/tranquility/kafka.json

Please note that due to a packaging oversight, the avro extension is not actually included in Druid 0.9.0 (will be fixed in 0.9.1) so you will have to download it using pull-deps (imply distro: http://imply.io/docs/latest/extensions.html, see “community and third-party extensions”; community distro: http://druid.io/docs/latest/operations/including-extensions.html). In both cases the coordinate is io.druid.extensions:druid-avro-extensions:0.9.0.
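For the community distro, the pull-deps invocation would look roughly like this (paths are illustrative; run it from your Druid install directory and adjust the lib path to your layout):

```shell
# Download the avro extension into the local extensions directory.
# Assumes a stock Druid 0.9.0 install with jars in lib/.
java -classpath "lib/*" io.druid.cli.Main tools pull-deps \
  --no-default-hadoop \
  -c io.druid.extensions:druid-avro-extensions:0.9.0
```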

Thank you, that was the problem. I saw loadList=null in the logs, but the documentation indicates that null should load all extensions, so I didn't actually check whether a core extension was missing.

Thanks again!

–Ben

Hi,

Did you ever make this work?
I keep getting:
ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:80) ~[na:na]

Any help would be appreciated.

Thanks,

Hi,

I had the same issue. It turned out that it wasn't able to resolve the schema, and therefore the writer schema became null, causing the exception. Hope this helps for future reference.
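For future readers, a minimal sketch of how that NPE arises (assuming Avro on the classpath; the class name here is illustrative, not the actual Tranquility code path): Avro's ResolvingDecoder refuses a null writer schema, which is exactly what a failed schema-repository lookup hands it.

```java
import java.io.ByteArrayInputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;

public class NullWriterSchemaDemo {
    public static void main(String[] args) throws Exception {
        Schema reader = Schema.create(Schema.Type.STRING);
        // A failed schema-repository lookup yields a null writer schema...
        Schema writer = null;
        // ...and ResolvingDecoder rejects it immediately with:
        // java.lang.NullPointerException: writer cannot be null!
        DecoderFactory.get().resolvingDecoder(writer, reader,
            DecoderFactory.get().binaryDecoder(
                new ByteArrayInputStream(new byte[0]), null));
    }
}
```

So if you see this error, check that the decoder can actually reach your schema repository and that the topic/subject it queries matches what was registered.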

Cheers, Fokko