Tranquility fails to read from Kafka

Contents of my kafka.json file:

{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "dataSource" : "auction_data-kafka",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "format" : "json",
              "flattenSpec" : {
                "useFieldDiscovery" : true,
                "fields" : [
                  { "type" : "root", "name" : "id" },
                  { "type" : "root", "name" : "timestamp" },
                  { "type" : "path", "name" : "banner-width", "expr" : "$.imp[0].banner.w" },
                  { "type" : "path", "name" : "banner-height", "expr" : "$.imp[0].banner.h" },
                  { "type" : "path", "name" : "banner-api", "expr" : "$.imp[0].banner.api" },
                  { "type" : "path", "name" : "banner-pos", "expr" : "$.imp[0].banner.pos" },
                  { "type" : "path", "name" : "video-width", "expr" : "$.imp[0].video.w" },
                  { "type" : "path", "name" : "video-height", "expr" : "$.imp[0].video.h" },
                  { "type" : "path", "name" : "video-api", "expr" : "$.imp[0].video.api" },
                  { "type" : "path", "name" : "video-skippable", "expr" : "$.imp[0].video.skippable" },
                  { "type" : "path", "name" : "video-min-dur", "expr" : "$.imp[0].video.minDuration" },
                  { "type" : "path", "name" : "video-max-dur", "expr" : "$.imp[0].video.maxDuration" },
                  { "type" : "path", "name" : "video-start-delay", "expr" : "$.imp[0].video.startDelay" },
                  { "type" : "path", "name" : "video-pos", "expr" : "$.imp[0].video.pos" },
                  { "type" : "path", "name" : "device-geo-country", "expr" : "$.device.geo.country" },
                  { "type" : "path", "name" : "device-geo-region", "expr" : "$.device.geo.region" },
                  { "type" : "path", "name" : "device-geo-zip", "expr" : "$.device.geo.zip" },
                  { "type" : "path", "name" : "device-geo-dma", "expr" : "$.device.geo.dma" },
                  { "type" : "path", "name" : "device-make", "expr" : "$.device.make" },
                  { "type" : "path", "name" : "device-model", "expr" : "$.device.model" },
                  { "type" : "path", "name" : "device-connection-type", "expr" : "$.device.connectiontype" },
                  { "type" : "path", "name" : "device-os", "expr" : "$.device.os" },
                  { "type" : "path", "name" : "device-type", "expr" : "$.device.devicetype" },
                  { "type" : "path", "name" : "device-ip", "expr" : "$.device.ip" },
                  { "type" : "path", "name" : "device-language", "expr" : "$.device.language" },
                  { "type" : "path", "name" : "device-carrier", "expr" : "$.device.carrier" },
                  { "type" : "path", "name" : "user-id", "expr" : "$.user.id" },
                  { "type" : "path", "name" : "user-data", "expr" : "$.user.data" },
                  { "type" : "root", "name" : "inv_source_id" },
                  { "type" : "path", "name" : "app-id", "expr" : "$.app.id" },
                  { "type" : "path", "name" : "app-cat", "expr" : "$.app.cat" },
                  { "type" : "path", "name" : "bid_response-timestamp", "expr" : "$.bid_response[0].timestamp" },
                  { "type" : "path", "name" : "bid_response-CUR", "expr" : "$.bid_response[0].CUR" },
                  { "type" : "path", "name" : "bid_response-crid", "expr" : "$.bid_response[0].seat_bid[0].bid[0].crid" },
                  { "type" : "path", "name" : "bid_response-cid", "expr" : "$.bid_response[0].seat_bid[0].bid[0].cid" },
                  { "type" : "path", "name" : "bid_response-price", "expr" : "$.bid_response[0].seat_bid[0].bid[0].price" },
                  { "type" : "path", "name" : "bid_response-advertiser-id", "expr" : "$.bid_response[0].seat_bid[0].bid[0].advertiser.aid" },
                  { "type" : "path", "name" : "bid_response-advertiser-cat", "expr" : "$.bid_response[0].seat_bid[0].bid[0].advertiser.cat" }
                ]
              }
            }
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "tuningConfig" : {
          "type" : "realtime",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M",
          "maxRowsInMemory" : 750000
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "auction",
        "topicPattern.priority" : "1"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "localhost:2181",
    "zookeeper.timeout" : "PT20S",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "druid.discovery.curator.path" : "/druid/discovery",
    "kafka.zookeeper.connect" : "localhost:2181",
    "kafka.group.id" : "tranquility-kafka",
    "consumer.numThreads" : "2",
    "commit.periodMillis" : "150000",
    "reportDropsAsExceptions" : "false"
  }
}

The error I am getting:

2018-08-03 03:30:07,720 [KafkaConsumer-1] INFO c.m.t.kafka.writer.WriterController - Creating EventWriter for topic [auction] using dataSource [auction-kafka]
2018-08-03 03:30:07,870 [KafkaConsumer-1] INFO o.a.c.f.imps.CuratorFrameworkImpl - Starting
2018-08-03 03:30:07,885 [KafkaConsumer-1] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@67e68b81
2018-08-03 03:30:07,893 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-08-03 03:30:07,894 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-08-03 03:30:07,901 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x164fd8899ae0015, negotiated timeout = 40000
2018-08-03 03:30:07,910 [KafkaConsumer-1-EventThread] INFO o.a.c.f.state.ConnectionStateManager - State change: CONNECTED
2018-08-03 03:30:08,094 [KafkaConsumer-1] INFO c.m.t.finagle.FinagleRegistry - Adding resolver for scheme[disco].
2018-08-03 03:30:10,059 [KafkaConsumer-1] INFO o.h.validator.internal.util.Version - HV000001: Hibernate Validator 5.1.3.Final
2018-08-03 03:30:10,447 [KafkaConsumer-1] INFO io.druid.guice.JsonConfigurator - Loaded class[class io.druid.guice.ExtensionsConfig] from props[druid.extensions.] as [ExtensionsConfig{searchCurrentClassloader=true, directory='extensions', hadoopDependenciesDir='hadoop-dependencies', hadoopContainerDruidClasspath='null', loadList=null}]
2018-08-03 03:30:10,888 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.IllegalArgumentException: Instantiation of [simple type, class io.druid.data.input.impl.StringInputRowParser] value failed: json can not be null or empty
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at io.druid.segment.indexing.DataSchema.getParser(DataSchema.java:101) ~[io.druid.druid-server-0.9.1.jar:0.9.1]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:301) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[io.druid.tranquility-core-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.2.jar:0.8.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_181]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Instantiation of [simple type, class io.druid.data.input.impl.StringInputRowParser] value failed: json can not be null or empty
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapException(StdValueInstantiator.java:405) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:234) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:167) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:398) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1064) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:264) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:156) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:126) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:113) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:84) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:132) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:41) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2769) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    ... 15 common frames omitted
Caused by: java.lang.IllegalArgumentException: json can not be null or empty
    at com.jayway.jsonpath.internal.Utils.notEmpty(Utils.java:383) ~[com.jayway.jsonpath.json-path-2.1.0.jar:2.1.0]
    at com.jayway.jsonpath.JsonPath.compile(JsonPath.java:465) ~[com.jayway.jsonpath.json-path-2.1.0.jar:2.1.0]
    at com.metamx.common.parsers.JSONPathParser.generateFieldPaths(JSONPathParser.java:139) ~[com.metamx.java-util-0.27.9.jar:na]
    at com.metamx.common.parsers.JSONPathParser.<init>(JSONPathParser.java:64) ~[com.metamx.java-util-0.27.9.jar:na]
    at io.druid.data.input.impl.JSONParseSpec.makeParser(JSONParseSpec.java:74) ~[io.druid.druid-api-0.9.1.jar:0.9.1]
    at io.druid.data.input.impl.StringInputRowParser.<init>(StringInputRowParser.java:56) ~[io.druid.druid-api-0.9.1.jar:0.9.1]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_181]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_181]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_181]
    at com.fasterxml.jackson.databind.introspect.AnnotatedConstructor.call(AnnotatedConstructor.java:125) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:230) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    ... 26 common frames omitted
2018-08-03 03:30:10,896 [KafkaConsumer-1] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2018-08-03 03:30:10,899 [Curator-Framework-0] INFO o.a.c.f.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2018-08-03 03:30:10,909 [KafkaConsumer-1] INFO org.apache.zookeeper.ZooKeeper - Session: 0x164fd8899ae0015 closed
2018-08-03 03:30:10,910 [KafkaConsumer-1-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x164fd8899ae0015
2018-08-03 03:30:10,915 [KafkaConsumer-1] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a], ZKConsumerConnector shutting down
2018-08-03 03:30:10,928 [KafkaConsumer-1] INFO k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
2018-08-03 03:30:10,931 [KafkaConsumer-1] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1533267007090] Stopping leader finder thread
2018-08-03 03:30:10,932 [KafkaConsumer-1] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-leader-finder-thread], Shutting down
2018-08-03 03:30:10,933 [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-leader-finder-thread] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-leader-finder-thread], Stopped
2018-08-03 03:30:10,934 [KafkaConsumer-1] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-leader-finder-thread], Shutdown completed
2018-08-03 03:30:10,934 [KafkaConsumer-1] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1533267007090] Stopping all fetchers
2018-08-03 03:30:10,934 [KafkaConsumer-1] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-0-0], Shutting down
2018-08-03 03:30:10,935 [ConsumerFetcherThread-tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-0-0] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-0-0], Stopped
2018-08-03 03:30:10,936 [KafkaConsumer-1] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a-0-0], Shutdown completed
2018-08-03 03:30:10,937 [KafkaConsumer-1] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1533267007090] All connections stopped
2018-08-03 03:30:10,939 [ZkClient-EventThread-12-localhost] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
2018-08-03 03:30:10,943 [KafkaConsumer-1] INFO org.apache.zookeeper.ZooKeeper - Session: 0x164fd8899ae0014 closed
2018-08-03 03:30:10,944 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x164fd8899ae0014
2018-08-03 03:30:10,944 [KafkaConsumer-1] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a], ZKConsumerConnector shutdown completed in 29 ms
2018-08-03 03:30:10,945 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Commit thread interrupted.
2018-08-03 03:30:10,946 [KafkaConsumer-1] INFO c.m.tranquility.kafka.KafkaConsumer - Finished clean shutdown.
2018-08-03 03:30:11,175 [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a_watcher_executor] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a], stopping watcher executor thread for consumer tranquility-kafka_ip-10-0-2-15-1533267006935-ab2be96a

Any help would be greatly appreciated. Thanks.

Hi Rahul,

I believe Tranquility does not yet support flattenSpecs. The Kafka indexing service built into Druid does support them; I'd suggest trying that instead: http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html
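
For reference, here is a minimal sketch of what the equivalent Kafka indexing service supervisor spec could look like, carrying over your parseSpec and flattenSpec (abridged to two fields; the rest carry over unchanged). The broker address localhost:9092, the taskDuration of PT1H, and the empty dimensionsSpec (which makes Druid treat all discovered fields as dimensions) are illustrative assumptions, not taken from your setup:

{
  "type" : "kafka",
  "dataSchema" : {
    "dataSource" : "auction_data-kafka",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : { "column" : "timestamp", "format" : "auto" },
        "flattenSpec" : {
          "useFieldDiscovery" : true,
          "fields" : [
            { "type" : "root", "name" : "id" },
            { "type" : "path", "name" : "banner-width", "expr" : "$.imp[0].banner.w" }
          ]
        },
        "dimensionsSpec" : { "dimensions" : [] }
      }
    },
    "metricsSpec" : [ { "type" : "count", "name" : "count" } ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "hour",
      "queryGranularity" : "none"
    }
  },
  "tuningConfig" : {
    "type" : "kafka",
    "maxRowsInMemory" : 750000,
    "intermediatePersistPeriod" : "PT10M"
  },
  "ioConfig" : {
    "topic" : "auction",
    "consumerProperties" : { "bootstrap.servers" : "localhost:9092" },
    "taskCount" : 1,
    "replicas" : 1,
    "taskDuration" : "PT1H"
  }
}

Unlike Tranquility, this runs as a supervisor managed by the overlord, so you submit the spec (saved here as a hypothetical supervisor-spec.json) to the overlord's HTTP API, assuming it listens on localhost:8090:

curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor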

Thanks, Gian. Big help! I was able to achieve it through the Druid Kafka indexing service. Thanks a ton :).