Unable to ingest data from Kafka to Druid through Tranquility

Hi,

I’m trying to push messages from Kafka to Druid through Tranquility. I followed the tutorial http://druid.io/docs/latest/tutorials/tutorial-loading-streaming-data.html to configure my Tranquility kafka.json. When I start Tranquility, it is able to consume messages, but it fails to load them into Druid. All I can see in the Tranquility log is the following:
22:22:41.872 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {pageviews={receivedCount=1, sentCount=0, failedCount=1}} pending messages in 0ms and committed offsets in 5ms.

Here it consumed one message but failed to send it to Druid. I can’t figure out why the send to Druid is failing.

Thanks in Advance.

Hey Venkat,

Generally, if you see “failed” messages but no exceptions, it means some messages were dropped. The most common reasons are that your events fall outside the configured windowPeriod, or that the timestamp in your messages can’t be parsed. So some things to check are:

  • Make sure you are sending events within the windowPeriod.

  • Make sure you have set up an appropriate timestampSpec in your config, so that Tranquility and Druid both know which field carries your event timestamp (see the sketch after this list).
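For reference, here is a minimal sketch of the relevant part of a Tranquility kafka.json dataSource spec. It is only an illustration: the dataSource name, column names, dimensions, and periods are placeholders, and a real file has more settings (Kafka properties, partitions/replicants, and so on) as shown in the tutorial.

"spec" : {
  "dataSchema" : {
    "dataSource" : "pageviews",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : { "column" : "time", "format" : "auto" },
        "dimensionsSpec" : { "dimensions" : ["url", "user"] }
      }
    },
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "hour",
      "queryGranularity" : "none"
    },
    "metricsSpec" : [ { "type" : "count", "name" : "views" } ]
  },
  "tuningConfig" : {
    "type" : "realtime",
    "windowPeriod" : "PT10M",
    "intermediatePersistPeriod" : "PT10M",
    "maxRowsInMemory" : 75000
  }
}

With segmentGranularity "hour" and windowPeriod "PT10M", Tranquility only accepts events whose timestamp falls within the currently open hourly segment plus or minus roughly ten minutes; anything outside that window shows up as failedCount and is dropped. The timestampSpec format can be "auto", "iso", "millis", "posix", or a Joda format string, and the column must name the timestamp field actually present in your Kafka messages.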

Thanks a lot, Gian, for the quick reply. I now understand the importance of windowPeriod. I’ve set a correct windowPeriod and segmentGranularity and tried to ingest data again, but no luck: I’m getting the following exception. I googled it but couldn’t make sense of it. I’m new to Druid concepts and configuration; can you please help me with the exception below?

12:10:32.733 [ClusteredBeam-ZkFuturePool-b807524c-3008-48ba-b7bc-44ec184a9190] INFO c.m.t.finagle.FinagleRegistry - Created client for service: firehose:overlord:kafka_test2-24-0000-0000

12:10:39.522 [finagle/netty3-1] INFO c.m.tranquility.druid.TaskClient - Task index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0 status changed from TaskRunning -> TaskFailed

12:10:39.529 [finagle/netty3-1] WARN c.m.tranquility.druid.TaskClient - Emitting alert: [anomaly] Loss of Druid redundancy: kafka_test2

{
  "dataSource" : "kafka_test2",
  "task" : "index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0",
  "status" : "Closed"
}

12:10:39.540 [finagle/netty3-1] INFO c.metamx.emitter.core.LoggingEmitter - Event [{"feed":"alerts","timestamp":"2016-02-24T12:10:39.535Z","service":"tranquility","host":"localhost","severity":"anomaly","description":"Loss of Druid redundancy: kafka_test2","data":{"dataSource":"kafka_test2","task":"index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0","status":"Closed"}}]

12:10:39.564 [finagle/netty3-1] WARN c.m.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Beam defunct: overlord/kafka_test2

{
  "eventCount" : 1,
  "timestamp" : "2016-02-24T00:00:00.000Z",
  "beam" : "MergingPartitioningBeam(DruidBeam(interval = 2016-02-24T00:00:00.000Z/2016-02-25T00:00:00.000Z, partition = 0, tasks = [index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0/kafka_test2-24-0000-0000]))"
}

com.metamx.tranquility.beam.DefunctBeamException: Tasks are all gone: index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0

Hey Venkat,

It looks like the Druid ingestion task index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0 failed. This is the task Tranquility is talking to.

You can check its logs in the overlord console: try http://OVERLORD_IP:PORT/console.html. If you are running on your local machine, this would be http://localhost:8090/console.html. There should be an ‘index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0’ entry under “failed tasks” with a link to view the logs. Those logs should make it clear what went wrong.
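If you prefer the command line, the overlord’s HTTP API exposes the same information. A quick sketch, assuming the overlord is listening on localhost:8090:

# current status of the task (RUNNING / SUCCESS / FAILED)
curl http://localhost:8090/druid/indexer/v1/task/index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0/status

# full task log, which should contain the underlying exception
curl http://localhost:8090/druid/indexer/v1/task/index_realtime_kafka_test2_2016-02-24T00:00:00.000Z_0_0/log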

Hi Gian,

Thank you for spending your valuable time.

I’ve tuned some configurations and am now able to ingest data into Druid, but I’m facing some issues with tasks. Here is the scenario:

I can see data in Druid and in deep storage, but my tasks are not shutting down at all, which is leading to a high number of pending tasks.

Overlord host: x.x.x.134:8080

Coordinator host: x.x.x.135:8080

druid.selectors.indexing.serviceName=druid:overlord (common.runtime) (I first set this to overlord, but after seeing the exception and one of your earlier answers I changed it to druid:overlord, since Tranquility’s default is druid/overlord; see the sketch after this list)

druid.selectors.coordinator.serviceName=druid:coordinator (common.runtime)
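For reference, this is how I understand the two sides should line up; the discovery path below is just the default, I haven’t overridden it:

# Druid common.runtime.properties (all nodes)
druid.selectors.indexing.serviceName=druid:overlord
druid.selectors.coordinator.serviceName=druid:coordinator

# Tranquility kafka.json, top-level "properties" section
"druid.selectors.indexing.serviceName" : "druid:overlord",
"druid.discovery.curator.path" : "/druid/discovery"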

Here is the log from the overlord console:

2016-02-25T20:14:33,983 INFO [coordinator_handoff_scheduled_0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://**x.x.x.134:8080**
2016-02-25T20:14:33,987 ERROR [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Exception while checking handoff for dataSource[kafka_test_1] Segment[SegmentDescriptor{interval=2016-02-25T20:00:00.000Z/2016-02-25T20:05:00.000Z, version='2016-02-25T20:00:28.923Z', partitionNumber=0}], Will try again after [60000]secs
com.metamx.common.ISE: Error while fetching **serverView status**[404 Not Found] content[]
	at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:87) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs(CoordinatorBasedSegmentHandoffNotifier.java:101) [druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier$1.run(CoordinatorBasedSegmentHandoffNotifier.java:86) [druid-server-0.8.3.jar:0.8.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_72]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_72]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_72]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_72]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72]
2016-02-25T20:14:33,991 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-02-25T20:00:00.000Z/2016-02-25T20:05:00.000Z, version='2016-02-25T20:00:28.923Z', partitionNumber=0}]]
2016-02-25T20:15:33,982 INFO [coordinator_handoff_scheduled_0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://**x.x.x.134:8080**
2016-02-25T20:15:33,986 ERROR [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Exception while checking handoff for dataSource[kafka_test_1] Segment[SegmentDescriptor{interval=2016-02-25T20:00:00.000Z/2016-02-25T20:05:00.000Z, version='2016-02-25T20:00:28.923Z', partitionNumber=0}], Will try again after [60000]secs
com.metamx.common.ISE: Error while fetching serverView status[404 Not Found] content[]
	at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:87) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs(CoordinatorBasedSegmentHandoffNotifier.java:101) [druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier$1.run(CoordinatorBasedSegmentHandoffNotifier.java:86) [druid-server-0.8.3.jar:0.8.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_72]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_72]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_72]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_72]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72]
2016-02-25T20:15:33,986 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-02-25T20:00:00.000Z/2016-02-25T20:05:00.000Z, version='2016-02-25T20:00:28.923Z', partitionNumber=0}]]

Here I need some clarifications:
1. I don't understand why [**coordinator_handoff_scheduled_0**] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://**x.x.x.134:8080** is being generated. My **coordinator node is x.x.x.135**, so shouldn't this be generated for x.x.x.135:8080?
2. When I check the ZooKeeper base path with get /druid/prod/**coordinator/_COORDINATOR**/_c_9cfa7d52-6f21-47c3-9d53-2ab79884b07e-latch-0000000001, it returns **x.x.x.134:8080 (which is the overlord)**. Here I'm stumped: how did my overlord host end up under the coordinator's path? Which service is setting this path?


Hey Venkat,

Is it possible that “druid.service” and “druid.host” are set to something conflicting or incorrect on your coordinator or overlord?
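As a sketch of what non-conflicting settings would look like with the hosts you listed (the port and exact service names here are assumptions; the point is just that each node advertises its own address under its own service name):

# coordinator runtime.properties (on x.x.x.135)
druid.service=druid:coordinator
druid.host=x.x.x.135
druid.port=8080

# overlord runtime.properties (on x.x.x.134)
druid.service=druid:overlord
druid.host=x.x.x.134
druid.port=8080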

What’s in the children of the znode /druid/discovery/druid:coordinator? If you “get” the children of that znode what are their contents?
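Assuming ZooKeeper is reachable on localhost:2181 and you are using the default discovery path, the ZooKeeper CLI will show this directly (the child znode name below is a made-up placeholder; yours will be a different UUID):

bin/zkCli.sh -server localhost:2181

# list the discovery entries for the coordinator service
ls /druid/discovery/druid:coordinator
# e.g. [a1b2c3d4-1111-2222-3333-444455556666]

# dump one entry; the JSON payload shows the advertised host and port
get /druid/discovery/druid:coordinator/a1b2c3d4-1111-2222-3333-444455556666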

Thank you so much, Gian. You made my day.
It was my mistake: I had put the wrong host (x.x.x.134) in the coordinator’s runtime.properties. I’ve changed it now; hopefully everything goes smoothly from here on :slight_smile:
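In case anyone else hits this: after correcting druid.host and restarting the coordinator, I’ll confirm the fix by checking that the coordinator now advertises its own address, for example (the leader endpoint may depend on your Druid version, and the port is from my setup):

# should return the coordinator's own advertised address, e.g. http://x.x.x.135:8080
curl http://x.x.x.135:8080/druid/coordinator/v1/leader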

THANKS A LOT!

Great to hear it’s working :slight_smile: