Unparseable Avro messages when using Tranquility Kafka to ingest real-time data

Hi,

I'm using Tranquility to ingest real-time data from Kafka into Druid. My data is stored in Kafka in Avro format.

I used the config file below for Tranquility Kafka. My Kafka messages are nested, so I tried to use a flattenSpec to extract the fields from my schema. I ran into a problem, which you can see after the config file.

Can you help me please?

Thanks in advance.

```
{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "clients",
          "parser": {
            "type": "avro_stream",
            "avroBytesDecoder": {
              "type": "schema_repo",
              "subjectAndIdConverter": {
                "type": "avro_1124",
                "topic": "clients"
              },
              "schemaRepository": {
                "type": "avro_1124_rest_client",
                "url": "http://registry.integration.com"
              }
            },
            "parseSpec": {
              "format": "json",
              "flattenSpec": {
                "useFieldDiscovery": false,
                "fields": [
                  {
                    "type": "path",
                    "name": "hostname",
                    "expr": "$.body.hostname"
                  },
                  {
                    "type": "path",
                    "name": "mac",
                    "expr": "$.body.mac"
                  },
                  {
                    "type": "path",
                    "name": "ip",
                    "expr": "$.body.ip"
                  },
                  {
                    "type": "path",
                    "name": "component",
                    "expr": "$.body.component"
                  }
                ]
              },
              "timestampSpec": {
                "column": "timestamp",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": [],
                "dimensionExclusions": ["ignore_me"]
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "body.version"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "none"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT200M",
          "windowPeriod": "PT200M"
        }
      },
      "properties": {
        "topicPattern": "clients",
        "topicPattern.priority": 1
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "zookeeper.integration.com",
    "zookeeper.timeout": "PT20S",
    "druid.zk.paths.base": "/mnt/druid",
    "druid.discovery.curator.path": "druid/discovery",
    "druid.selectors.indexing.serviceName": "druid:overlord",
    "kafka.zookeeper.connect": "zookeeper.integration:2181",
    "kafka.group.id": "tranquility-kafka",
    "consumer.numThreads": 1,
    "commit.periodMillis": 15000,
    "reportDropsAsExceptions": false
  }
}
```

```
[root@master tranquility]# ./bin/tranquility kafka -configFile /opt/tranquility/conf/server.json
Exception in thread "main" while scanning for the next token
found character '\t' that cannot start any token
 in 'reader', line 21, column 8:
        	"format": "json",
        ^
    at org.yaml.snakeyaml.scanner.ScannerImpl.fetchMoreTokens(ScannerImpl.java:415)
    at org.yaml.snakeyaml.scanner.ScannerImpl.checkToken(ScannerImpl.java:226)
    at org.yaml.snakeyaml.parser.ParserImpl$ParseFlowMappingKey.produce(ParserImpl.java:724)
    at org.yaml.snakeyaml.parser.ParserImpl$ParseFlowMappingFirstKey.produce(ParserImpl.java:712)
    at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158)
    at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:230)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:237)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:237)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:237)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:237)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeSequenceNode(Composer.java:204)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:157)
    at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:237)
    at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159)
    at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122)
    at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105)
    at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120)
    at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481)
    at org.yaml.snakeyaml.Yaml.load(Yaml.java:412)
    at com.metamx.common.scala.Yaml$.load(Yaml.scala:34)
    at com.metamx.common.scala.Yaml$.load(Yaml.scala:31)
    at com.metamx.tranquility.config.TranquilityConfig$$anonfun$2.apply(TranquilityConfig.scala:63)
    at com.metamx.tranquility.config.TranquilityConfig$$anonfun$2.apply(TranquilityConfig.scala:62)
    at com.metamx.common.scala.Predef$FinallyOps$$anonfun$withFinally$1.apply(Predef.scala:56)
    at com.metamx.common.scala.Predef$FinallyOps$$anonfun$withFinally$1.apply(Predef.scala:56)
    at com.metamx.tranquility.config.TranquilityConfig$.read(TranquilityConfig.scala:62)
    at com.metamx.tranquility.config.TranquilityConfig.read(TranquilityConfig.scala)
    at com.metamx.tranquility.kafka.KafkaMain.run(KafkaMain.java:86)
    at com.metamx.tranquility.kafka.KafkaMain.main(KafkaMain.java:71)
    at com.metamx.tranquility.distribution.DistributionMain$.main(DistributionMain.scala:37)
    at com.metamx.tranquility.distribution.DistributionMain.main(DistributionMain.scala)
```
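(For anyone hitting the same error: the stack trace shows that Tranquility loads the config file through SnakeYAML, and YAML does not allow tab characters as indentation, hence "found character '\t' that cannot start any token". Replacing the tabs on the offending line with spaces should presumably clear this particular exception.)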

I resolved the exception that I mentioned in my first post, so the config file is now parsed. But Tranquility receives the messages and cannot parse them: receivedCount=100 and unparseableCount=100.

```
2017-07-24 10:10:50,694 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {jcd_PlayerMetric={receivedCount=100, sentCount=0, droppedCount=0, unparseableCount=100}} pending messages in 1ms and committed offsets in 515ms.
```

Hi Baruch,

I’m facing the same issue. Can you share the details if you got it resolved?

Thanks,

Ramya

Hi,

Your parse spec is not well-formed JSON, which is why it's complaining; use https://jsonlint.com to check it. However, avro_stream does not support a JSON parse spec: it only uses the timestamp and dimension parts of whatever spec you pass it, and it will not use your flattenSpec, so you won't be able to consume nested fields right now. In other words, avro_stream only supports extracting top-level fields. I have modified the Avro extension to support JSONParseSpec, but I haven't submitted it to the community yet, as it's probably not the best way to do it. I can share it if you need a quick fix, though.
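Until nested access is supported, a spec that stays within what avro_stream actually honors would declare the timestamp and only top-level dimensions. This is an untested sketch based on the limitation described above: it assumes the fields exist at the top level of the Avro record rather than nested under body, and per the reply above the format value is reportedly ignored apart from the time and dimension parts:

```
"parseSpec": {
  "format": "timeAndDims",
  "timestampSpec": {
    "column": "timestamp",
    "format": "auto"
  },
  "dimensionsSpec": {
    "dimensions": ["hostname", "mac", "ip", "component"],
    "dimensionExclusions": []
  }
}
```

The alternative is to flatten the records on the producer side before they reach Kafka, so the fields you need are already top-level by the time avro_stream sees them.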

Hi Ramya,

I ended up coding my own solution using the Tranquility API for Spark: https://github.com/druid-io/tranquility/blob/master/docs/spark.md.

It works well.
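For anyone following along, that approach looks roughly like the sketch below. It is adapted from the example in Tranquility's docs/spark.md and is untested here; the ClientEvent case class is hypothetical (you decode each Avro record into it yourself before handing it to Tranquility), and the connection values are assumptions carried over from the config earlier in this thread.

```scala
import com.metamx.common.Granularity
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.metamx.tranquility.spark.BeamFactory
import com.metamx.tranquility.spark.BeamRDD._
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.CountAggregatorFactory
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.apache.spark.streaming.dstream.DStream
import org.joda.time.{DateTime, Period}

// Hypothetical event type: decode each Avro record into this before sending.
case class ClientEvent(timestamp: DateTime, hostname: String, mac: String,
                       ip: String, component: String)

class ClientEventBeamFactory extends BeamFactory[ClientEvent] {
  // Return a singleton, so the same Druid connection is shared across
  // all tasks in the same JVM rather than opened once per partition.
  def makeBeam: Beam[ClientEvent] = ClientEventBeamFactory.BeamInstance
}

object ClientEventBeamFactory {
  val BeamInstance: Beam[ClientEvent] = {
    // Tranquility coordinates through ZooKeeper via Curator.
    val curator = CuratorFrameworkFactory.newClient(
      "zookeeper.integration.com:2181", // assumed, from the config above
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    DruidBeams
      .builder((event: ClientEvent) => event.timestamp)
      .curator(curator)
      .discoveryPath("druid/discovery")                    // druid.discovery.curator.path
      .location(DruidLocation.create("druid:overlord", "clients"))
      .rollup(DruidRollup(
        SpecificDruidDimensions(IndexedSeq("hostname", "mac", "ip", "component")),
        Seq(new CountAggregatorFactory("count")),
        QueryGranularities.NONE
      ))
      .tuning(ClusteredBeamTuning(
        segmentGranularity = Granularity.HOUR,
        windowPeriod = new Period("PT200M"),               // matches the config above
        partitions = 1,
        replicants = 1
      ))
      .buildBeam()
  }
}

// Usage from Spark Streaming: propagate each RDD through the beam.
object SendToDruid {
  def send(dstream: DStream[ClientEvent]): Unit =
    dstream.foreachRDD(rdd => rdd.propagate(new ClientEventBeamFactory))
}
```

The upside of this route is that you flatten the nested Avro records yourself in Spark before they reach Druid, so the avro_stream top-level-fields limitation never comes into play.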