No space available in any of the local directories (Hadoop ingestion task)

Hi Team,

I am seeing the error below from a Hadoop ingestion task on the MiddleManager.

java.lang.RuntimeException: org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
    at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
    at org.apache.druid.indexer.DetermineHashedPartitionsJob.run(DetermineHashedPartitionsJob.java:223) ~[druid-indexing-hadoop-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexer.JobHelper.runSingleJob(JobHelper.java:372) ~[druid-indexing-hadoop-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob.run(HadoopDruidDetermineConfigurationJob.java:60) ~[druid-indexing-hadoop-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner.runTask(HadoopIndexTask.java:617) ~[druid-indexing-service-0.14.2-incubating.jar:0.14.2-incubating]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_252]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
    at org.apache.druid.indexing.common.task.HadoopIndexTask.runInternal(HadoopIndexTask.java:309) ~[druid-indexing-service-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:244) [druid-indexing-service-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.14.2-incubating.jar:0.14.2-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.14.2-incubating.jar:0.14.2-incubating]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:399) ~[?:?]
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:151) ~[?:?]
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:132) ~[?:?]
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:116) ~[?:?]
    at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125) ~[?:?]
    at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:171) ~[?:?]
    at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:758) ~[?:?]
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:242) ~[?:?]
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1341) ~[?:?]
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1338) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_252]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_252]

Below is the MiddleManager configuration:

druid.service=druid/middleManager
druid.port=8091
druid.worker.ip=#{DRUID_HOST}

# Number of tasks per middleManager
druid.worker.capacity=5

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms9g -Xmx9g -XX:MaxDirectMemorySize=6g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Daws.region=us-east-1 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=/cdp/druid/var/druid/task

# Processing threads and buffers on Peons. Peons inherit middle manager properties (unless overridden)
# The number of cores and memory needs to be properly configured to avoid starvation of resources.
# The configuration below was arrived at to balance ingestion and querying on the MiddleManagers.
druid.indexer.fork.property.druid.segmentCache.locations=[{"path": "/cdp/druid/var/druid/segment-cache", "maxSize": 300000000000}]
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=3
druid.indexer.fork.property.druid.processing.numThreads=3
druid.indexer.fork.property.druid.server.http.numThreads=10

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/cdp/druid/var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.3"]

We are using the AWS EC2 instance type 'r5d.4xlarge' for the MiddleManager nodes; it has 128 GB of memory and 16 vCPUs.

Could someone please help me with this error?

Regards,
Vinay Patil

The Hadoop indexer will run the job on your Hadoop cluster. It seems like you are running out of space on your Hadoop cluster, not on the Druid peon.

Hi Gaurav,

Thank you for the response. We are not running the job on a separate Hadoop cluster; the Hadoop jobs run on the MiddleManager node itself. Below are more details:

a) Middle Manager AWS EC2 instance type - r5d.4xlarge

b) Ingestion from an S3 prefix in a single ingestion task: the prefix has 256 Parquet files with a combined size of about 10 GB.

c) Middle Manager configuration:

druid.service=druid/middleManager
druid.port=8091
druid.worker.ip={ip}

# Number of tasks per middleManager
druid.worker.capacity=5

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms9g -Xmx9g -XX:MaxDirectMemorySize=6g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Daws.region=us-east-1 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=/cdp/druid/var/druid/task

# Processing threads and buffers on Peons. Peons inherit middle manager properties (unless overridden)
# The number of cores and memory needs to be properly configured to avoid starvation of resources.
# The configuration below was arrived at to balance ingestion and querying on the MiddleManagers.

druid.indexer.fork.property.druid.segmentCache.locations=[{"path": "/cdp/druid/var/druid/segment-cache", "maxSize": 300000000000}]
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=3
druid.indexer.fork.property.druid.processing.numThreads=3
druid.indexer.fork.property.druid.server.http.numThreads=10

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/cdp/druid/var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.3"]

Could you please let me know if any configuration is incorrect here, whether we need to reduce the number of files ingested in a single ingestion task due to disk space constraints, or whether we should use a larger EC2 instance with more storage?

Thank you in advance.

Regards,
Vinay Patil

Hi Vinay,

Are you able to successfully ingest a single file? Try this test first, and please share your ingestion spec as well.

Hi Gaurav,

I am able to ingest a single file without any issue. Below is the ingestion spec, which ingests from an S3 prefix. The prefix has 256 files with a total size of around 10 GB.

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "test_data",
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "DATE_TIME",
            "format": "yyyy-MM-dd HH:mm:ss"
          },
          "dimensionsSpec": {
            "dimensions": [
              "HIT_DATE",
              "HIT_TIME_GMT",
              "VISITOR_ID",
              "VISIT_ID",
              "HIT_KEY",
              "POST_VISID_HIGH",
              "POST_VISID_LOW",
              "VISID_HIGH",
              "VISID_LOW",
              "VISIT_PAGE_NUM"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "VIDEO_STARTS",
          "fieldName": "VIDEO_START",
          "expression": null
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "HOUR",
        "rollup": true,
        "intervals": [
          "2020-09-03T21:00:00.000Z/2020-09-03T22:00:00.000Z"
        ]
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "multi",
        "children": [
          {
            "type": "static",
            "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
            "paths": "s3a://test-bucket/data/"
          },
          {
            "type": "dataSource",
            "ingestionSpec": {
              "dataSource": "test_data",
              "ignoreWhenNoSegments": true,
              "intervals": [
                "2020-09-03T21:00:00.000Z/2020-09-03T22:00:00.000Z"
              ]
            }
          }
        ]
      },
      "metadataUpdateSpec": null,
      "segmentOutputPath": null
    }
  }
}

Regards,
Vinay Patil

Not sure if this will be helpful, Vinay… I don't pretend to know enough (well, anything really) about Hadoop, but in https://stackoverflow.com/questions/37868404/distcp-from-hadoop-to-s3-fails-with-no-space-available-in-any-of-the-local-dire I noted that there's a Hadoop job property fs.s3a.fast.upload=true - anyway, just thought I would mention it!

(I believe you can add it via jobProperties: https://druid.apache.org/docs/latest/ingestion/hadoop.html#jobproperties)
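If it helps, here is a rough, untested sketch of how that might look in the spec's tuningConfig. The hadoop.tmp.dir override and its path are only my guesses for pointing Hadoop's local scratch space at a bigger volume, not something from your spec:

"tuningConfig": {
  "type": "hadoop",
  "jobProperties": {
    "fs.s3a.fast.upload": "true",
    "hadoop.tmp.dir": "/cdp/druid/var/hadoop-tmp"
  }
}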

Hi Peter,

I will try setting this property and see how it goes. Thank you for the help.

Druid Team,
For larger Hadoop ingestion tasks, what are the recommended options? Would it be better to run them on a separate Hadoop cluster instead of running standalone Hadoop ingestion tasks on the MiddleManagers?

Regards,
Vinay Patil