AvroRuntimeException serializing Avro parsed messages

Hi all,

I am trying to use the Avro extension to parse Avro messages stored in Kafka. I'm afraid my understanding of Tranquility/Druid + Jackson is still a little shaky, so I don't understand the source of the error I am running into (see below). The schema is successfully retrieved from the schema repo and the message is parsed, but it looks like the schema object is embedded in the returned rows, and serializing those rows fails because of it. I don't know whether this is a problem with my schema definition, the Avro/Jackson interaction, or the Avro extension. Can anyone here point me in the right direction?

The schema is quite large so I have not included the whole thing here, but this is the snippet defining the field that is failing to serialize (pulled from the schema repo):

{
  "name" : "device_platform_type",
  "type" : [ "null", {
    "type" : "enum",
    "name" : "device_platform_type_enum",
    "symbols" : [ "ios", "android", "web" ]
  } ],
  "default" : null
}

And here is the exception:

2016-06-29 02:17:54,488 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:

java.lang.RuntimeException: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]} (through reference chain: org.apache.avro.generic.EnumSymbol["schema"]->org.apache.avro.EnumSchema["valueType"])

    at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]

    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:138) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:105) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_101]

    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_101]

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_101]

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_101]

    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_101]

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]} (through reference chain: org.apache.avro.generic.EnumSymbol["schema"]->org.apache.avro.EnumSchema["valueType"])

    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:187) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:647) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:152) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:505) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:639) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:152) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson$2.apply(InputRowObjectWriter.scala:86) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson$2.apply(InputRowObjectWriter.scala:84) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter.com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson(InputRowObjectWriter.scala:84) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$batchAsBytes$1.apply(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$batchAsBytes$1.apply(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at com.metamx.tranquility.druid.input.InputRowObjectWriter.batchAsBytes(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.beam.MessageHolder$$anon$3.batchAsBytes(MessageHolder.scala:83) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.druid.DruidBeam$$anonfun$3.apply(DruidBeam.scala:75) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.druid.DruidBeam$$anonfun$3.apply(DruidBeam.scala:75) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterator.to(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:292) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.AbstractIterator.toList(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at com.metamx.tranquility.druid.DruidBeam.sendAll(DruidBeam.scala:76) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$sendAll$2.apply(MergingPartitioningBeam.scala:43) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$sendAll$2.apply(MergingPartitioningBeam.scala:42) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]

    at com.metamx.tranquility.beam.MergingPartitioningBeam.sendAll(MergingPartitioningBeam.scala:42) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$sendAll$2$$anonfun$26.apply(ClusteredBeam.scala:388) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$sendAll$2$$anonfun$26.apply(ClusteredBeam.scala:379) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]

    at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise$Transformer.k(Promise.scala:112) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise$Transformer.apply(Promise.scala:122) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise$Transformer.apply(Promise.scala:103) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise$$anon$1.run(Promise.scala:366) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise.runq(Promise.scala:350) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.Promise.updateIfEmpty(Promise.scala:721) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    at com.twitter.util.ExecutorServiceFuturePool$$anon$2.run(FuturePool.scala:107) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]

    ... 5 common frames omitted

Caused by: org.apache.avro.AvroRuntimeException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]}

    at org.apache.avro.Schema.getValueType(Schema.java:267) ~[na:na]

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_101]

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_101]

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_101]

    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_101]

    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:466) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:639) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]

Thanks!

–Ben

I upgraded to Druid 0.9.1 and tried the new Kafka indexing service, and it works with the same messages and schema. I'm very excited that it doesn't require a time window and guarantees exactly-once semantics as well. I would still like to understand why Tranquility doesn't work.

–Ben

Hey Ben,

Based on your stack trace, it looks like Tranquility is getting confused by the EnumSymbol object returned by Avro. It's trying to transcode that object to JSON for sending to Druid (since Tranquility -> Druid communication is always JSON, regardless of your input data type) but failing. I filed this issue for that: https://github.com/druid-io/tranquility/issues/183
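Until that issue is fixed, one stopgap might be to flatten each parsed row before it reaches the JSON serializer, replacing anything that isn't a JSON-native value (such as an Avro EnumSymbol) with its string form. This is only a sketch under that assumption; `RowFlattener` and `flatten` are hypothetical names, not part of Tranquility or the Avro extension:

```java
import java.util.HashMap;
import java.util.Map;

public class RowFlattener {
    // Hypothetical helper: pass JSON-native values (null, String, Number,
    // Boolean) through unchanged, and fall back to toString() for anything
    // else, so the JSON serializer never walks an object like EnumSymbol
    // that carries a Schema getter.
    public static Map<String, Object> flatten(Map<String, Object> row) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            Object v = e.getValue();
            if (v == null || v instanceof String
                    || v instanceof Number || v instanceof Boolean) {
                out.put(e.getKey(), v);
            } else {
                out.put(e.getKey(), v.toString());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        // Stand-in for an Avro EnumSymbol: an object whose toString()
        // yields the symbol name.
        row.put("device_platform_type", new StringBuilder("ios"));
        row.put("count", 3);
        System.out.println(flatten(row));
    }
}
```

This works for enums because `EnumSymbol.toString()` returns the symbol name, which is exactly the string you'd want in the Druid row anyway.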

Thanks very much for your response, Gian! So far the Kafka indexing service has been stable, but if I run into problems with it I can try changing the enum to a string. I guess people are either a) not using Avro enums, or b) not using the Avro format for ingesting data with Tranquility. I wonder if I am making my life more difficult than necessary…
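For reference, the string-based version of the field would presumably look something like this (a sketch of the schema change, not pulled from our actual schema repo):

```json
{
  "name" : "device_platform_type",
  "type" : [ "null", "string" ],
  "default" : null
}
```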

–Ben