Data source created by Tranquility Kafka not showing up in queries

I used the following kafka.json file to create a new dataSource and insert a row, but when I query, I cannot see the datasource.

Mostly I followed the instructions in

I used Tranquility Kafka:

bin/tranquility kafka -configFile /Users/striim/devWork/druid-0.9.2/conf-quickstart/tranquility/kafka.json

If I insert the following row using the Kafka producer, Tranquility doesn't report any error, but I don't see my datasource being created.

When I use the "time" format for the timestamp column, Tranquility crashes, so I set it to "iso".
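For reference, the corresponding timestampSpec in kafka.json would look something like this (a sketch based on the quickstart layout; Druid's timestampSpec accepts formats such as "iso", "posix", "millis", and "auto"):

```json
"timestampSpec": {
  "column": "time",
  "format": "iso"
}
```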

-MacBook-Pro:kafka_2.11- ./bin/ --broker-list localhost:9092 --topic pageviews

{"time": "2017-03-24T20:11:02Z", "url": "foo", "user": "raj", "latencyMs": 32}

(Inserting this didn't produce any error.)
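One way to rule out quoting problems is to build and round-trip the row programmatically before piping it to the console producer. A minimal Python sketch (field names taken from the row above):

```python
import json

# Build the row with straight ASCII quotes; editors and chat clients often
# substitute curly quotes, which are not valid JSON string delimiters.
row = {"time": "2017-03-24T20:11:02Z", "url": "foo", "user": "raj", "latencyMs": 32}
line = json.dumps(row)

# Round-trip check: json.loads raises ValueError if the line is not valid JSON.
assert json.loads(line) == row
print(line)
```

Piping the printed line into the console producer avoids hand-typed quotes entirely.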

Log output from Tranquility:

2017-03-25 00:25:35,933 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed 0 pending messages in 0ms and committed offsets in 0ms.

2017-03-25 00:25:47,123 [KafkaConsumer-1] INFO c.m.t.kafka.writer.WriterController - Creating EventWriter for topic [pageviews] using dataSource [pageviews-kafka]

2017-03-25 00:25:52,165 [KafkaConsumer-1] INFO o.a.c.f.imps.CuratorFrameworkImpl - Starting

2017-03-25 00:25:52,167 [KafkaConsumer-1] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@3f208b54

2017-03-25 00:25:52,169 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)

2017-03-25 00:25:52,170 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session

2017-03-25 00:25:52,171 [KafkaConsumer-1-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15afd3e76c900ad, negotiated timeout = 40000

2017-03-25 00:25:52,176 [KafkaConsumer-1-EventThread] INFO o.a.c.f.state.ConnectionStateManager - State change: CONNECTED

2017-03-25 00:25:52,278 [KafkaConsumer-1] INFO c.m.t.finagle.FinagleRegistry - Adding resolver for scheme[disco].

2017-03-25 00:25:53,386 [KafkaConsumer-1] INFO o.h.validator.internal.util.Version - HV000001: Hibernate Validator 5.1.3.Final

2017-03-25 00:25:53,629 [KafkaConsumer-1] INFO io.druid.guice.JsonConfigurator - Loaded class[class io.druid.guice.ExtensionsConfig] from props[druid.extensions.] as [ExtensionsConfig{searchCurrentClassloader=true, directory='extensions', hadoopDependenciesDir='hadoop-dependencies', loadList=null}]

2017-03-25 00:25:53,860 [KafkaConsumer-1] INFO c.metamx.emitter.core.LoggingEmitter - Start: started [true]

2017-03-25 00:25:54,299 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {pageviews={receivedCount=1, sentCount=0, droppedCount=0, unparseableCount=1}} pending messages in 2ms and committed offsets in 4ms.

A datasources query fails to show the new source pageviews-kafka:

curl -X 'GET' localhost:8082/druid/v2/datasources



This log line says "unparseableCount=1", meaning that a message from Kafka couldn't be parsed:

2017-03-25 00:25:54,299 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {pageviews={receivedCount=1, sentCount=0, droppedCount=0, unparseableCount=1}} pending messages in 2ms and committed offsets in 4ms.

Your message does look like valid JSON with an ISO time, though, so I'm not sure exactly why. Maybe you have some funny characters or line breaks in there that are making the message unparseable? Something to do with how the kafka console producer runs in your environment? Maybe try doing export KAFKA_OPTS="-Dfile.encoding=UTF-8" before you run the kafka console producer; sometimes that helps.
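As a concrete illustration of the "funny characters" theory (an assumption, not a confirmed diagnosis): if the quotes in the message were silently replaced with typographic curly quotes somewhere along the way, the JSON would fail to parse even though it looks fine on screen. A quick Python check:

```python
import json

# A message as it might arrive if an editor or terminal replaced straight
# quotes with "smart" quotes -- a common copy/paste artifact.
smart = '{\u201ctime\u201d: \u201c2017-03-24T20:11:02Z\u201d, \u201curl\u201d: \u201cfoo\u201d}'

def find_funny_chars(msg):
    """Return (position, char) for every non-ASCII character in the message."""
    return [(i, c) for i, c in enumerate(msg) if ord(c) > 127]

def is_valid_json(msg):
    """True if the message parses as JSON, False otherwise."""
    try:
        json.loads(msg)
        return True
    except ValueError:
        return False

print(find_funny_chars(smart))  # the curly quotes show up here
print(is_valid_json(smart))     # False: curly quotes break JSON parsing
```

Running the raw Kafka message through a check like this would confirm or rule out the hidden-character explanation.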

Another thing you might run into after that is that your message timestamp looks old (its timestamp is 2017-03-24T20:11:02Z; your Tranquility logs look like their timestamps are a few hours later). Tranquility only loads "current" data, within the configured windowPeriod. If you meant for the timestamp to be current in your test, then you might need to adjust it a bit.
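The dropping behavior can be sketched roughly as follows (assuming a windowPeriod of PT10M, as in the quickstart config; check your kafka.json for the actual value):

```python
from datetime import datetime, timedelta, timezone

# Assumed windowPeriod of 10 minutes (PT10M); adjust to match your kafka.json.
WINDOW = timedelta(minutes=10)

def within_window(event_ts, now):
    """Rough check: Tranquility drops events whose timestamp is too far from 'now'."""
    return abs(now - event_ts) <= WINDOW

event = datetime(2017, 3, 24, 20, 11, 2, tzinfo=timezone.utc)  # message timestamp
now = datetime(2017, 3, 25, 0, 25, 54, tzinfo=timezone.utc)    # time of the log line
print(within_window(event, now))  # False: ~4h15m apart, the event would be dropped
```

Note that dropped-for-lateness events show up as droppedCount, not unparseableCount, so this would be a second problem to fix after the parsing one.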

Thanks Gian.

I tried setting KAFKA_OPTS to the value you suggested, but I am still seeing a nonzero unparseableCount. I agree it has to do with the Kafka producer, as I tried using a posix timestamp as well and that hit the same issue.

I tried putting the JSON event in a file and running the Kafka console producer, but I still hit the same issue.

MacBook-Pro:kafka_2.11- striim$ cat /tmp/data.1

{"time": "1490636239", "url": "foo", "user": "alice", "latencyMs": "32"}
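One caveat with this row: if the timestampSpec format is still "iso", an epoch-seconds string like "1490636239" cannot be parsed as a timestamp; it would need format "posix". A small Python conversion makes the mismatch visible:

```python
from datetime import datetime, timezone

# The row above carries an epoch-seconds string; with a timestampSpec format
# of "iso" this is unparseable. Converting it shows the equivalent ISO time.
epoch = "1490636239"
iso = datetime.fromtimestamp(int(epoch), tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
print(iso)  # 2017-03-27T17:37:19Z
```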

Any other ideas?