Help with ingesting Kafka data into Druid using Tranquility

Hi, I am trying to consume a Kafka topic with Tranquility and index it into Druid, based on the examples from this doc: http://druid.io/docs/0.9.1.1/tutorials/tutorial-kafka.html. However, I do not see any messages being pulled off Kafka, and no indexing task is being created.

I am using:

- Druid 0.9.1
- Kafka 0.9.0
- Tranquility 0.8.0

I can see that the messages are getting into Kafka because I can consume them with kafka-console-consumer.

Tranquility does establish a connection with ZooKeeper successfully. However, there is no sign of activity in Tranquility around consuming the Kafka topic.

Following the tutorial, I was able to successfully push data into the metrics data source using:

bin/generate-example-metrics | curl -XPOST -H'Content-Type: application/json' --data-binary @- http://localhost:8200/v1/post/metrics

However, getting this working via Kafka into the pageviews data source, as explained in the tutorial, does not work for me.

The message I am posting looks like:

{"time": "2016-06-30T21:36:30Z", "url": "foobar", "user": "alice", "latencyMs": 32}

I also made sure the time is within the window period.

Here is what my Tranquility config looks like:

{
  "dataSources" : {
    "pageviews" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "pageviews",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "time",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : ["url", "user"]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "views"
            },
            {
              "name" : "latencyMs",
              "type" : "doubleSum",
              "fieldName" : "latencyMs"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "pageviews"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost:2181",
    "[kafka.group.id](http://kafka.group.id/)" : "tranquility-kafka"
  }
}

Let me know if anyone has an idea of how to get this working.

~ Ankush

Hey Ankush,

Could you check your tranquility-kafka log (from when you run "bin/tranquility kafka …") to see whether any messages were dropped or unparseable, or whether there are any exceptions? You could also try producing more messages to your Kafka topic; it's possible that the tranquility-kafka consumer started after your messages were inserted and did not rewind to the beginning of the topic.
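
One more thing you could try if the consumer group is brand new. This is just a sketch, assuming Tranquility passes "kafka."-prefixed properties through to its Kafka high-level consumer (the same mechanism your config already uses for "kafka.zookeeper.connect" and "kafka.group.id"): tell a fresh consumer group to start from the earliest available offset instead of the latest.

"properties" : {
  "zookeeper.connect" : "localhost:2181",
  "druid.discovery.curator.path" : "/druid/discovery",
  "druid.selectors.indexing.serviceName" : "druid/overlord",
  "kafka.zookeeper.connect" : "localhost:2181",
  "kafka.group.id" : "tranquility-kafka",
  "kafka.auto.offset.reset" : "smallest"
}

Note that auto.offset.reset only takes effect when the group has no committed offsets yet, and older events still have to land inside your windowPeriod or they will be rejected.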

Alternatively, if you are just getting started, you could try the new Kafka indexing service in 0.9.1.1. It is currently in “beta” / “experimental” status but it is worth a look if you want to use Druid with Kafka. It offers exactly-once ingestion of both realtime and historical data. Reference docs are here: http://druid.io/docs/0.9.1.1/development/extensions-core/kafka-ingestion.html and we also have written a tutorial here: http://imply.io/docs/latest/tutorial-kafka-indexing-service.html.
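
For orientation, a Kafka indexing service supervisor spec for your pageviews data would look roughly like the sketch below. The bootstrap.servers address and taskDuration here are assumptions for a local quickstart-style setup; adjust them, along with the topic and schema, to match your environment.

{
  "type" : "kafka",
  "dataSchema" : {
    "dataSource" : "pageviews",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : {
          "column" : "time",
          "format" : "auto"
        },
        "dimensionsSpec" : {
          "dimensions" : ["url", "user"]
        }
      }
    },
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "hour",
      "queryGranularity" : "none"
    },
    "metricsSpec" : [
      { "type" : "count", "name" : "views" },
      { "type" : "doubleSum", "name" : "latencyMs", "fieldName" : "latencyMs" }
    ]
  },
  "tuningConfig" : {
    "type" : "kafka",
    "maxRowsInMemory" : 100000
  },
  "ioConfig" : {
    "topic" : "pageviews",
    "consumerProperties" : {
      "bootstrap.servers" : "localhost:9092"
    },
    "taskCount" : 1,
    "replicas" : 1,
    "taskDuration" : "PT1H"
  }
}

You submit that spec to the overlord at /druid/indexer/v1/supervisor, and the supervisor then creates and manages the Kafka indexing tasks for you; the tutorial walks through this step by step.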

Hi Gian,

Thanks for the reply

With the new Kafka indexing service, things worked great.

With the new indexing service, I had trouble consuming messages produced by a .NET producer. I'm not sure if some config caused this. I switched to producing messages via Scala and the message successfully went through Tranquility and into Druid.

A few clarifications:

Can the indexing service pick up Kafka messages that were produced before the service comes alive? For example, I want to produce a set of messages to a certain topic and then turn on the indexing service to consume them.

What config would I need to change to make this happen?

Thanks,

Ankush

Inline.

> Hi Gian,
>
> Thanks for the reply
>
> With the new Kafka indexing service, things worked great.
>
> With the new indexing service, I had trouble consuming messages produced by a .NET producer. I'm not sure if some config caused this. I switched to producing messages via Scala and the message successfully went through Tranquility and into Druid.

Are you using the new Kafka indexing task? Regardless, there are probably error messages in the task logs about why messages couldn’t be consumed.

> A few clarifications:
>
> Can the indexing service pick up Kafka messages that were produced before the service comes alive? For example, I want to produce a set of messages to a certain topic and then turn on the indexing service to consume them.
>
> What config would I need to change to make this happen?

The new Kafka indexing task can do this. Please note that the new Kafka indexing task doesn’t require Tranquility. The tutorial Gian linked should be helpful in getting started.

Hey Ankush,

Just to add to FJ’s response, by default the Kafka indexing service will begin ingesting events for a new datasource starting from the latest offset available in Kafka. If you want to read events that were produced before the service is started up, you should set ‘useEarliestOffset’ to true in the supervisor config.
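
In the supervisor spec that is a single flag in the ioConfig, something like the fragment below (the topic and broker address are just placeholders matching the earlier examples):

"ioConfig" : {
  "topic" : "pageviews",
  "consumerProperties" : {
    "bootstrap.servers" : "localhost:9092"
  },
  "useEarliestOffset" : true
}

Keep in mind this only determines where a brand-new supervisor starts reading; once offsets have been committed to Druid's metadata store, the tasks resume from those committed offsets.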

Yes, I have been using the new Kafka indexing service, and things have worked great. I have been able to process my messages. I had the wrong impression that Tranquility was somehow involved.

How does the indexing service configure the number of Kafka consumer threads? Is it based on the number of partitions for the topic?

Hey Ankush,

The Kafka indexing task uses a single thread for ingestion and utilizes the new Java consumer introduced in Kafka 0.9.

Hi David,

Thanks for the quick reply.

Seems like I had some catching up to do on the new Java consumer for Kafka. I have been using the high-level consumer until now, with a thread per partition.

Since the new Java consumer is not thread-safe, would you expect multiple Kafka indexing service processes to be running in order to get parallelism over a Kafka topic?

Multiple consumer processes joining the same consumer group seems to be the recommended approach in the Kafka docs.

~ Ankush

Hey Ankush,

Yes, to scale out you would set taskCount > 1 in your supervisor spec, which will create multiple indexing tasks reading from the same topic. The indexing service will split partitions across the tasks as best it can; note that a partition can't be split between tasks, so if you have 2 Kafka partitions and taskCount=3, you'll wind up with two tasks each reading 1 partition. For the most balanced operation, it's best for the number of Kafka partitions to be a multiple of taskCount, e.g. 3 partitions/3 tasks, 6 partitions/3 tasks, 18 partitions/3 tasks, etc.
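
As a concrete example, for a 3-partition topic an ioConfig along these lines (a sketch, reusing the placeholder topic and broker address from earlier) would give you three reading tasks, one per partition; raising replicas would additionally run redundant copies of each task:

"ioConfig" : {
  "topic" : "pageviews",
  "consumerProperties" : {
    "bootstrap.servers" : "localhost:9092"
  },
  "taskCount" : 3,
  "replicas" : 1,
  "taskDuration" : "PT1H"
}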

Hi,
I have 3 topics configured in Kafka which are consumed by Tranquility. I am seeing a very strange thing: during ingestion, some Druid tasks keep running without ever emitting a success message.
Looking at the logs, I found that they keep waiting for segment handoff and are never notified by the coordinator that the newly created segment has been successfully handed off to a historical node.
Observations:

  1. Tranquility runs with hour-based granularity and tasks are submitted on a per-hour basis.
  2. While some tasks run to completion with a success message, other tasks keep waiting to be notified by the coordinator.

Success logs:
2016-12-07T13:15:23,198 INFO [pixel-feed-2016-12-07T12:00:00.000Z-persist-n-merge] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_realtime_pixel-feed_2016-12-07T12:00:00.000Z_0_0] to overlord[http://54.159.125.81:8090/druid/indexer/v1/action]: SegmentInsertAction{segments=[DataSegment{size=23330739, shardSpec=LinearShardSpec{partitionNum=0}, metrics=[count], dimensions=[uid, pixel_id, u1, u2, u3, u4, u5, u6, u7, u8, u9, u10, u11, u12, u13, u14, u15, u16, u17, u18, u19, u20, u21, u22, u23, u24, u25], version='2016-12-07T11:57:03.885Z', loadSpec={type=s3_zip, bucket=analyst-adhoc, key=druid/prod/v1/pixel-feed/2016-12-07T12:00:00.000Z_2016-12-07T13:00:00.000Z/2016-12-07T11:57:03.885Z/0/index.zip}, interval=2016-12-07T12:00:00.000Z/2016-12-07T13:00:00.000Z, dataSource='pixel-feed', binaryVersion='9'}]}
2016-12-07T13:15:23,198 INFO [pixel-feed-2016-12-07T12:00:00.000Z-persist-n-merge] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://54.159.125.81:8090
2016-12-07T13:16:01,293 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-12-07T12:00:00.000Z/2016-12-07T13:00:00.000Z, version='2016-12-07T11:57:03.885Z', partitionNumber=0}]]
2016-12-07T13:17:01,293 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Segment Handoff complete for dataSource[pixel-feed] Segment[SegmentDescriptor{interval=2016-12-07T12:00:00.000Z/2016-12-07T13:00:00.000Z, version='2016-12-07T11:57:03.885Z', partitionNumber=0}]
2016-12-07T13:17:01,294 INFO [plumber_merge_0] io.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[pixel-feed_2016-12-07T12:00:00.000Z_2016-12-07T13:00:00.000Z_2016-12-07T11:57:03.885Z] at path[/druid/prod/segments/54.197.229.247:8101/54.197.229.247:8101_realtime__default_tier_2016-12-07T11:57:04.002Z_e22e07f97e304e95bcc67ba7ad2cd4d30]
2016-12-07T13:17:01,294 INFO [plumber_merge_0] io.druid.curator.announcement.Announcer - unannouncing [/druid/prod/segments/54.197.229.247:8101/54.197.229.247:8101_realtime__default_tier_2016-12-07T11:57:04.002Z_e22e07f97e304e95bcc67ba7ad2cd4d30]
2016-12-07T13:17:01,321 INFO [plumber_merge_0] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_realtime_pixel-feed_2016-12-07T12:00:00.000Z_0_0]: LockReleaseAction{interval=2016-12-07T12:00:00.000Z/2016-12-07T13:00:00.000Z}
2016-12-07T13:17:01,336 INFO [plumber_merge_0] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_realtime_pixel-feed_2016-12-07T12:00:00.000Z_0_0] to overlord[http://54.159.125.81:8090/druid/indexer/v1/action]: LockReleaseAction{interval=2016-12-07T12:00:00.000Z/2016-12-07T13:00:00.000Z}
2016-12-07T13:17:01,336 INFO [plumber_merge_0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://54.159.125.81:8090
2016-12-07T13:17:01,383 INFO [plumber_merge_0] io.druid.segment.realtime.plumber.RealtimePlumber - Deleting Index File[/mnt/xvdb/druid/prod/persistent/task/index_realtime_pixel-feed_2016-12-07T12:00:00.000Z_0_0/work/persist/pixel-feed/2016-12-07T12:00:00.000Z_2016-12-07T13:00:00.000Z]
2016-12-07T13:17:01,397 INFO [plumber_merge_0] io.druid.segment.realtime.plumber.RealtimePlumber - Removing sinkKey 1481112000000 for segment pixel-feed_2016-12-07T12:00:00.000Z_2016-12-07T13:00:00.000Z_2016-12-07T11:57:03.885Z
2016-12-07T13:17:01,405 INFO [task-runner-0-priority-0] io.druid.indexing.common.task.RealtimeIndexTask - Job done!
2016-12-07T13:17:01,412 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_realtime_pixel-feed_2016-12-07T12:00:00.000Z_0_0",
  "status" : "SUCCESS",
  "duration" : 4800269
}

**Logs of a task which keeps on running:**

inatorClient.fetchServerView(CoordinatorClient.java:98) ~[druid-server-0.9.1.1.jar:0.9.1.1]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs(CoordinatorBasedSegmentHandoffNotifier.java:101) [druid-server-0.9.1.1.jar:0.9.1.1]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier$1.run(CoordinatorBasedSegmentHandoffNotifier.java:86) [druid-server-0.9.1.1.jar:0.9.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_91]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
at com.google.common.util.concurrent.Futures$ImmediateFailedFuture.get(Futures.java:186) ~[guava-16.0.1.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:82) ~[druid-server-0.9.1.1.jar:0.9.1.1]
… 9 more
Caused by: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
at com.metamx.http.client.NettyHttpClient.go(NettyHttpClient.java:137) ~[http-client-1.0.4.jar:?]
at com.metamx.http.client.AbstractHttpClient.go(AbstractHttpClient.java:14) ~[http-client-1.0.4.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:68) ~[druid-server-0.9.1.1.jar:0.9.1.1]
… 9 more
Caused by: org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /54.159.125.81:8081
at org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:139) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.4.Final.jar:?]
… 3 more
2016-12-08T07:51:43,632 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-12-07T11:00:00.000Z/2016-12-07T12:00:00.000Z, version='2016-12-07T10:57:00.076Z', partitionNumber=0}]]
2016-12-08T07:52:33,619 INFO [coordinator_handoff_scheduled_0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://54.159.125.81:8081
2016-12-08T07:52:43,631 WARN [HttpClient-Netty-Boss-0] org.jboss.netty.channel.SimpleChannelUpstreamHandler - EXCEPTION, please implement org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling.
org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /54.159.125.81:8081
at org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:139) [netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83) [netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.4.Final.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
2016-12-08T07:52:43,631 ERROR [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Exception while checking handoff for dataSource[pixel-feed] Segment[SegmentDescriptor{interval=2016-12-07T11:00:00.000Z/2016-12-07T12:00:00.000Z, version='2016-12-07T10:57:00.076Z', partitionNumber=0}], Will try again after [60000]secs
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:98) ~[druid-server-0.9.1.1.jar:0.9.1.1]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs(CoordinatorBasedSegmentHandoffNotifier.java:101) [druid-server-0.9.1.1.jar:0.9.1.1]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier$1.run(CoordinatorBasedSegmentHandoffNotifier.java:86) [druid-server-0.9.1.1.jar:0.9.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_91]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
at com.google.common.util.concurrent.Futures$ImmediateFailedFuture.get(Futures.java:186) ~[guava-16.0.1.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:82) ~[druid-server-0.9.1.1.jar:0.9.1.1]
… 9 more
Caused by: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
at com.metamx.http.client.NettyHttpClient.go(NettyHttpClient.java:137) ~[http-client-1.0.4.jar:?]
at com.metamx.http.client.AbstractHttpClient.go(AbstractHttpClient.java:14) ~[http-client-1.0.4.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:68) ~[druid-server-0.9.1.1.jar:0.9.1.1]
… 9 more
Caused by: org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /54.159.125.81:8081
at org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:139) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.4.Final.jar:?]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.4.Final.jar:?]
… 3 more
2016-12-08T07:52:43,632 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-12-07T11:00:00.000Z/2016-12-07T12:00:00.000Z, version='2016-12-07T10:57:00.076Z', partitionNumber=0}]]

My problem:

Why is it that some tasks cannot connect to the coordinator for segment handoff, while others connect to it successfully and run to completion? I have configured my ports properly and none of the ports are blocked.
Can anyone please help me out?

Hi guys,
I have a Spark job which reads data and sends it to a Kafka broker. From there I am not able to write it into Druid, even though I can see all of the posted data with a Kafka consumer. I want to write that same data to Druid. Please help me with this. Thanks in advance.

Thanks,

Raja