Running Hadoop Indexer in Remote Mode

Hello everyone,

How can I run a Hadoop indexer job on a remote Hadoop cluster?

I have a production CDH Hadoop cluster, and I run into Jackson dependency conflicts when trying to post batch indexing tasks to the Overlord.

So I decided to apply the workaround of building a custom fat jar and running the Hadoop indexer, which I found in the documentation (http://druid.io/docs/latest/operations/other-Hadoop.html) as well as in this user group (https://groups.google.com/forum/#!topic/druid-user/UM-Cgj750sY).

So far I have managed to index small sample JSON files that reside in HDFS, but when I try to index full-size production data (hundreds of gigabytes per day in total) the job fails with "java.lang.OutOfMemoryError: Java heap space".

From both the logs and a post I found in this group (https://groups.google.com/d/msg/druid-user/kvvQtb4F1Lw/lWJsp8n9AgAJ), I suspect that the batch ingestion job is running in "local" mode rather than in "remote" mode.

The question is: how do I run the Hadoop indexer in remote mode (i.e., on the Hadoop cluster)?

Here are my java command and configs:

java command:

java -Xmx256m
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-classpath $(hadoop classpath)
-classpath $DRUID_CONF_DIR/overlord
-classpath $DRUID_CONF_DIR/_common
-classpath ./target/scala-2.10/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar
io.druid.cli.Main index hadoop
./batch-index.json

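(Note: as far as I know, java only applies the last -classpath option it is given, so the earlier -classpath flags above may be silently dropped. A single combined classpath, roughly like the sketch below using the same paths, may be what actually needs to reach the JVM.)

java -Xmx256m \
  -Duser.timezone=UTC \
  -Dfile.encoding=UTF-8 \
  -classpath "$(hadoop classpath):$DRUID_CONF_DIR/overlord:$DRUID_CONF_DIR/_common:./target/scala-2.10/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar" \
  io.druid.cli.Main index hadoop \
  ./batch-index.json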

configs:

  • $(hadoop classpath) includes my $HADOOP_CONF_DIR, $HADOOP_HOME, etc:

/usr/local/Cellar/hadoop/hadoop-conf:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/common/lib/*:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/common/*:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/hdfs:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/hdfs/lib/*:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/hdfs/*:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/yarn/lib/*:/usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/yarn/*:/usr/lib/hadoop-mapreduce//share/hadoop/mapreduce/*

  • _common/common.runtime.properties:

    druid.extensions.loadList=["druid-hdfs-storage", "mysql-metadata-storage"]
    druid.extensions.hadoopDependenciesDir=/usr/local/Cellar/hadoop/2.7.2
    druid.startup.logging.logProperties=true

    druid.zk.service.host=10.197.3.126:2181,10.197.3.125:2181,10.197.3.127:2181
    druid.zk.paths.base=/druid

    druid.metadata.storage.type=mysql
    druid.metadata.storage.connector.connectURI=jdbc:mysql://comm-s2graph.mydb.iwilab.com:3306/druid_staging
    druid.metadata.storage.connector.user=graph
    druid.metadata.storage.connector.password=graph

    druid.storage.type=hdfs
    druid.storage.storageDirectory=/user/work/s2graph/druid/staging/segments

    druid.indexer.logs.type=hdfs
    druid.indexer.logs.directory=/user/work/s2graph/druid/staging/indexing-logs

    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
    druid.emitter=logging
    druid.emitter.logging.logLevel=info


  • overlord/runtime.properties:

    druid.service=druid/overlord
    druid.port=8090

    druid.indexer.queue.startDelay=PT5S

    druid.indexer.runner.type=remote
    druid.indexer.storage.type=metadata
    druid.indexer.runner.taskCleanupTimeout=PT5M


  • overlord/jvm.config

    -server
    -Xmx10g
    -Xms10g
    -XX:NewSize=2g
    -XX:MaxNewSize=2g
    -XX:+UseConcMarkSweepGC
    -XX:+PrintGCDetails
    -XX:+PrintGCTimeStamps
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=var/tmp


  • batch-index.json:

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "data-source",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "columns" : ["col1", "col2", "col3", …],
          "dimensionsSpec" : {
            "dimensions" : [
              "col1", "col2", "col3", …
            ]
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "hyperUnique",
          "name" : "unique_visitors",
          "fieldName" : "col1"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "hour",
        "queryGranularity" : "NONE",
        "intervals" : [ "2016-06-26/2016-06-28" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "hdfs://name.node/path/to/batch/data.json"
      },
      "metadataUpdateSpec" : {
        "type" : "mysql",
        "connectURI" : "jdbc:mysql://mysql.domain:3306/druid",
        "user" : "druid",
        "password" : "diurd",
        "segmentTable" : "druid_segments"
      },
      "segmentOutputPath" : "hdfs://name.node/path/to/segments"
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "workingPath" : "hdfs://name.node/some/path/tmp"
    }
  }
}

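For reference, a tuningConfig variant that passes the relevant Hadoop settings explicitly via jobProperties would look roughly like the sketch below (the resource manager host is a placeholder, and whether this is needed at all depends on which site files end up on the classpath):

    "tuningConfig" : {
      "type" : "hadoop",
      "workingPath" : "hdfs://name.node/some/path/tmp",
      "jobProperties" : {
        "mapreduce.framework.name" : "yarn",
        "yarn.resourcemanager.hostname" : "resource.manager.host"
      }
    }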

Hello


The Druid index job is a Hadoop job. Like every other Hadoop job, it relies on the hadoop/conf/*.xml site files, so if you include the ones that point to your remote cluster, Druid will use them.
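For illustration, the setting that usually decides local vs. cluster execution is mapreduce.framework.name in mapred-site.xml (it defaults to "local"), together with the resource manager address in yarn-site.xml. A minimal sketch, with a placeholder host:

    <!-- mapred-site.xml: "yarn" submits to the cluster, "local" uses the LocalJobRunner -->
    <configuration>
      <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
      </property>
    </configuration>

    <!-- yarn-site.xml: placeholder resource manager address -->
    <configuration>
      <property>
        <name>yarn.resourcemanager.address</name>
        <value>resource.manager.host:8032</value>
      </property>
    </configuration>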

You can double-check from the MR/YARN UI of your cluster whether any MR jobs show up, though.

Hi Slim,

Thanks for the reply.

No such MR job in the cluster UI.

I believe that the correct .xml site files were included via the first -classpath option in the java command.

Also, the MR job seems to be reading the data files from HDFS and uploading the jar to HDFS, so I think I'm at least doing something right.

I think the problem is similar to the following thread, although in that case the user was submitting the index task to the Overlord:

https://groups.google.com/forum/#!searchin/druid-user/hadoop$20indexer$20remote/druid-user/vvX3VEGMTcw/DAueqiq3EQAJ

Here is a snippet of the MR logs:

2016-07-12T00:26:28,879 INFO [main] io.druid.indexer.JobHelper - Deleting path[hdfs://name.node.url/user/work/druid/tmp/druid-social/2016-07-12T002628.463Z]
2016-07-12T00:26:29,275 INFO [main] io.druid.indexer.path.StaticPathSpec - Adding paths[hdfs://name.node.url/user/work/druid/druid_batch_data/social_2016-06-27_2016-06-27.json]
2016-07-12T00:26:29,291 INFO [main] io.druid.indexer.path.StaticPathSpec - Adding paths[hdfs://name.node.url/user/work/druid/druid_batch_data/social_2016-06-27_2016-06-27.json]
2016-07-12T00:26:29,296 INFO [main] io.druid.indexer.JobHelper - Uploading jar to path[hdfs://name.node.url/user/work/druid/tmp/classpath/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar]
2016-07-12T00:26:37,337 INFO [main] org.apache.hadoop.conf.Configuration.deprecation - session.id is deprecated. Instead, use dfs.metrics.session-id
2016-07-12T00:26:37,337 INFO [main] org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
2016-07-12T00:26:37,428 WARN [main] org.apache.hadoop.mapreduce.JobSubmitter - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2016-07-12T00:26:37,433 WARN [main] org.apache.hadoop.mapreduce.JobSubmitter - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2016-07-12T00:26:38,108 INFO [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 200
2016-07-12T00:26:38,175 INFO [main] org.apache.hadoop.mapreduce.JobSubmitter - number of splits:400
2016-07-12T00:26:38,264 INFO [main] org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_local1656890929_0001
2016-07-12T00:26:38,277 WARN [main] org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-druid/mapred/staging/deploy1656890929/.staging/job_local1656890929_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2016-07-12T00:26:38,277 WARN [main] org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-druid/mapred/staging/deploy1656890929/.staging/job_local1656890929_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2016-07-12T00:27:27,300 INFO [main] org.apache.hadoop.mapred.LocalDistributedCacheManager - Localized hdfs://name.node.url/user/work/druid/tmp/classpath/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar as file:/tmp/hadoop-druid/mapred/local/1468283198306/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar
2016-07-12T00:27:27,330 WARN [main] org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-druid/mapred/local/localRunner/deploy/job_local1656890929_0001/job_local1656890929_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2016-07-12T00:27:27,330 WARN [main] org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-druid/mapred/local/localRunner/deploy/job_local1656890929_0001/job_local1656890929_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2016-07-12T00:27:27,331 INFO [main] org.apache.hadoop.mapred.LocalDistributedCacheManager - file:/tmp/hadoop-druid/mapred/local/1468283198306/druid-batch-ingestion-assembly-0.1-SNAPSHOT.jar
2016-07-12T00:27:27,334 INFO [main] org.apache.hadoop.mapreduce.Job - The url to track the job: http://localhost:8080/
2016-07-12T00:27:27,334 INFO [main] io.druid.indexer.IndexGeneratorJob - Job druid-social-index-generator-Optional.of([2016-06-26T00:00:00.000Z/2016-06-28T00:00:00.000Z]) submitted, status available at http://localhost:8080/
2016-07-12T00:27:27,337 INFO [Thread-14] org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set in config null
2016-07-12T00:27:27,337 INFO [main] org.apache.hadoop.mapreduce.Job - Running job: job_local1656890929_0001
2016-07-12T00:27:27,350 INFO [Thread-14] org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2016-07-12T00:27:27,431 INFO [Thread-14] org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
2016-07-12T00:27:27,432 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local1656890929_0001_m_000000_0
2016-07-12T00:27:27,447 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.yarn.util.ProcfsBasedProcessTree - ProcfsBasedProcessTree currently is supported only on Linux.
2016-07-12T00:27:27,447 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.Task - Using ResourceCalculatorProcessTree : null
2016-07-12T00:27:27,449 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - Processing split: hdfs://name.node.url/user/work/druid/druid_batch_data/social_2016-06-27_2016-06-27.json/part-00000:0+134217728
2016-07-12T00:27:27,454 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2016-07-12T00:27:27,499 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - (EQUATOR) 0 kvi 26214396(104857584)
2016-07-12T00:27:27,499 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 100
2016-07-12T00:27:27,499 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - soft limit at 83886080
2016-07-12T00:27:27,499 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - bufstart = 0; bufvoid = 104857600
2016-07-12T00:27:27,499 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - kvstart = 26214396; length = 6553600
2016-07-12T00:27:27,524 INFO [LocalJobRunner Map Task Executor #0] io.druid.indexer.HadoopDruidIndexerConfig - Running with config:



Hi,
I suspect you are feeding Druid the wrong mapred-site.xml file.

Please make sure that your classpath includes the root directory of the file that points to the remote grid.
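A quick sanity check on which mapred-site.xml is being picked up might look like this (assuming the conf directory from the hadoop classpath output above; adjust the path as needed):

    # print the framework setting from the conf dir that is first on the classpath
    grep -A1 "mapreduce.framework.name" /usr/local/Cellar/hadoop/hadoop-conf/mapred-site.xml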

Can you share your configuration files?
