TaskRunner does not process index_realtime_task

Hi everyone,

I'm using Tranquility to send events to Druid for a simple experiment, so all nodes (overlord, broker, coordinator, historical) are running on the same computer. The code that sends events to Druid is below:

```java
public static void main(String[] args) throws Exception {
    List<Map<String, Object>> listOfEvents = new ArrayList<Map<String, Object>>();
    for (int i = 0; i < 2; i++) {
        Map<String, Object> sample = new HashMap<String, Object>();
        sample.put("timestamp", System.currentTimeMillis());
        sample.put("bar", "i'm bar");
        sample.put("qux", "i'm qux");
        listOfEvents.add(sample);
    }

    final CuratorFramework curator = CuratorFrameworkFactory
        .builder()
        .connectString("127.0.0.1:2181")
        .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
        .build();
    curator.start();

    final Service<List<Map<String, Object>>, Integer> druidService = DruidBeams
        .builder(new Timestamper<Map<String, Object>>() {
            @Override
            public DateTime timestamp(Map<String, Object> theMap) {
                return new DateTime(theMap.get("timestamp"));
            }
        })
        .curator(curator)
        .discoveryPath("/druid/discovery")
        .location(DruidLocation.create("overlord", "druid:firehose:%s", "foo5"))
        .timestampSpec(new TimestampSpec("timestamp", "auto"))
        .rollup(DruidRollup.create(
            DruidDimensions.specific(ImmutableList.of("bar", "qux")),
            ImmutableList.of(new CountAggregatorFactory("count")),
            QueryGranularity.DAY))
        .tuning(ClusteredBeamTuning.builder()
            .segmentGranularity(Granularity.MINUTE)
            .windowPeriod(new Period("P100D"))
            .partitions(1)
            .replicants(1)
            .build())
        .buildJavaService();

    final Future<Integer> numSentFuture = druidService.apply(listOfEvents);
    System.out.println("Wait for confirmation");
    final Integer numSent = Await.result(numSentFuture);
    System.out.println("numSent=" + numSent);
    Await.result(druidService.close());
    curator.close();
}
```

The first three times I sent events to Druid, everything seemed fine. Overlord log:

```
2015-04-11T13:50:18,970 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid
2015-04-11T13:50:18,971 INFO [pool-7-thread-2] io.druid.indexing.overlord.ForkingTaskRunner - Running command: java -cp libs io.druid.cli.Main internal peon /tmp/persistent/task/index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid/19e6ffb1-5b4d-4e06-8647-5bc940915241/task.json /tmp/persistent/task/index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid/19e6ffb1-5b4d-4e06-8647-5bc940915241/status.json --nodeType realtime
2015-04-11T13:50:18,979 INFO [pool-7-thread-2] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid output to: /tmp/persistent/task/index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid/19e6ffb1-5b4d-4e06-8647-5bc940915241/log
2015-04-11T13:50:24,211 INFO [qtp263656423-44] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid]: LockListAction{}
2015-04-11T13:50:24,386 INFO [qtp263656423-31] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid]: LockAcquireAction{interval=2015-04-11T13:50:00.000+07:00/2015-04-11T13:51:00.000+07:00}
2015-04-11T13:50:24,386 INFO [qtp263656423-31] io.druid.indexing.overlord.TaskLockbox - Created new TaskLockPosse: TaskLockPosse{taskLock=TaskLock{groupId=index_realtime_foo5, dataSource=foo5, interval=2015-04-11T13:50:00.000+07:00/2015-04-11T13:51:00.000+07:00, version=2015-04-11T13:50:24.386+07:00}, taskIds=}
2015-04-11T13:50:24,386 INFO [qtp263656423-31] io.druid.indexing.overlord.TaskLockbox - Added task[index_realtime_foo5_2015-04-11T13:50:00.000+07:00_0_0_iikgaaid] to TaskLock[index_realtime_foo5]
```

Query with:

```json
{
  "queryType": "timeBoundary",
  "dataSource": "foo5"
}
```
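(For reference: a timeBoundary query like this can be POSTed to the broker's /druid/v2 endpoint, e.g. with curl. The port below is an assumption; use whatever druid.port is set in the attached broker_runtime.properties.)

```bash
# Hypothetical port; adjust to the broker's druid.port.
curl -X POST 'http://localhost:8082/druid/v2/?pretty' \
  -H 'Content-Type: application/json' \
  -d '{"queryType":"timeBoundary","dataSource":"foo5"}'
```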

The result:

```json
[ {
  "timestamp" : "2015-04-11T13:47:00.000+07:00",
  "result" : {
    "maxTime" : "2015-04-11T13:50:00.000+07:00",
    "minTime" : "2015-04-11T13:47:00.000+07:00"
  }
} ]
```

But on subsequent sends, the events were not processed, even though the overlord had asked the taskRunner to run the task. Overlord log:

```
2015-04-11T13:51:32,714 INFO [qtp263656423-40] io.druid.indexing.overlord.HeapMemoryTaskStorage - Inserting task index_realtime_foo5_2015-04-11T13:51:00.000+07:00_0_0_bnfoocan with status: TaskStatus{id=index_realtime_foo5_2015-04-11T13:51:00.000+07:00_0_0_bnfoocan, status=RUNNING, duration=-1}
2015-04-11T13:51:32,714 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_foo5_2015-04-11T13:51:00.000+07:00_0_0_bnfoocan
2015-04-11T13:51:52,417 INFO [TaskQueue-StorageSync] io.druid.indexing.overlord.TaskQueue - Synced 4 tasks from storage (0 tasks added, 0 tasks removed).
2015-04-11T13:52:52,417 INFO [TaskQueue-StorageSync] io.druid.indexing.overlord.TaskQueue - Synced 4 tasks from storage (0 tasks added, 0 tasks removed).
io.druid.indexing.overlord.HeapMemoryTaskStorage - Inserting task index_realtime_foo3_2015-04-11T11:38:00.000+07:00_0_0_kjcjmfeb with status: TaskStatus{id=index_realtime_foo3_2015-04-11T11:38:00.000+07:00_0_0_kjcjmfeb, status=RUNNING, duration=-1}
2015-04-11T11:38:05,708 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_foo3_2015-04-11T11:38:00.000+07:00_0_0_kjcjmfeb
```

Nothing happened after that. I ran the query again to check the result; it was unchanged, and the client returned with an exception:

```
2255 [ClusteredBeam-ZkFuturePool-38f33ee2-3475-481d-83bb-0aff3b5d2f6d] INFO com.metamx.tranquility.finagle.FinagleRegistry - Created client for service: druid:firehose:foo5-51-0000-0000
92282 [Finagle Default Timer-1] WARN com.metamx.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Failed to propagate events: overlord/foo5
{
  "eventCount" : 2,
  "timestamp" : "2015-04-11T13:51:00.000+07:00",
  "beams" : "HashPartitionBeam(DruidBeam(interval = 2015-04-11T13:51:00.000/2015-04-11T13:52:00.000, partition = 0, tasks = [index_realtime_foo5_2015-04-11T13:51:00.000+07:00_0_0_bnfoocan/foo5-51-0000-0000]))"
}
com.twitter.finagle.GlobalRequestTimeoutException: exceeded 1.minutes+30.seconds to druid:firehose:foo5-51-0000-0000 while waiting for a response for the request, including retries (if applicable)
  at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]
92298 [Finagle Default Timer-1] INFO com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"alerts","timestamp":"2015-04-11T13:53:02.880+07:00","service":"tranquility","host":"localhost","severity":"anomaly","description":"Failed to propagate events: overlord/foo5","data":{"exceptionType":"com.twitter.finagle.GlobalRequestTimeoutException","exceptionStackTrace":"com.twitter.finagle.GlobalRequestTimeoutException: exceeded 1.minutes+30.seconds to druid:firehose:foo5-51-0000-0000 while waiting for a response for the request, including retries (if applicable)\n\tat com.twitter.finagle.NoStacktrace(Unknown Source)\n","timestamp":"2015-04-11T13:51:00.000+07:00","beams":"HashPartitionBeam(DruidBeam(interval = 2015-04-11T13:51:00.000/2015-04-11T13:52:00.000, partition = 0, tasks = [index_realtime_foo5_2015-04-11T13:51:00.000+07:00_0_0_bnfoocan/foo5-51-0000-0000]))","eventCount":2,"exceptionMessage":"exceeded 1.minutes+30.seconds to druid:firehose:foo5-51-0000-0000 while waiting for a response for the request, including retries (if applicable)"}}]
numSent=0
```

My configurations for common, broker, coordinator, historical, and overlord are attached below (I'm using Tranquility, so I don't start a realtime node).

Please let me know what I'm doing wrong.

broker_runtime.properties (1.08 KB)

common.runtime.properties (1.81 KB)

coordinator_runtime.properties (1.05 KB)

historical_runtime.properties (1.51 KB)

overlord_runtime.properties (1.72 KB)

Can you check the overlord web console to see if you’re running out of task slots, and the task logs (linked from the console) to see if they are handing off? I suspect that handoff is not occurring promptly, either because windowPeriod is too long or because something is not working with handoff. That would eventually lead to running out of task slots, if you don’t have that many.

The background here is that every segmentGranularity period gets a new task, and that task lasts as long as needed for handoff to happen. This would be the end of the segmentGranularity period + windowPeriod + the time it takes to merge and push the segment + the time it takes historicals to load it. Usually those last two things take a few minutes. During that time, you may want to post data for the next period, so you'll need an extra task slot. With segmentGranularity "minute" you probably need a few extra task slots per partition to accommodate that. With segmentGranularity "hour" you generally only need one extra task slot per partition.
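To make that concrete with the builder API from the snippet above: the windowPeriod of P100D in that code keeps every minute-granularity task open for roughly 100 days before it can hand off, which matches the "windowPeriod is too long" case. A more typical tuning (a sketch only; the values are illustrative, not a recommendation for any particular workload) would be:

```java
// Sketch: same ClusteredBeamTuning builder used in the original snippet, but with a
// windowPeriod measured in minutes instead of days, so each task can hand off shortly
// after its segment interval ends and free its task slot.
.tuning(ClusteredBeamTuning.builder()
        .segmentGranularity(Granularity.HOUR)   // one task per hour per partition
        .windowPeriod(new Period("PT10M"))      // accept events up to 10 minutes late
        .partitions(1)
        .replicants(1)
        .build())
```

With a tuning like that, one extra task slot per partition is usually enough to cover the handoff overlap described above.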

@Gian:

Thank you a lot. It does seem to be related to the task slots I have.

I tried to increase the number of threads via "druid.indexer.fork.property.druid.processing.numThreads", up to 20, but then the task failed in the overlord console:

```
2015-04-13T15:49:12,308 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb output to: /tmp/persistent/task/index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb/e4722663-771d-497e-aded-624f841d8497/log
2015-04-13T15:49:15,637 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Process exited with status[1] for task: index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb
2015-04-13T15:49:15,641 INFO [pool-7-thread-1] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: log/index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb.log
2015-04-13T15:49:15,642 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Removing temporary directory: /tmp/persistent/task/index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb/e4722663-771d-497e-aded-624f841d8497
2015-04-13T15:49:15,650 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb
2015-04-13T15:49:15,651 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Ignoring request to cancel unknown task: index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb
2015-04-13T15:49:15,651 INFO [pool-7-thread-1] io.druid.indexing.overlord.HeapMemoryTaskStorage - Updating task index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb to status: TaskStatus{id=index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb, status=FAILED, duration=-1}
2015-04-13T15:49:15,652 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Task done: RealtimeIndexTask{id=index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb, type=index_realtime, dataSource=foo3}
2015-04-13T15:49:15,652 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Task FAILED: RealtimeIndexTask{id=index_realtime_foo3_2015-04-13T15:49:00.000+07:00_0_0_kmoiahdb, type=index_realtime, dataSource=foo3} (-1 run duration)
```

The log was removed instantly, so I can't tell why it failed.

I reduced the number of threads to 10 and it worked, but the number of task slots is still limited to only 3 tasks, even though I restarted everything, from ZooKeeper and MySQL (deleted all tables) to the Druid nodes.

Hope you can help soon!

Are you trying to increase the number of tasks that can run at once? If so, that’s actually a different property: “druid.worker.capacity”.

“druid.processing.numThreads” is the number of query threads per task. Setting it high may have caused a failure because each query thread needs to reserve some space off-heap, and you might not have had enough off-heap memory available for that. Fwiw we usually set druid.processing.numThreads to 1 or 2 on realtime nodes.

Actually we generally set druid.processing.numThreads to 2 on realtime nodes.
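For reference, these settings would go in the overlord's runtime properties in a setup like this where the overlord forks the peons itself. The values below are illustrative assumptions, not taken from the attached configs:

```properties
# Number of task slots on this machine (the default is roughly the number of
# available processors minus one, which would explain seeing only ~3 slots).
druid.worker.capacity=5

# Query threads per forked peon; 1-2 is usually enough for realtime index tasks.
druid.indexer.fork.property.druid.processing.numThreads=2

# Each processing thread reserves an off-heap buffer of this size, so direct memory
# per peon is roughly buffer.sizeBytes * (numThreads + 1). Jumping numThreads to 20
# can exceed the peon's direct memory limit and make it exit with status 1.
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=67108864
```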

Oh, great. It works. Thank you once again for your support.

Great to hear!