Cannot decode JSON-encoded Avro messages using the Kafka indexing service

```
2018-06-06T11:03:28,013 ERROR [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - Encountered exception in run() before persisting.
com.jayway.jsonpath.PathNotFoundException: Expected to find an object with property ['string'] in path $['contentId'] but found 'org.apache.avro.util.Utf8'. This is not a json object according to the JsonProvider: 'io.druid.data.input.avro.GenericAvroJsonProvider'.
	at com.jayway.jsonpath.internal.path.PropertyPathToken.evaluate(PropertyPathToken.java:69) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.internal.path.PathToken.handleObjectProperty(PathToken.java:81) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.internal.path.PropertyPathToken.evaluate(PropertyPathToken.java:77) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.internal.path.RootPathToken.evaluate(RootPathToken.java:62) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.internal.path.CompiledPath.evaluate(CompiledPath.java:53) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.internal.path.CompiledPath.evaluate(CompiledPath.java:61) ~[json-path-2.1.0.jar:2.1.0]
	at com.jayway.jsonpath.JsonPath.read(JsonPath.java:187) ~[json-path-2.1.0.jar:2.1.0]
	at io.druid.data.input.avro.AvroFlattenerMaker.lambda$makeJsonPathExtractor$1(AvroFlattenerMaker.java:90) ~[?:?]
	at io.druid.java.util.common.parsers.ObjectFlatteners$1$1.get(ObjectFlatteners.java:112) ~[java-util-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.java.util.common.parsers.ObjectFlatteners$1$1$1.getValue(ObjectFlatteners.java:182) ~[java-util-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at java.util.AbstractMap.toString(AbstractMap.java:556) ~[?:1.8.0_161]
	at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161]
	at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]
	at io.druid.data.input.MapBasedInputRow.toString(MapBasedInputRow.java:67) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) ~[?:1.8.0_161]
	at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) ~[?:1.8.0_161]
	at java.util.Formatter.format(Formatter.java:2520) ~[?:1.8.0_161]
	at java.util.Formatter.format(Formatter.java:2455) ~[?:1.8.0_161]
	at java.lang.String.format(String.java:2981) ~[?:1.8.0_161]
	at io.druid.java.util.common.StringUtils.nonStrictFormat(StringUtils.java:132) ~[java-util-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.java.util.common.logger.Logger.info(Logger.java:71) [java-util-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.getSegment(BaseAppenderatorDriver.java:242) ~[druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.append(BaseAppenderatorDriver.java:289) ~[druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.segment.realtime.appenderator.StreamAppenderatorDriver.add(StreamAppenderatorDriver.java:176) ~[druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.indexing.kafka.KafkaIndexTask.runInternal(KafkaIndexTask.java:731) [druid-kafka-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:434) [druid-kafka-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:456) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:428) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
```

Hi Team,
I have the Kafka Indexing Service running against a Kafka topic that carries Avro messages. These Avro messages are JSON encoded.
The schemas are published to the (Confluent) schema registry.
The JSON-encoded messages are nested constructs, so a flattenSpec is used to decode them. However, the AvroFlattener complains about a type mismatch against the payload.
Stack trace: see above.

Indexing task spec:
```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "http://"
      },
      "parseSpec": {
        "format": "avro",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "contentId",
              "expr": "$.contentId.string"
            }
          ]
        },
        "dimensionsSpec": {},
        "timestampSpec": {
          "column": "eventTime",
          "format": "auto"
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "ioConfig": {
    "topic": "",
    "consumerProperties": {
      "bootstrap.servers": "",
      "group.id": ""
    },
    "replicas": "2",
    "taskCount": "1",
    "taskDuration": "PT30M",
    "useEarliestOffset": true
  },
  "tuningConfig": {
    "type": "kafka",
    "resetOffsetAutomatically": true
  }
}
```

Avro schema (only the relevant field shown; the other fields and the namespace are masked):

```json
{
  "type": "record",
  "name": "AtomicEvent",
  "namespace": "<masked>",
  "fields": [
    { "name": "contentId", "type": ["null", "string"], "default": null }
  ]
}
```
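
For context on the exception above: in the wire-level JSON encoding of Avro, a nullable union such as ["null", "string"] is wrapped in a branch object, but once the schema_registry decoder has produced a GenericRecord, Druid's GenericAvroJsonProvider appears to see the resolved value (a Utf8) directly at contentId, which is what the PathNotFoundException is pointing at. A rough illustration with made-up field values:

What a JSON-encoded message on the topic looks like (union branch wrapped):

```json
{ "contentId": { "string": "abc123" }, "eventTime": "2018-06-06T11:03:28Z" }
```

What the flattener effectively sees after decoding (union already resolved to its value):

```json
{ "contentId": "abc123", "eventTime": "2018-06-06T11:03:28Z" }
```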

Sorted:
Please disregard this post.

Hi Varaga - I have exactly the same issue. Can you give details on how you sorted it?

Hi John,

I vaguely remember that it was to do with path expression syntax. You can use the evaluator below to check the expressions.

https://jsonpath.herokuapp.com
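
If it is the same union handling as above, the fix is likely just dropping the union branch from the path expression, since the exception in the first post shows the provider finding the value directly at $['contentId']. A sketch of the field entry, untested, matching the spec above:

```json
{
  "type": "path",
  "name": "contentId",
  "expr": "$.contentId"
}
```

With "useFieldDiscovery": true, a simple top-level field like this may not even need an explicit entry at all.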

Best Regards

Varaga

Hi Varaga - thanks for the quick reply. My path expression seems to work in the web-based evaluators. So this has nothing to do with the issue you reported about the keys being Avro UTF-8 rather than Unicode strings?

Well, there is a bug that I raised: https://github.com/druid-io/druid/issues/5884
The problem there was that the dimensions came back with zero cardinality in their values, with no exceptions raised.

The issue you report, on the other hand, does raise exceptions.