Trouble with batch ingest/indexing of HDFS files

I have a file “/words.txt” on HDFS in TSV format that I am trying to batch ingest with Druid.

I can successfully launch the indexing service, coordinator node, and historical node, but indexing fails for the batch.spec file I have (see attached).

I have also attached my common.runtime.properties config file. Also, I have added both the /path/to/hadoop/config and /path/to/hadoop/lib directories to the indexing service classpath.
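For reference, I'm launching the indexing service with a command roughly like this (heap size and paths are placeholders, not my exact values):

```
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath config/_common:config/overlord:lib/*:/path/to/hadoop/config:/path/to/hadoop/lib/* \
  io.druid.cli.Main server overlord
```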

Stack trace:

```
[io.druid.extensions:druid-examples, io.druid.extensions:druid-kafka-eight, io.druid.extensions:mysql-metadata-storage, io.druid.extensions:druid-hdfs-storage:0.8.1, org.apache.hadoop:hadoop-client:2.7.1], defaultVersion='0.8.1', localRepository='extensions-repo', remoteRepositories=[https://repo1.maven.org/maven2/, https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local]}]
2015-10-30T18:54:09,571 INFO [task-runner-0] io.druid.indexing.common.task.HadoopIndexTask - Starting a hadoop determine configuration job...
2015-10-30T18:54:09,985 WARN [task-runner-0] org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-10-30T18:54:10,036 INFO [task-runner-0] io.druid.indexer.path.StaticPathSpec - Adding paths[words.txt]
2015-10-30T18:54:10,049 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_hdfs-words_2015-10-30T18:53:48.749Z, type=index_hadoop, dataSource=hdfs-words}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at com.google.api.client.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160) ~[google-http-client-1.15.0-rc.jar:?]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:132) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:173) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:235) [druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:214) [druid-indexing-service-0.8.1.jar:0.8.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_79]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_79]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_79]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:129) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        ... 7 more
Caused by: java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
        at com.google.common.base.Absent.get(Absent.java:47) ~[guava-16.0.1.jar:?]
        at io.druid.indexer.HadoopDruidDetermineConfigurationJob.run(HadoopDruidDetermineConfigurationJob.java:61) ~[druid-indexing-hadoop-0.8.1.jar:0.8.1]
        at io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing.runTask(HadoopIndexTask.java:289) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:129) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        ... 7 more
2015-10-30T18:54:10,069 INFO [task-runner-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_hadoop_hdfs-words_2015-10-30T18:53:48.749Z",
  "status" : "FAILED",
  "duration" : 9299
}
```

common.runtime.properties:

```
# Extensions (no deep storage model is listed - using local fs for deep storage - not recommended for production)
# Also, for production to use mysql add, "io.druid.extensions:mysql-metadata-storage"
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage", "io.druid.extensions:druid-hdfs-storage:0.8.1", "org.apache.hadoop:hadoop-client:2.7.1"]
druid.extensions.localRepository=extensions-repo

# Zookeeper
druid.zk.service.host=localhost

# Metadata Storage (use something like mysql in production by uncommenting properties below)
# by default druid will use derby
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd

# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=hdfs
druid.storage.storageDirectory=/

# Query Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000

# Indexing service discovery
druid.selectors.indexing.serviceName=overlord

# Monitoring (disabled for examples, if you enable SysMonitor, make sure to include sigar jar in your cp)
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]

# Metrics logging (disabled for examples - change this to logging or http in production)
druid.emitter=noop
```

I've been trying to get this to work for a couple of hours, so any help is appreciated.

Forgot to add my batch.spec file:

```
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "hdfs-words",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "tsv",
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "columns": ["timestamp", "word"],
          "dimensionsSpec": {
            "dimensions": ["word"]
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "minute",
        "queryGranularity": "none"
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "hdfs://words.txt"
      }
    },
    "tuningConfig": {
      "type": "hadoop"
    }
  }
}
```

Hi Andrei,

I believe it's because you didn't set intervals in the granularitySpec of your batch.spec file. Was that intentional?
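For example, something along these lines, where the interval dates are just placeholders for whatever range your data actually covers:

```
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "minute",
  "queryGranularity": "none",
  "intervals": ["2015-10-30/2015-10-31"]
}
```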

Thanks for the tip, Bingkun! It looks like it's now attempting to read from HDFS but running into some parser errors. I'll look into it and update here.

Okay, so I fixed the parsing issue, but now I'm getting another cryptic stack trace:

```

2015-11-03T00:37:11,791 INFO [task-runner-0] io.druid.indexer.JobHelper - Deleting path[/tmp/druid-indexing/hdfs-words/2015-11-03T003611.991Z]
2015-11-03T00:37:11,835 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_hdfs-words_2015-11-03T00:36:11.967Z, type=index_hadoop, dataSource=hdfs-words}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at com.google.api.client.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160) ~[google-http-client-1.15.0-rc.jar:?]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:132) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:206) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:235) [druid-indexing-service-0.8.1.jar:0.8.1]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:214) [druid-indexing-service-0.8.1.jar:0.8.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_79]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_79]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_79]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:129) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        ... 7 more
Caused by: com.metamx.common.ISE: Job[class io.druid.indexer.LegacyIndexGeneratorJob] failed!
        at io.druid.indexer.JobHelper.runJobs(JobHelper.java:202) ~[druid-indexing-hadoop-0.8.1.jar:0.8.1]
        at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:96) ~[druid-indexing-hadoop-0.8.1.jar:0.8.1]
        at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:259) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
        at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:129) ~[druid-indexing-service-0.8.1.jar:0.8.1]
        ... 7 more
2015-11-03T00:37:11,847 INFO [task-runner-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_hadoop_hdfs-words_2015-11-03T00:36:11.967Z",
  "status" : "FAILED",
  "duration" : 48394
}

```

Hi Andrei,

Can you check the log output from the Hadoop task itself? The log location should be printed somewhere in the overlord log. I think the actual cause of the job failure is recorded in the task log; I'm not sure it's shown in the overlord log.

I believe the task logs are stored at the location specified by the druid.indexer.logs.directory setting under “File Task Log”:

http://druid.io/docs/latest/configuration/indexing-service.html
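For example, with file task logs configured along these lines (the directory is just an example), each task's full log should end up under that directory:

```
druid.indexer.logs.type=file
druid.indexer.logs.directory=/tmp/druid/indexlogs
```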

  • Jon

Hi Jonathan,

My previous post is actually a snippet of the Hadoop index task log. I have attached the full version in this reply.

index_hadoop_hdfs-words_2015-11-03T01:59:23.303Z.log (4.46 MB)

Hi Andrei,

I think too many shards are being created: you're specifying hourly segment granularity with an interval that spans a year, so you'll get one segment per interval, and 8760 segments are probably too many for the local job runner to handle:

```
2015-11-03T02:00:20,590 WARN [Thread-54] org.apache.hadoop.mapred.LocalJobRunner - job_local132579983_0001
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Hashtable$Entry.clone(Hashtable.java:1052) ~[?:1.7.0_79]
        at java.util.Hashtable$Entry.clone(Hashtable.java:1052) ~[?:1.7.0_79]
        at java.util.Hashtable$Entry.clone(Hashtable.java:1052) ~[?:1.7.0_79]
        at java.util.Hashtable.clone(Hashtable.java:613) ~[?:1.7.0_79]
        at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:661) ~[hadoop-common-2.3.0.jar:?]
        at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439) ~[hadoop-mapreduce-client-core-2.3.0.jar:?]
        at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.<init>(LocalJobRunner.java:294) ~[hadoop-mapreduce-client-common-2.3.0.jar:?]
        at org.apache.hadoop.mapred.LocalJobRunner$Job.getReduceTaskRunnables(LocalJobRunner.java:350) ~[hadoop-mapreduce-client-common-2.3.0.jar:?]
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:526) [hadoop-mapreduce-client-common-2.3.0.jar:?]
2015-11-03T02:00:21,582 INFO [task-runner-0] org.apache.hadoop.mapreduce.Job - Job job_local132579983_0001 failed with state FAILED due to: NA
```

Depending on the size of your words.txt data, you could reduce the number of shards by:

  • Reducing the interval to a shorter range than a year

  • Increasing the segment granularity to monthly or yearly

A general recommendation for segment size is ~5 million rows per segment:

http://druid.io/docs/latest/design/segments.html
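For example, taking the second option, a granularitySpec roughly like the following (the year-long interval here is illustrative) would pack everything into a single yearly segment:

```
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "year",
  "queryGranularity": "none",
  "intervals": ["2015-01-01/2016-01-01"]
}
```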

If the words.txt dataset is too large for those two options above to be feasible, you could try running the task with a larger cluster.

Thanks,

Jon

Thanks so much, Jonathan! That fixed the issue for me.

I currently have the indexing service, coordinator node, and historical node running on my VM, but my queries are returning empty responses. I'll be working on solving this tomorrow, so I might have more questions then. Thanks for all the help so far!

Hi Jonathan. Thanks for all the help yesterday.

It looks like the indexing service is running fine now, but the historical node cannot load the segment because it cannot find the file to load. I have attached a stack trace that I got from the historical node log.

I have tried using both of the following configurations for common.runtime.properties but neither seems to solve the problem.

```
# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=hadoop
druid.storage.storageDirectory=hdfs://tmp/druid-indexing
```

```
# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid-indexing
```

Because of that I’m not really sure why Druid is trying to load from “/tmp/hdfs-words/hdfs-words/*”. Any hints on what the problem might be?

historical.log (9.57 KB)

For HDFS, druid.storage.type has to be set to ‘hdfs’ and not ‘hadoop’.
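Something like this, where the namenode URI and path are placeholders for your actual setup:

```
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://namenode:9000/druid/segments
```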

As for why /tmp/hdfs-words/hdfs-words is being used, can you double check the common.runtime.properties being loaded by the historical?

I can't think of any reason why that would occur unless "storageDirectory" was set to /tmp/hdfs-words. Maybe you have two configuration files? I saw this line in your Hadoop task log from earlier:

2015-11-03T00:37:11,791 INFO [task-runner-0] io.druid.indexer.JobHelper - Deleting path[/tmp/druid-indexing/hdfs-words/2015-11-03T003611.991Z]

Hi Jonathan, good catch.

I have searched my ${DRUID_INSTALL}/config directory, and I only specify druid.storage.storageDirectory in the common.runtime.properties file. I'll keep looking around, but I can't figure out why this is happening…