Native Batch Ingestion fails due to not being able to create a segment for the interval

I have two datasources, DS1 and DS2.

I want to take the data from DS1 and put it into DS2.

Both datasources are actively ingesting data from Kafka, and both are being actively queried.

The span of the ingestion is the past ten days, and both datasources have a segment granularity of MONTH.

Now, when I dispatch a native batch job with priority set to 150 (the highest) and a large number of parallel subtasks, the subtasks fail with:
org.apache.druid.java.util.common.ISE: Failed to add a row with timestamp[2022-10-02T00:00:00.000Z]
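For context, the spec I dispatch looks roughly like this (the datasource names, interval, and sub-task count are placeholders here, and the real spec carries the full dimensions list):

```
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "DS2",
      "timestampSpec": { "column": "__time", "format": "millis" },
      "dimensionsSpec": { "dimensions": [] },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "queryGranularity": "none"
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "druid",
        "dataSource": "DS1",
        "interval": "2022-10-04/2022-10-14"
      },
      "appendToExisting": true
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumConcurrentSubTasks": 4
    }
  },
  "context": {
    "priority": 150
  }
}
```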

Can you share the full stack trace? It sounds like you may be running into lock contention, given that your streaming ingestion will be locking the whole month.
Can you help us understand the use case?

Yes, here is the stack trace from one of the failed tasks; I have replaced the datasource name with DS_NAME:

```
2022-10-14T09:47:38,071 INFO [task-runner-0-priority-0] org.apache.druid.storage.s3.S3DataSegmentPuller - Loaded 132512698 bytes from [CloudObjectLocation{bucket='BUCKET_NAME', path='path/to/cloud/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/1002/22509da0-c723-4624-a462-253366ca40cd/index.zip'}] to [/path/to/storage/single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z/work/TEMP_DIR/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/1002]
2022-10-14T09:47:38,485 WARN [task-runner-0-priority-0] org.apache.druid.indexing.input.DruidSegmentInputEntity - Could not clean temporary segment file: /path/to/storage/single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z/work/TEMP_DIR/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/1002
2022-10-14T09:47:38,486 INFO [task-runner-0-priority-0] org.apache.druid.storage.s3.S3DataSegmentPuller - Pulling index at path[CloudObjectLocation{bucket='BUCKET_NAME', path='path/to/cloud/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/10015/ece34629-4b68-4237-80d8-dd00f76de4b6/index.zip'}] to outDir[/path/to/storage/single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z/work/TEMP_DIR/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/10015]
2022-10-14T09:47:39,494 INFO [task-runner-0-priority-0] org.apache.druid.storage.s3.S3DataSegmentPuller - Loaded 108100238 bytes from [CloudObjectLocation{bucket='BUCKET_NAME', path='path/to/cloud/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/10015/ece34629-4b68-4237-80d8-dd00f76de4b6/index.zip'}] to [/path/to/storage/single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z/work/TEMP_DIR/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/10015]
2022-10-14T09:47:40,062 WARN [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Cannot allocate segment for timestamp[2022-10-08T00:00:00.000Z], sequenceName[index_parallel_DS_NAME_dmfganjd_2022-10-14T09:46:37.669Z_2].
2022-10-14T09:47:40,064 WARN [task-runner-0-priority-0] org.apache.druid.indexing.input.DruidSegmentInputEntity - Could not clean temporary segment file: /path/to/storage/single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z/work/TEMP_DIR/DS_NAME/2022-10-01T00:00:00.000Z_2022-11-01T00:00:00.000Z/2022-10-01T00:00:01.069Z/10015
2022-10-14T09:47:40,065 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Exception while running task[AbstractTask{id='single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z', groupId='index_parallel_DS_NAME_dmfganjd_2022-10-14T09:46:37.669Z', taskResource=TaskResource{availabilityGroup='single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z', requiredCapacity=1}, dataSource='DS_NAME', context={forceTimeChunkLock=true, useLineageBasedSegmentAllocation=true}}]
org.apache.druid.java.util.common.ISE: Failed to add a row with timestamp[2022-10-08T00:00:00.000Z]
	at org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask.generateAndPushSegments(SinglePhaseSubTask.java:397) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask.runTask(SinglePhaseSubTask.java:209) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.common.task.AbstractBatchIndexTask.run(AbstractBatchIndexTask.java:159) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471) [druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443) [druid-indexing-service-0.22.1.jar:0.22.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2022-10-14T09:47:40,069 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "single_phase_sub_task_DS_NAME_gfbgplcn_2022-10-14T09:47:30.614Z",
  "status" : "FAILED",
  "duration" : 4348,
  "errorMsg" : "org.apache.druid.java.util.common.ISE: Failed to add a row with timestamp[2022-10-08T00:00:00.000Z]",
  "location" : {
    "host" : null,
    "port" : -1,
    "tlsPort" : -1
  }
}
```

The interesting point is that when I dispatch it with only 1 max concurrent subtask, it doesn't fail, but with more than 1, the subtasks all fail shortly after they start.

In general, when doing stream ingestion, the streaming tasks hold a lock at the segment granularity level; in your case that is the whole month.
So when the batch job tries to ingest into the same timeframe, it is not allowed to do so.
One possibility here is to change the segment granularity to DAY and have the batch job only update data up to yesterday, thereby avoiding the contention.
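For example, the batch job's granularitySpec could then look something like this (the interval is only an illustration; the point is to stop at yesterday so the batch job never touches the time chunk the streaming tasks are locking):

```
{
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "DAY",
    "queryGranularity": "none",
    "intervals": ["2022-10-04/2022-10-14"]
  }
}
```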

I’m curious, what is the use case? Perhaps with more info the community can help propose alternatives.

We have some data in another datasource (DS1) that we need to transfer onto DS2.
So you are telling me I need to create a compaction task for the month, to convert it into a series of day-based segments, and then run the native batch job?

Yes, and change the segment granularity of the real-time ingestion to DAY as well, so that it only holds a lock on the most recent day.
Alternatively, if this is a one-time process, you can simply suspend the real-time ingestion on DS2, run the batch job, and then resume the ingestion.
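A compaction task along those lines could look roughly like this (the datasource name and interval are placeholders for your setup):

```
{
  "type": "compact",
  "dataSource": "DS2",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2022-10-01/2022-11-01"
    }
  },
  "granularitySpec": {
    "segmentGranularity": "DAY"
  }
}
```

For the suspend/resume route, you can do it from the web console or through the supervisor API (POST to /druid/indexer/v1/supervisor/<supervisorId>/suspend before the batch job and .../resume afterwards).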

If I suspend it, I need to wait for some time (an hour or two, perhaps) for the current tasks to finish their work and release the locks, right?

Yes, you should wait for the tasks to complete gracefully. I'm not sure how long that will take.
