Hi All,
I am trying to ingest a Kafka topic containing Avro messages. I am following the instructions here: http://druid.io/docs/latest/development/extensions-core/avro.html, but I am running into a problem. First I discovered that the documented example is incorrect — its parseSpec uses a "type" key set to timeAndDims, but that key should actually be "format", not "type". After making that change I now get the following error:
2016-06-24 22:14:53,403 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.UnsupportedOperationException: not supported
at io.druid.data.input.impl.TimeAndDimsParseSpec$1.parse(TimeAndDimsParseSpec.java:52) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
at io.druid.data.input.impl.StringInputRowParser.parseString(StringInputRowParser.java:126) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
at io.druid.data.input.impl.StringInputRowParser.buildStringKeyMap(StringInputRowParser.java:113) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:74) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:37) ~[io.druid.druid-api-0.3.16.jar:0.3.16]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at scala.Option.foreach(Option.scala:257) ~[org.scala-lang.scala-library-2.11.7.jar:na]
at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:202) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:76) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_101]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_101]
Looking at the code it is unclear to me how the TimeAndDimsParseSpec should actually be used since the Parser returned by makeParser will always throw unsupported operation exceptions.
My tranquility ingestion spec is:
{
  "dataSources": {
    "business_events_test": {
      "spec": {
        "dataSchema": {
          "dataSource": "business_events_test",
          "parser": {
            "type": "avro_stream",
            "avroBytesDecoder": {
              "type": "schema_repo",
              "subjectAndIdConverter": {
                "type": "avro_1124",
                "topic": "raw_shopkick_pylons_weblog_avro_v2"
              },
              "schemaRepository": {
                "type": "avro_1124_rest_client",
                "url": "http://data-catalog001:8081"
              }
            },
            "parseSpec": {
              "format": "timeAndDims",
              "timestampSpec": {
                "column": "event_time",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": []
              }
            }
          },
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "minute"
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            }
          ]
        },
        "ioConfig": {
          "type": "realtime"
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": "100000",
          "intermediatePersistPeriod": "PT10M",
          "windowPeriod": "PT10M",
          "partitionsSpec": {
            "type": "hashed",
            "targetPartitionSize": 5000000
          }
        }
      },
      "properties": {
        "task.partitions": "1",
        "task.replicants": "1",
        "topicPattern": "raw_shopkick_pylons_weblog_avro_v1"
      }
    }
  },
  "properties": {
    "zookeeper.connect": "kafka001",
    "druid.discovery.curator.path": "/druid/discovery",
    "druid.selectors.indexing.serviceName": "druid/overlord",
    "commit.periodMillis": "15000",
    "consumer.numThreads": "2",
    "kafka.zookeeper.connect": "kafka001",
    "kafka.group.id": "tranquility-kafka"
  }
}
Help please!
Thanks,
-Ben