Kafka Indexing Service


I’m reading up on the new experimental Kafka Indexing Service.

As I understand it, I need to submit a supervisor spec for each data source. The spec is then persisted to the metadata storage and will survive restarts of the overlord, etc.?

Also, does this indexing service mean that I can remove the specFile configuration from the realtime node?



As you’ll note from 0.9.0, we’ve changed our entire getting started process to use the indexing service and remove the focus from realtime nodes. If you choose to use the indexing service you should no longer need realtime nodes.

Hi Robin,

Yeah that is correct - each data source is managed by one supervisor so you’ll need to submit a spec for each data source. The spec is persisted to metadata storage and will automatically be loaded when an overlord restarts or takes leadership in a HA configuration.
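As a rough sketch of what "one spec per data source" can look like in practice, the Python snippet below builds a minimal supervisor spec for each data source and prepares a POST to the overlord's supervisor endpoint (`/druid/indexer/v1/supervisor`). The overlord address, broker address, topic names and data source names are placeholders, and the `dataSchema` is heavily abbreviated; a real spec also needs a parser, metrics and granularity settings.

```python
import json
import urllib.request

OVERLORD = "http://localhost:8090"  # assumed overlord address (placeholder)

def build_supervisor_spec(datasource, topic, brokers):
    """Return a minimal, deliberately incomplete Kafka supervisor spec."""
    return {
        "type": "kafka",
        # Abbreviated: a real dataSchema also needs parser, metricsSpec, granularitySpec.
        "dataSchema": {"dataSource": datasource},
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": brokers},
        },
    }

def build_request(spec):
    """Prepare (but do not send) the POST that submits a spec to the overlord."""
    return urllib.request.Request(
        OVERLORD + "/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# One supervisor per data source (all names here are hypothetical).
pending = [
    build_request(build_supervisor_spec(ds, topic, "kafka01:9092"))
    for ds, topic in [("datasource_a", "topic_a"), ("datasource_b", "topic_b")]
]
# To actually submit them: urllib.request.urlopen(req) for each req in pending.
```

Because the overlord persists each spec, this submission only needs to happen once per data source, not after every restart.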


Started on this today and I have run into a few problems.

First, KafkaIndexTask didn’t work out that well.

The “Still waiting for Handoff” message appeared once a minute for about 30 minutes before the task ended like this:

2016-06-07T12:02:25,928 INFO [coordinator_handoff_scheduled_0] com.metamx.http.client.pool.ChannelResourceFactory - Generating:

2016-06-07T12:02:25,972 INFO [coordinator_handoff_scheduled_0] io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-06-07T00:00:00.000Z/2016-06-08T00:00:00.000Z, version=‘2016-06-07T10:35:25.486Z’, partitionNumber=0}]]

2016-06-07T12:02:42,387 INFO [qtp1261810493-31] io.druid.indexing.kafka.KafkaIndexTask - Stopping gracefully.

2016-06-07T12:02:42,387 INFO [qtp1261810493-31] io.druid.indexing.kafka.KafkaIndexTask - Interrupting run thread (status: [PUBLISHING])

2016-06-07T12:02:42,512 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down…

2016-06-07T12:02:42,637 INFO [appenderator_persist_0] io.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[datasource_2016-06-07T00:00:00.000Z_2016-06-08T00:00:00.000Z_2016-06-07T10:35:25.486Z] at path[/druid/segments/]

2016-06-07T12:02:42,638 INFO [appenderator_persist_0] io.druid.curator.announcement.Announcer - unannouncing [/druid/segments/]

2016-06-07T12:02:43,170 INFO [appenderator_persist_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Removing sink for segment[datasource_2016-06-07T00:00:00.000Z_2016-06-08T00:00:00.000Z_2016-06-07T10:35:25.486Z].

2016-06-07T12:02:43,370 INFO [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - The task was asked to stop before completing

2016-06-07T12:02:43,414 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_datasource_d173e68e1e4aa53_llplejna] status changed to [SUCCESS].

2016-06-07T12:02:43,671 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {

“id” : “index_kafka_datasource_d173e68e1e4aa53_llplejna”,

“status” : “SUCCESS”,

“duration” : 5418515


The second supervisor, working on another Kafka topic, didn’t publish any events at all; it just logged the following a few times during its lifetime:

2016-06-07T11:00:24,799 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483647




I’m having the same problem with the Kafka index task. The middle manager logs show it waiting for handoff and then marking the coordinator dead:

{“t”: “2016-06-07T13:46:54.716+0000”, “message”: “Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2016-06-07T13:00:00.000Z/2016-06-07T14:00:00.000Z, version=‘2016-06-07T13:00:01.283Z’, partitionNumber=1}, SegmentDescriptor{interval=2016-06-07T12:00:00.000Z/2016-06-07T13:00:00.000Z, version=‘2016-06-07T12:00:27.406Z’, partitionNumber=1}, SegmentDescriptor{interval=2016-06-07T11:00:00.000Z/2016-06-07T12:00:00.000Z, version=‘2016-06-07T11:00:05.975Z’, partitionNumber=2}]]”, “level”: “INFO”, “logger_name”: “io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier”, “thread_name”: “coordinator_handoff_scheduled_0”}
{“t”: “2016-06-07T13:47:10.207+0000”, “message”: “Generating: http://localhost:4000”, “level”: “INFO”, “logger_name”: “com.metamx.http.client.pool.ChannelResourceFactory”, “thread_name”: “HttpPostEmitter-1-0”}
{“t”: “2016-06-07T13:47:11.762+0000”, “message”: “Generating: http://localhost:4000”, “level”: “INFO”, “logger_name”: “com.metamx.http.client.pool.ChannelResourceFactory”, “thread_name”: “HttpPostEmitter-1-0”}
{“t”: “2016-06-07T13:47:12.048+0000”, “message”: “Generating: http://localhost:4000”, “level”: “INFO”, “logger_name”: “com.metamx.http.client.pool.ChannelResourceFactory”, “thread_name”: “HttpPostEmitter-1-0”}
{“t”: “2016-06-07T13:47:12.192+0000”, “message”: “Marking the coordinator 2147483645 dead.”, “level”: “INFO”, “logger_name”: “org.apache.kafka.clients.consumer.internals.AbstractCoordinator”, “thread_name”: “task-runner-0-priority-0”}

The coordinator node is logging very actively and it’s not crashing (supervisord log is clean)

{“t”: “2016-06-07T13:53:52.827+0000”, “message”: “Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent a race condition in which the coordinator would drop all segments if it started running cleanup before it finished polling the metadata storage for available segments for the first time.”, “level”: “INFO”, “logger_name”: “io.druid.server.coordinator.helper.DruidCoordinatorCleanupUnneeded”, “thread_name”: “Coordinator-Exec–0”}
{“t”: “2016-06-07T13:53:52.827+0000”, “message”: “No segments found. Cannot balance.”, “level”: “INFO”, “logger_name”: “io.druid.server.coordinator.helper.DruidCoordinatorBalancer”, “thread_name”: “Coordinator-Exec–0”}
{“t”: “2016-06-07T13:53:52.828+0000”, “message”: “Load Queues:”, “level”: “INFO”, “logger_name”: “io.druid.server.coordinator.helper.DruidCoordinatorLogger”, “thread_name”: “Coordinator-Exec–0”}
{“t”: “2016-06-07T13:53:52.828+0000”, “message”: “Server[, historical, _default_tier] has 0 left to load, 0 left to drop, 0 bytes queued, 0 bytes served.”, “level”: “INFO”, “logger_name”: “io.druid.server.coordinator.helper.DruidCoordinatorLogger”, “thread_name”: “Coordinator-Exec–0”}
{“t”: “2016-06-07T13:53:52.828+0000”, “message”: “Server[, historical, _default_tier] has 0 left to load, 0 left to drop, 0 bytes queued, 0 bytes served.”, “level”: “INFO”, “logger_name”: “io.druid.server.coordinator.helper.DruidCoordinatorLogger”, “thread_name”: “Coordinator-Exec–0”}

Which is weird b/c there’s totally a bunch of active segments in mysql

This is interesting. The coordinator log shows a new segment added with size=0 (which might exclude it from being loaded)

{“t”: “2016-06-07T14:21:43.216+0000”, “message”: “Adding segment[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=4, partitions=0}, metrics=[events, tAbandon, tAbandon.cnt, tIvr, tIvr.cnt, tAnswered, tAnswered.cnt, tAcd, tAcd.cnt, tTalk, tTalk.cnt, tTalkCompleted, tTalkCompleted.cnt, tHeld, tHeld.cnt, tHeldCompleted, tHeldCompleted.cnt, tAcw, tAcw.cnt, tHandle, tHandle.cnt, tVoicemail, tVoicemail.cnt, tUserResponseTime, tUserResponseTime.cnt, tAgentResponseTime, tAgentResponseTime.cnt, nOffered, nOverSla, nTransferred, nDialerAttempted, nDialerConnected, nDialerAbandoned, nError, mCreatedVoicemailSize, mCreatedVoicemailDuration, mDeletedVoicemailSize, mDeletedVoicemailDuration, oMailboxVoicemailSize, oMailboxVoicemailDuration, oMailboxVoicemailCount], dimensions=, version=‘2016-06-07T14:00:10.365Z’, loadSpec={}, interval=2016-06-07T14:00:00.000Z/2016-06-07T15:00:00.000Z, dataSource=‘imetrics’, binaryVersion=‘null’}] for server[DruidServerMetadata{name=‘’, host=‘’, maxSize=0, tier=’_default_tier’, type=‘indexer-executor’, priority=‘0’}]”, “level”: “DEBUG”, “logger_name”: “io.druid.client.CoordinatorServerView”, “thread_name”: “CoordinatorServerView-0”}

But if you check mysql for that same segment, size=59840

select * from druid_segments where datasource = "imetrics" and start = "2016-06-07T14:00:00.000Z" limit 1 \G;
*************************** 1. row ***************************
id: imetrics_2016-06-07T14:00:00.000Z_2016-06-07T15:00:00.000Z_2016-06-07T14:00:10.365Z_1
dataSource: imetrics
created_date: 2016-06-07T14:21:09.100Z
start: 2016-06-07T14:00:00.000Z
end: 2016-06-07T15:00:00.000Z
partitioned: 1
version: 2016-06-07T14:00:10.365Z
used: 1
payload: {“dataSource”:“imetrics”,“interval”:“2016-06-07T14:00:00.000Z/2016-06-07T15:00:00.000Z”,“version”:“2016-06-07T14:00:10.365Z”,“loadSpec”:{“type”:“s3_zip”,“bucket”:“inin-dca-useast1-analytics”,“key”:“druid/analytics-druid-v3/v3/realtime/archive/imetrics/2016-06-07T14:00:00.000Z_2016-06-07T15:00:00.000Z/2016-06-07T14:00:10.365Z/1/index.zip”},“dimensions”:“orgid,cid,chat,chatroom,media,interaction_type,session_id,dg,direction,ani,callid,user,station,team,edge,dnis,wrapup_code,address_from,address_to,email”,“metrics”:“events,tAbandon,tAbandon.cnt,tIvr,tIvr.cnt,tAnswered,tAnswered.cnt,tAcd,tAcd.cnt,tTalk,tTalk.cnt,tTalkCompleted,tTalkCompleted.cnt,tHeld,tHeld.cnt,tHeldCompleted,tHeldCompleted.cnt,tAcw,tAcw.cnt,tHandle,tHandle.cnt,tVoicemail,tVoicemail.cnt,tUserResponseTime,tUserResponseTime.cnt,tAgentResponseTime,tAgentResponseTime.cnt,nOffered,nOverSla,nTransferred,nDialerAttempted,nDialerConnected,nDialerAbandoned,nError,mCreatedVoicemailSize,mCreatedVoicemailDuration,mDeletedVoicemailSize,mDeletedVoicemailDuration,oMailboxVoicemailSize,oMailboxVoicemailDuration,oMailboxVoicemailCount”,“shardSpec”:{“type”:“numbered”,“partitionNum”:1,“partitions”:0},“binaryVersion”:9,“size”:59840,“identifier”:“imetrics_2016-06-07T14:00:00.000Z_2016-06-07T15:00:00.000Z_2016-06-07T14:00:10.365Z_1”}
1 row in set (0.04 sec)

Hey Robin,

It sounds like you might have to set a completionTimeout > PT30M in your supervisor config file since the tasks are taking longer than 30 minutes to publish. You should make sure that your coordinator is alive and that you have sufficient capacity in your historical nodes to load the segments from deep storage, otherwise the segment will never be handed off and the task will sit around until it gets killed by the timeout.
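To make the timing concrete, here is a small Python sketch that adds up the time a task gets before the supervisor stops it. It assumes a `taskDuration` of PT1H and a `completionTimeout` of PT30M in the spec's `ioConfig` (these match the 3600s and 1800s figures mentioned elsewhere in this thread, but check your own spec), and compares that budget with the `duration` reported in the task log above. The helper only handles simple hour/minute/second durations.

```python
import re

def iso_duration_to_seconds(period):
    """Convert a simple ISO 8601 duration such as 'PT1H' or 'PT30M' to seconds."""
    m = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", period)
    if m is None:
        raise ValueError("unsupported duration: " + period)
    hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return hours * 3600 + minutes * 60 + seconds

# Assumed ioConfig values; adjust to whatever your supervisor spec contains.
io_config = {
    "taskDuration": "PT1H",
    "completionTimeout": "PT30M",
}

# Total time a task may live: reading time plus time allowed for publishing/handoff.
budget_s = sum(iso_duration_to_seconds(v) for v in io_config.values())

# "duration" : 5418515 (milliseconds) from the task log earlier in the thread.
observed_s = 5418515 / 1000

print("budget:", budget_s, "s; observed:", observed_s, "s")
```

With these assumed values the budget is 5400 s and the observed task lifetime is about 5418.5 s, which is consistent with the task being stopped by the timeout rather than finishing handoff on its own. Raising `completionTimeout` (for example to a hypothetical "PT1H") only helps if handoff is merely slow; it will not help if the coordinator never assigns the segment.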

For the second supervisor, it’s hard to tell what’s going on without seeing the logs. I see that ‘marking the coordinator … dead’ message fairly frequently in my tasks, and I find the consumer usually recovers (although it doesn’t log that it recovered). If you post your logs (task + overlord) and supervisor spec, I can see if there’s anything interesting going on in there.

Hey Drew,
The ‘coordinator’ in the logs isn’t actually the Druid coordinator; the message is generated by the Kafka client:
{“t”: “2016-06-07T13:47:12.192+0000”, “message”: “Marking the coordinator 2147483645 dead.”, “level”: “INFO”, “logger_name”: “org.apache.kafka.clients.consumer.internals.AbstractCoordinator”, “thread_name”: “task-runner-0-priority-0”}

I believe this message is logged whenever the Kafka consumer gets disconnected from a broker which can happen transiently for a number of different reasons.
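As an aside on the large numbers in that message: as far as I know, the Kafka consumer gives the group coordinator connection a synthetic node id of `Integer.MAX_VALUE - brokerId`, so the id can be mapped back to a broker. This is an assumption about client internals rather than anything documented in this thread, so treat the sketch below as a hint for where to look, not as a guarantee.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

def broker_id_from_coordinator_id(coordinator_id):
    """Recover the broker id, assuming the id was computed as INT_MAX - broker_id."""
    return INT_MAX - coordinator_id

# The two coordinator ids that appear in the logs in this thread.
for cid in (2147483647, 2147483645):
    print(cid, "->", "broker", broker_id_from_coordinator_id(cid))
```

Under that assumption, the two ids seen in this thread correspond to broker ids 0 and 2, which may help narrow down which broker the consumer lost its connection to.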

If you’re still having trouble with segments being published / handed off, post your task + overlord logs and supervisor spec and I can take a look.

Awesome, thanks David

druid.zip (1.76 MB)

Hey Drew,

Your task logs are pretty intense since your logger seems to be aggregating messages from all your tasks and it’s not immediately clear what’s going on with any given task. What issue(s) are you actually seeing with the indexing tasks? Are they not handing off to the historicals properly? I saw some tasks succeeding for both evalmetrics and imetrics, so are only some of the tasks failing to publish?

I’ll see a task complete, make its way to deep storage, and get written to the druid_segments table. However, the coordinator never seems to get the hint that it should assign the segment to a historical node. When it polls the mysql table it sees zero segments available. I enabled debug logging on the coordinator (which is why it’s so verbose) hoping to see an explanation of why it won’t assign any segments, but I’m not seeing anything of interest.

Another interesting log message from the middle manager

{“t”: “2016-06-07T16:14:10.330+0000”, “message”: “[GET] Channel disconnected before response complete”, “level”: “WARN”, “logger_name”: “com.metamx.http.client.NettyHttpClient”, “thread_name”: “HttpClient-Netty-Worker-2”}
{“t”: “2016-06-07T16:14:10.330+0000”, “message”: “Exception while checking handoff for dataSource[imetrics] Segment[SegmentDescriptor{interval=2016-06-07T15:00:00.000Z/2016-06-07T16:00:00.000Z, version=‘2016-06-07T15:00:01.907Z’, partitionNumber=2}], Will try again after [60000]secs”, “level”: “ERROR”, “logger_name”: “io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier”, “thread_name”: “coordinator_handoff_scheduled_0”}
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Channel disconnected
at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:98) ~[druid-server-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs(CoordinatorBasedSegmentHandoffNotifier.java:101) [druid-server-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier$1.run(CoordinatorBasedSegmentHandoffNotifier.java:86) [druid-server-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [?:1.7.0_75]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [?:1.7.0_75]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [?:1.7.0_75]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.7.0_75]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_75]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Channel disconnected
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-16.0.1.jar:?]
at io.druid.client.coordinator.CoordinatorClient.fetchServerView(CoordinatorClient.java:68) ~[druid-server-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]

I can curl no problem and get the payload almost instantly.

Ah, ignore my last message, I think the log messages have changed and I was looking for the wrong thing. I’ll keep investigating.

Hey Drew,

My suspicion is that there might be something misconfigured with your load/drop rules (http://druid.io/docs/latest/operations/rule-configuration.html). Could you check the default rules in the coordinator console (default http://{coordinator_ip}:8081/#/datasources, click on the pencil next to ‘default rules’) and make sure that it has load rules for your segments? The easiest thing to do is add a load forever rule for _default_tier as rule #1. If that seems okay, maybe you can try ingesting into a new datasource in case the datasource level rules got messed up at some point.
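If the console is not cooperating, the same rule can be set over HTTP. The Python sketch below prepares a POST to the coordinator's rules endpoint (`/druid/coordinator/v1/rules/{dataSource}`, where `_default` addresses the default rules) with a single load forever rule for `_default_tier`. The coordinator address and the replicant count of 2 are assumptions; the request is only built here, not sent.

```python
import json
import urllib.request

COORDINATOR = "http://localhost:8081"  # assumed coordinator address (placeholder)

def build_rules_request(datasource="_default", replicants=2):
    """Prepare a POST that sets a single load forever rule for _default_tier."""
    rules = [
        {"type": "loadForever", "tieredReplicants": {"_default_tier": replicants}}
    ]
    return urllib.request.Request(
        COORDINATOR + "/druid/coordinator/v1/rules/" + datasource,
        data=json.dumps(rules).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_rules_request()
# To apply the rule: urllib.request.urlopen(req)
# To inspect current rules, a GET on the same URL should return the rule list.
```

Note that posting a rule list replaces the existing rules for that data source, so include any other rules you want to keep.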


No worries about the second supervisor. Turns out the producer was silently dropping new events since the kafka restart.

I’ll get back to you on the publish. However, taking more than 30 minutes to publish this small dataset sounds wrong. Could it be related to running threads? I found that the historical node was configured to run in one thread only.



Hi again,

No, changing the thread setting didn’t help at all.

Any hints on which nodes I should look at, so I can consolidate the logs from this period?

The second supervisor had about 20 events and the merged data was about 8k, so this must be related to something else.


This is the complete supervisor log covering the period in which the two Kafka supervisor tasks ran for 3600s until they were killed 1800s later.


David, either the coordinator console UI is broken (firefox/chrome) or there’s a disagreement in state. The load rule is in mysql and shows up in the UI, but data sources are missing and tasks are missing. Overlord & coordinator are on the same machine with the same common runtime config. The overlord seems to be running smoothly. I attached some screenshots & mysql state so you can compare.

druid2.zip (1.01 MB)

Hey Drew,

Could you post a dump of your druid_segments table as well as your server configs (common.runtime.properties + runtime.properties for each node)? I haven’t seen any clear hints of what’s wrong yet. The ingestion part is working fine; the problem seems to be that the coordinator is unable to read the segments table for some reason.

Figured it out. This is what happens when you provide the mysql 0.9.0 extension to druid 0.9.1… Whoops. Fixing that got everything back on track. Thanks a bunch for your help, David.

Wow that’s brutal. Glad you figured it out!