How to retry a failed persist-n-merge of a realtime task

Hello,
I am ingesting realtime data into Druid using tranquility-kafka, with segmentGranularity: DAY and windowPeriod: "PT10M".
Yesterday I stopped the server before the end of the day (it's a test installation running on my personal computer), and when I restarted it this morning it tried to hand off the previous day's segment.
The handoff failed with an OutOfMemoryError, and the task shows up on the overlord console with status FAILED. All the data in that segment is no longer queryable.
I would like to increase the heap size of the peon and try again, but I don't know how to re-submit a FAILED realtime task. Can you point me in the right direction?
Here is the relevant part of the log file:
2016-05-27T07:06:04,248 ERROR [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Failed to persist merged index[content20stats_stage]: {class=io.druid.segment.realtime.plumber.RealtimePlumber, exceptionType=class java.io.IOException, exceptionMessage=Map failed, interval=2016-05-26T00:00:00.000Z/2016-05-27T00:00:00.000Z}
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940) ~[?:1.8.0_72-internal]
at com.google.common.io.Files.map(Files.java:864) ~[guava-16.0.1.jar:?]
at com.google.common.io.Files.map(Files.java:851) ~[guava-16.0.1.jar:?]
at com.google.common.io.Files.map(Files.java:818) ~[guava-16.0.1.jar:?]
at com.google.common.io.Files.map(Files.java:790) ~[guava-16.0.1.jar:?]
at com.metamx.common.io.smoosh.SmooshedFileMapper.mapFile(SmooshedFileMapper.java:124) ~[java-util-0.27.7.jar:?]
at io.druid.segment.IndexIO$DefaultIndexIOHandler.convertV8toV9(IndexIO.java:559) ~[druid-processing-0.9.0.jar:0.9.0]
at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:1009) ~[druid-processing-0.9.0.jar:0.9.0]
at io.druid.segment.IndexMerger.merge(IndexMerger.java:421) ~[druid-processing-0.9.0.jar:0.9.0]
at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:242) ~[druid-processing-0.9.0.jar:0.9.0]
at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:215) ~[druid-processing-0.9.0.jar:0.9.0]
at io.druid.segment.realtime.plumber.RealtimePlumber$4.doRun(RealtimePlumber.java:536) [druid-server-0.9.0.jar:0.9.0]
at io.druid.common.guava.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:42) [druid-common-0.9.0.jar:0.9.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method) ~[?:1.8.0_72-internal]
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937) ~[?:1.8.0_72-internal]
... 15 more
2016-05-27T07:06:04,305 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[content20stats_stage_2016-05-26T00:00:00.000Z_2016-05-27T00:00:00.000Z_2016-05-26T09:48:47.100Z] at path[/druid/segments/192.168.1.58:8100/192.168.1.58:8100_realtime__default_tier_2016-05-27T07:04:37.678Z_dae424b64edb4b94a5033dba955aa22b0]
2016-05-27T07:06:04,305 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.curator.announcement.Announcer - unannouncing [/druid/segments/192.168.1.58:8100/192.168.1.58:8100_realtime__default_tier_2016-05-27T07:04:37.678Z_dae424b64edb4b94a5033dba955aa22b0]
2016-05-27T07:06:04,708 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_realtime_content20stats_stage_2016-05-26T00:00:00.000Z_0_0]: LockReleaseAction{interval=2016-05-26T00:00:00.000Z/2016-05-27T00:00:00.000Z}
2016-05-27T07:06:04,710 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_realtime_content20stats_stage_2016-05-26T00:00:00.000Z_0_0] to overlord[http://192.168.1.58:8084/druid/indexer/v1/action]: LockReleaseAction{interval=2016-05-26T00:00:00.000Z/2016-05-27T00:00:00.000Z}
2016-05-27T07:06:06,531 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Deleting Index File[/data/tmp/druid/task/index_realtime_content20stats_stage_2016-05-26T00:00:00.000Z_0_0/work/persist/content20stats_stage/2016-05-26T00:00:00.000Z_2016-05-27T00:00:00.000Z]
2016-05-27T07:06:06,585 INFO [content20stats_stage-2016-05-26T00:00:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Removing sinkKey 1464220800000 for segment content20stats_stage_2016-05-26T00:00:00.000Z_2016-05-27T00:00:00.000Z_2016-05-26T09:48:47.100Z
2016-05-27T07:06:06,587 ERROR [task-runner-0-priority-0] io.druid.indexing.common.task.RealtimeIndexTask - Failed to finish realtime task: {class=io.druid.indexing.common.task.RealtimeIndexTask, exceptionType=class com.metamx.common.ISE, exceptionMessage=Exception occurred during persist and merge.}
com.metamx.common.ISE: Exception occurred during persist and merge.
at io.druid.segment.realtime.plumber.RealtimePlumber.finishJob(RealtimePlumber.java:644) ~[druid-server-0.9.0.jar:0.9.0]
at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:405) [druid-indexing-service-0.9.0.jar:0.9.0]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:338) [druid-indexing-service-0.9.0.jar:0.9.0]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:318) [druid-indexing-service-0.9.0.jar:0.9.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]
2016-05-27T07:06:06,588 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[RealtimeIndexTask{id=index_realtime_content20stats_stage_2016-05-26T00:00:00.000Z_0_0, type=index_realtime, dataSource=content20stats_stage}]
com.metamx.common.ISE: Exception occurred during persist and merge.
at io.druid.segment.realtime.plumber.RealtimePlumber.finishJob(RealtimePlumber.java:644) ~[druid-server-0.9.0.jar:0.9.0]
at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:405) ~[druid-indexing-service-0.9.0.jar:0.9.0]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:338) [druid-indexing-service-0.9.0.jar:0.9.0]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:318) [druid-indexing-service-0.9.0.jar:0.9.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]
2016-05-27T07:06:06,591 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_realtime_content20stats_stage_2016-05-26T00:00:00.000Z_0_0",
  "status" : "FAILED",
  "duration" : 98888
}
and these are the middleManager and overlord configurations:
$ cat conf/druid/middleManager/runtime.properties
druid.service=middleManager
druid.port=8083

# Number of tasks per middleManager

# leave default, which is number of available processors - 1
#druid.worker.capacity=3

# Task launch parameters

druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog-file=peon
druid.indexer.task.baseTaskDir=/data/tmp/druid/task

# HTTP server threads

druid.server.http.numThreads=25

# Processing threads and buffers

druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=2

# Hadoop indexing

druid.indexer.task.hadoopWorkingPath=/data/tmp/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"]
druid.indexer.task.restoreTasksOnRestart=true
$ cat conf/druid/overlord/runtime.properties
druid.port=8084
druid.service=overlord

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata
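
For reference, the change I was planning to try is bumping the peon heap in the middleManager's javaOpts above, something like this (the 4g value is just a guess on my part, not a recommendation):

```properties
# Proposed change (untested): raise the peon heap from 2g to 4g
druid.indexer.runner.javaOpts=-server -Xmx4g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog-file=peon
```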

Thanks,
Tommaso

Hey Tommaso,

Currently, Tranquility-based tasks stay failed once they have failed. Tranquility is designed to "move on" in that case rather than go back and revisit older failed time ranges. In development or testing, you can deal with this by using a shorter segmentGranularity or by creating a new datasource and submitting a new task for the same interval. In production, people usually approach this by having replicas (so if one task fails, others are still running) or by running hybrid batch/realtime pipelines.
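
As a rough sketch of the "new datasource" workaround, a tranquility-kafka config pointing the same topic at a fresh datasource would look something like this (field names recalled from memory, and the datasource name, topic, and ZooKeeper address are illustrative; merge this into your actual spec rather than using it verbatim):

```json
{
  "dataSources": {
    "content20stats_stage_v2": {
      "spec": {
        "dataSchema": {
          "dataSource": "content20stats_stage_v2",
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "none"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "windowPeriod": "PT10M"
        }
      },
      "properties": {
        "topicPattern": "content20stats"
      }
    }
  },
  "properties": {
    "zookeeper.connect": "localhost:2181"
  }
}
```

A shorter segmentGranularity (HOUR here instead of DAY) also limits how much data any single failed handoff can take offline.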

Fwiw, in Druid 0.9.1 there will be a new experimental Kafka-based ingestion mechanism that can go back and re-read data when it fails to index for whatever reason. If this sounds interesting to you then stay on the lookout for Druid 0.9.1 related news.

Hi Gian, I am facing a similar issue. I am using 0.9.1 and the indexing service via the Tranquility Java API. Once a task fails, it does not allow submitting a task for the same datasource again at all, not even after the end of the segment duration. I am quite frustrated trying to figure out what is actually happening, because there are no logs on the Druid side either, explaining why it is not creating the task.

I am not talking about loading earlier data, but at the very least a new task should be created for ingesting current data into the same datasource, right?

Changing the datasource name does not look like a feasible or good solution to me.

Regards,

Arpan Khagram

Arpan, this is by design.

What are you trying to do? If you are loading streaming data from Kafka, for example, I highly recommend using the Kafka exactly-once indexing task:

https://imply.io/docs/latest/tutorial-kafka-indexing-service.html
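
A minimal supervisor spec for it, submitted to the overlord with a POST to /druid/indexer/v1/supervisor, looks roughly like this (a sketch only; the topic, broker address, and schema fields are illustrative, so see the tutorial above for a complete spec):

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "content20stats_stage",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "none"
    },
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": [] }
      }
    },
    "metricsSpec": [ { "type": "count", "name": "count" } ]
  },
  "tuningConfig": { "type": "kafka" },
  "ioConfig": {
    "topic": "content20stats",
    "consumerProperties": { "bootstrap.servers": "localhost:9092" },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
```

Unlike Tranquility, the supervisor keeps retrying: if a task fails, it spawns a replacement that re-reads from the last committed Kafka offsets.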

Yes, we started using the Kafka indexing service now, although it's still in beta. The results are impressive and it's quite stable as well.

There is also another problem we hit while using the Tranquility jars to insert data into Druid through Apache Flink: if you are running a single-node Druid setup and you restart the middleManager, it no longer allows data to be inserted into the same datasource through Tranquility, and we don't see any task getting created either.

Regards,

Arpan Khagram

Hi Arpan, if you restart the MM, tasks should come back up and resume, and Tranquility should be able to continue to push data. Do you have any logs from when the tasks fail?