Tranquility does not work with remote Kafka 10?

Hello,

It looks like tranquility require a zookeeper for Kafka although Kafka 10 consumer does not require a zookeeper.

/data/tranquility-distribution-0.9.0-SNAPSHOT/bin/tranquility kafka -configFile myconfig.json

Tranquility (tranquility-distribution-0.9.0-SNAPSHOT) works with localhost Kafka where I just put

“kafka.zookeeper.connect” : “localhost”,

in the myconfig.json.

Here I built 0.9.0-SNAPSHOT tranquility distribution from the source. It works with local Kafka 10 server.

But I could not get tranquility to work with remote Kafka 10 server.

“kafka.zookeeper.connect” : “zookeeperNode”,

Here I removed :9092 from the original zookeeper string.

Below are some logs.

2017-11-02 18:19:12,726 [main] INFO c.m.tranquility.kafka.KafkaConsumer - Kafka topic filter [(MyKafkaTopic)]

2017-11-02 18:19:12,746 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], begin registering consumer mygroup_myhost-1509646752595-64dd3cd9 in ZK

2017-11-02 18:19:12,769 [main] INFO kafka.utils.ZKCheckedEphemeral - Creating /consumers/clicks/ids/mygroup_myhost-1509646752595-64dd3cd9 (is it secure? false)

2017-11-02 18:19:12,776 [main] INFO kafka.utils.ZKCheckedEphemeral - Result of znode creation is: OK

2017-11-02 18:19:12,776 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], end registering consumer mygroup_myhost-1509646752595-64dd3cd9 in ZK

2017-11-02 18:19:12,781 [mygroup_myhost-1509646752595-64dd3cd9_watcher_executor] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], starting watcher executor thread for consumer mygroup_myhost-1509646752595-64dd3cd9

2017-11-02 18:19:12,794 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], begin rebalancing consumer mygroup_myhost-1509646752595-64dd3cd9 try #0

2017-11-02 18:19:12,876 [main] WARN k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], no brokers found when trying to rebalance.

2017-11-02 18:19:12,879 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], end rebalancing consumer mygroup_myhost-1509646752595-64dd3cd9 try #0

2017-11-02 18:19:12,882 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], Creating topic event watcher for topics (MyKafkaTopic)

2017-11-02 18:19:12,889 [main] INFO k.c.ZookeeperConsumerConnector - [mygroup_myhost-1509646752595-64dd3cd9], Topics to consume = List()

2017-11-02 18:19:27,731 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed 0 pending messages in 0ms and committed offsets in 3ms.

2017-11-02 18:19:42,732 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed 0 pending messages in 0ms and committed offsets in 0ms.

Basically the messages are always 0. I did not even get chance to see dropped messages.

Is it Kafka 10 issue? Or something else? Please help!

Thanks,

Joanne

Joanne,

I’m still quite new to Druid, however I have played with Tranquility (0.9.0 snapshot). It looks like Tranquility uses the older zookeeper based offset tracking method found in pre 0.10.x Kafka platforms. The ZK consumer offset tracking method still works with 0.10.x. This means you need to point the kakfa zookeeper field to the zookeeper of your kafka cluster like so:

“properties”: {

“zookeeper.connect”: “zookeeper-address-for-druid-cluster:2181”,

“zookeeper.timeout”: “PT20S”,

“druid.selectors.indexing.serviceName”: “druid/overlord”,

“druid.discovery.curator.path”: “/druid/discovery”,

“kafka.zookeeper.connect”: “zookeeper-address-for-kafka-cluster:2181”,

“kafka.group.id”: “tranquility-group-1”,

“consumer.numThreads”: “2”,

“commit.periodMillis”: “15000”,

“reportDropsAsExceptions”: “false”,

“kafka.consumer.id”: “kafka-tranquility-scus-1”

}

I think Philip is correct here – you should be able to specify the Kafka ZK connect string and things should work even with Kafka 0.10.x.

Please also note that for reading from Kafka, you could evaluate the Druid Kafka indexing service. It’s newer and still under rapid development, but we are intending to gradually transition to that being the recommended way to read from Kafka. It uses the new consumer api and doesn’t need a Kafka ZK connection.

Thank you. Yes, I did specify that string.
“kafka.zookeeper.connect” : “zookeeperNode”,

zookeeperNode is the one for remote Kafka.

Currently my druid uses localhost for druid’s zookeeper. I understand they are different zookeepers.

But did any one get experience on getting remote Kafka ?

I tried with :2181 and without, both are the same.

Basically no data.

I checked the https://github.com/druid-io/tranquility/blob/master/build.sbt their val kafkaVersion = “0.10.1.1” but, they require zookeeper string and does not take bootstrap-server string. I wonder if the zookeeper will tell tranquility about the bootstrap server info?

I did a test on my druid node under kafka_2.11-0.10.2.0:

bin/kafka-console-consumer.sh --bootstrap-server kafka-bootstrapserver1:9092 --new-consumer --topic MyKafkaTopic

has data

bin/kafka-console-consumer.sh --zookeeper zookeeperNode:2181,zoo02:2181,zoo03:2181/ml --topic MyKafkaTopic

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

just hang there forever no data.

I wonder if somebody really get this work before???

Thanks!

Hi Group,

Just let you know that I got Kafka indexing service http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html works with remote Kafka .

Now I can see data from superset.

Thanks!

Joanne