Batch indexing Hadoop task failed with FileNotFoundException

Hi team,

Our hadoop batch indexing task failed with

2015-04-01 03:16:16,419 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_pulsar_event_merged_2015-04-01T03:15:47.492Z, type=index_hadoop, dataSource=pulsar_event_merged}]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:255)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:218)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:197)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-01T031547.494Z/segmentDescriptorInfo does not exist
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:128)
at io.druid.indexer.HadoopDruidIndexerJob$1.run(HadoopDruidIndexerJob.java:74)
at io.druid.indexer.JobHelper.runJobs(JobHelper.java:137)
at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:80)
at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:294)
… 11 more
Caused by: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-01T031547.494Z/segmentDescriptorInfo does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:362)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1484)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1524)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:564)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:121)
… 15 more
2015-04-01 03:16:16,430 INFO [task-runner-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
"id" : "index_hadoop_pulsar_event_merged_2015-04-01T03:15:47.492Z",
"status" : "FAILED",
"duration" : 21468
}

I've created the folder /tmp/druid-indexing/ on Hadoop with "drwxrwxrwx" permissions.

I've also attached the whole task log.

Please advise, thanks.

xulu

index_hadoop_pulsar_event_merged.zip (576 KB)

Can you check whether you actually had any input data to be processed? I have seen this happen when an indexing job is started with no data to process. We tried to improve the error messaging with https://github.com/druid-io/druid/pull/1179 .

– Himanshu

Hi Himanshu,

Thanks a lot, that should be the reason. The directory /pulsar/raw_soj/20150316/ has many subdirectories (one for every 15 minutes of the day), and the .avro files are under those subdirectories. After I changed the path to /pulsar/raw_soj/20150316/00_00, the task started working.

I would like to run a task daily to batch ingest all the data generated in the previous day. Since we have 96 directories per day, how can a single task handle all of them?
Also, from the task log:

2015-04-01 05:33:12,531 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00005.avro:0+139413666 > map
2015-04-01 05:33:12,533 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00001.avro:0+161695514 > map
2015-04-01 05:33:17,167 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00008.avro:0+143314486 > map
2015-04-01 05:33:17,169 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00002.avro:0+144980510 > map
2015-04-01 05:33:17,170 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00003.avro:0+139182446 > map
2015-04-01 05:42:07,880 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - hdfs://xx:8020/pulsar/raw_soj/20150316/00_00/sojpulsaravro-m-00144.avro:0+81797524 > map

the parallelism does not look high. How can I improve it?

thanks.

Are you talking about the Hadoop job launched by Druid? Druid does not support Avro as input out of the box; what setup are you using?

Or is this one of your processing pipeline jobs that runs before you launch the Druid indexer?

In any case, for parallelism in the latter case (map-reduce jobs): the number of maps depends on the Hadoop InputFormat you're using (see the relevant docs for what to tweak), and the number of reducers can be adjusted by setting "mapreduce.job.reduces" in your Hadoop job config.
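
For a Druid Hadoop indexing task, such Hadoop properties can be passed through the "jobProperties" block of the tuningConfig. A minimal sketch (the value 10 is just a placeholder):

    "tuningConfig": {
        "type": "hadoop",
        "jobProperties": {
            "mapreduce.job.reduces": "10"
        }
    }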

– Himanshu

This is the Druid Hadoop job. I applied the patch https://github.com/druid-io/druid/pull/1177/files#r26157693 and provided a customized AvroInputFormat (extending FileInputFormat) to read the .avro files.

    "ioConfig": {
        "type": "hadoop",
        "inputSpec": {
            "type": "static",
            "paths": "hdfs://xx:8020/pulsar/raw_soj/20150316/00_00",
            "inputFormat": "com.ebay.tracking.druid.avro.AvroInputFormat"
        }
    }

If the Druid task cannot iterate over all the subdirectories to look for .avro files, I would have to submit 96 jobs for one day's data. Correct?

thanks.
xulu

Not really, you shouldn't have to fire 96 indexer jobs. Here are the options:

  1. The implementation of "com.ebay.tracking.druid.avro.AvroInputFormat" can recursively follow subdirectories and read the data. From what I can remember, FileInputFormat is already capable of doing so; check your implementation.

(See the docs for the following options: http://druid.io/docs/latest/Batch-ingestion.html#path-specification )

  2. In the "static" pathSpec, give a comma-separated list of all 96 directories (see the first sketch below).

  3. See if the "granularity" path spec is a better option for you (see the second sketch below).

It seems FileInputFormat was buggy for some Hadoop versions: https://issues.apache.org/jira/browse/MAPREDUCE-3193

Make sure you're using one that can arbitrarily recurse and read all the data in the subdirectories.
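
For illustration, a "static" pathSpec listing multiple directories might look like this (the two paths follow your naming and are just examples; in practice you'd list all 96):

    "inputSpec": {
        "type": "static",
        "paths": "hdfs://xx:8020/pulsar/raw_soj/20150316/00_00,hdfs://xx:8020/pulsar/raw_soj/20150316/00_15",
        "inputFormat": "com.ebay.tracking.druid.avro.AvroInputFormat"
    }

And a rough sketch of a "granularity" pathSpec (field names per the docs linked above; note that it expects the data to be laid out in a time-bucketed directory structure, so it may not match your 00_00 layout as-is):

    "inputSpec": {
        "type": "granularity",
        "dataGranularity": "hour",
        "inputPath": "hdfs://xx:8020/pulsar/raw_soj",
        "filePattern": ".*\\.avro"
    }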

– Himanshu

Maybe you're just missing specifying

`mapreduce.input.fileinputformat.input.dir.recursive=true`

in your job properties (if you used https://hadoop.apache.org/docs/current/api/index.html?org/apache/hadoop/mapred/FileInputFormat.html ).
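
In the Druid indexer spec, that property would sit in the tuningConfig's "jobProperties", alongside anything else you already pass to Hadoop; a sketch:

    "jobProperties": {
        "mapreduce.input.fileinputformat.input.dir.recursive": "true"
    }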

-- Himanshu

Thanks a lot for all the information. I will try it out and update.

thanks,
xulu

Hi Himanshu,

Some updates: the job now runs on the remote Hadoop cluster and the map/reduce jobs complete successfully, but the Druid index job failed with the exception below. I can see that the temp files are generated, but there is no segmentDescriptorInfo.

What might be the reason?

thanks.

2015-04-08 07:10:36,091 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_pulsar_event_merged_2015-04-08T07:03:21.251Z, type=index_hadoop, dataSource=pulsar_event_merged}]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:255)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:218)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:197)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-08T070324.336Z/segmentDescriptorInfo does not exist.
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:127)
at io.druid.indexer.HadoopDruidIndexerJob$1.run(HadoopDruidIndexerJob.java:74)
at io.druid.indexer.JobHelper.runJobs(JobHelper.java:137)
at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:80)
at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:294)
… 11 more
Caused by: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-08T070324.336Z/segmentDescriptorInfo does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:650)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:98)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:704)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:704)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:120)
… 15 more

This is my job definition:

    {
        "type": "index_hadoop",
        "spec": {
            "dataSchema": {
                "dataSource": "pulsar_event_merged",
                "parser": {
                    "type": "string",
                    "parseSpec": {
                        "format": "json",
                        "timestampSpec": {
                            "column": "timestamp",
                            "format": "auto"
                        },
                        "dimensionsSpec": {
                            "dimensions": [],
                            "dimensionExclusions": ["guid", "uid", "js_evt_kafka_produce_ts", "timestamp", "js_ev_type"],
                            "spatialDimensions": []
                        }
                    }
                },
                "metricsSpec": [
                    {
                        "type": "count",
                        "name": "count"
                    },
                    {
                        "type": "hyperUnique",
                        "fieldName": "guid",
                        "name": "guid_hll"
                    },
                    {
                        "type": "longSum",
                        "fieldName": "dwell",
                        "name": "dwell_ag"
                    }
                ],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "hour",
                    "queryGranularity": "NONE",
                    "intervals": [
                        "2015-03-25/2015-03-26"
                    ]
                }
            },
            "ioConfig": {
                "type": "hadoop",
                "inputSpec": {
                    "type": "static",
                    "paths": "hdfs://artemis:8020/pulsar/raw_soj/20150325/00_00",
                    "inputFormat": "com.ebay.tracking.druid.avro.AvroInputFormat"
                }
            },
            "tuningConfig": {
                "type": "hadoop",
                "partitionsSpec": {
                    "type": "hashed",
                    "numShards": 10
                },
                "shardSpecs": {},
                "leaveIntermediate": false,
                "cleanupOnFailure": true,
                "overwriteFiles": false,
                "ignoreInvalidRows": true,
                "jobProperties": {
                    "mapreduce.input.fileinputformat.input.dir.recursive": "true",
                    "mapreduce.input.fileinputformat.list-status.num-threads": "10",
                    "mapreduce.job.reduces": "10"
                },
                "combineText": false,
                "persistInHeap": false,
                "ingestOffheap": false,
                "bufferSize": 134217728,
                "aggregationBufferRatio": 0.5,
                "rowFlushBoundary": 500000
            }
        }
    }

That error makes me wonder whether any segments actually got created. To confirm: did any index-generator Hadoop jobs run? Did they complete successfully? Can you post a log from one of the index-generator reducers?

Also, out of curiosity, what version of Druid are you using?

Hi,

I am using 0.6.171.

From the task log and the Hadoop jobtracker, the Hadoop map/reduce jobs ran successfully. The files on Hadoop after the job failed are shown in this picture:

Below is a more detailed Druid task log; the reducer log is also attached.

2015-04-08 09:50:52,058 INFO [task-runner-0] org.apache.hadoop.mapreduce.Job - Job job_1426279437401_565266 completed successfully
2015-04-08 09:50:52,236 INFO [task-runner-0] org.apache.hadoop.mapreduce.Job - Counters: 53
File System Counters
FILE: Number of bytes read=5175293687
FILE: Number of bytes written=10880875352
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=36760981983
HDFS: Number of bytes written=2682017189
HDFS: Number of read operations=3474
HDFS: Number of large read operations=0
HDFS: Number of write operations=20
Job Counters
Killed map tasks=6
Launched map tasks=580
Launched reduce tasks=10
Other local map tasks=11
Data-local map tasks=528
Rack-local map tasks=41
Total time spent by all maps in occupied slots (ms)=97614345
Total time spent by all reduces in occupied slots (ms)=5731419
Total time spent by all map tasks (ms)=32538115
Total time spent by all reduce tasks (ms)=1910473
Total vcore-seconds taken by all map tasks=32538115
Total vcore-seconds taken by all reduce tasks=1910473
Total megabyte-seconds taken by all map tasks=133276119040
Total megabyte-seconds taken by all reduce tasks=7825297408
Map-Reduce Framework
Map input records=62555721
Map output records=52722302
Map output bytes=36381101937
Map output materialized bytes=5594877400
Input split bytes=90118
Combine input records=0
Combine output records=0
Reduce input groups=20
Reduce shuffle bytes=5594877400
Reduce input records=52722302
Reduce output records=52722302
Spilled Records=104042959
Shuffled Maps =5740
Failed Shuffles=0
Merged Map outputs=5740
GC time elapsed (ms)=309442
CPU time spent (ms)=35744580
Physical memory (bytes) snapshot=1098268250112
Virtual memory (bytes) snapshot=2925780250624
Total committed heap usage (bytes)=1219227549696
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
io.druid.indexer.HadoopDruidIndexerConfig$IndexJobCounters
INVALID_ROW_COUNTER=9833415
File Input Format Counters
Bytes Read=36760891865
File Output Format Counters
Bytes Written=2682017189
ALERT:/tmp/druid-indexing
ALERT:pulsar_event_merged
ALERT:2015-04-08T093021.124Z
2015-04-08 09:50:52,293 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_
pulsar_event_merged_2015-04-08T09:30:21.121Z, type=index_hadoop, dataSource=pulsar_event_merged}]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:255)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:218)
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:197)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-08T093021.124Z/segmentDescriptorIn
fo does not exist.
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:127)
at io.druid.indexer.HadoopDruidIndexerJob$1.run(HadoopDruidIndexerJob.java:74)
at io.druid.indexer.JobHelper.runJobs(JobHelper.java:137)
at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:80)
at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:294)
… 11 more
Caused by: java.io.FileNotFoundException: File /tmp/druid-indexing/pulsar_event_merged/2015-04-08T093021.124Z/segmentDescriptorInfo does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:650)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:98)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:704)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:704)
at io.druid.indexer.IndexGeneratorJob.getPublishedSegments(IndexGeneratorJob.java:120)
… 15 more
2015-04-08 09:50:52,306 INFO [task-runner-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
"id" : "index_hadoop_pulsar_event_merged_2015-04-08T09:30:21.121Z",
"status" : "FAILED",
"duration" : 1223161
}

thanks.

reducer.log (676 KB)

Hi Team,

Please advise. Thanks.

Hi Lu, those counters and logs look like they’re for the determine-partitions job. Do you have counters and logs for the index-generator job? It should have run immediately after the determine-partitions job finished. That one is interesting because it is supposed to generate the segments determined by your determine-partitions job, but the File Not Found error message you got indicates that the index-generator may not have actually written any segments. I’m hoping to figure out why.

Also, it looks like some nodes in your cluster are not using UTC for their timezone, based on the fact that the shardSpecs are binned under the -07:00 TZ. This may or may not be a problem (I’m not sure if it works properly). Is it possible for you to try with your cluster in UTC?
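
If changing the cluster's timezone isn't practical, one commonly used workaround is to force UTC on the mapper/reducer JVMs through jobProperties; a sketch (you may need to merge these with any java opts you already set, e.g. -Xmx):

    "jobProperties": {
        "mapreduce.map.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
        "mapreduce.reduce.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8"
    }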

Also, out of curiosity, what is your fs.defaultFS set to in your core-site.xml?

If it’s set to something other than the Hadoop NameNode, then setting it to that might help. (Something like: hdfs://name-node.example.com:9000)

Hi,

I am mostly offline for a while with little connectivity, hence the late reply.

One thing I noticed is that you are manually supplying the number of reducers in your indexer JSON. Druid itself figures out the correct number of reducers for these MR jobs, and you are probably overriding that. Can you try removing "mapreduce.job.reduces" and rerunning your task?

If it does not work, then please attach your indexer task log.

– Himanshu

Hi Himanshu,

I removed "mapreduce.job.reduces", reran the task, and it failed with the same error. I've attached the whole indexing task log.

thanks.

index_hadoop_pulsar_event_merged_2015-04-13T01_50_32.841Z.log (159 KB)

Hi Lu, did you have a chance to pull logs from an index-generator reducer? The error you’re getting suggests that the index-generator Hadoop job didn’t actually write any segment descriptors. The index-generator reducers are supposed to do this. The counters you posted indicated they did actually get some data, so I’m wondering if they wrote their segments to the wrong place.