Batch Ingestion: Reduce task memory consumption

Hey Guys,

We’re running a Druid (v0.10) cluster with Hadoop on AWS EMR and are seeing a problem when trying to index a segment with sharding.

The segment we’re loading is about 5.5GB (the raw data is split across roughly 1,500 S3 files, which are passed in the payload JSON), and we’re partitioning it into 10 shards.
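For context, the input files are passed roughly like this in the ioConfig (bucket and file names below are placeholders, not our real paths):

        "ioConfig": {
            "type": "hadoop",
            "inputSpec": {
                "type": "static",
                "paths": "s3n://example-bucket/data/part-0001.gz,s3n://example-bucket/data/part-0002.gz,..."
            }
        }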

We see that increasing the shard count doesn’t seem to reduce the memory consumption of each reduce task.

For instance, when running the load described above with the following tuningConfig, each reduce task used up to 18GB of memory on average!

        "tuningConfig": {
            "type": "hadoop",
            "combineText": "true",
            "partitionsSpec": {
                "type": "hashed",
                "numShards": 10
            },
            "jobProperties": {
                "fs.s3.awsAccessKeyId": ACCESS_KEY,
                "fs.s3.awsSecretAccessKey": SECRET_KEY,
                "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
                "fs.s3n.awsAccessKeyId": ACCESS_KEY,
                "fs.s3n.awsSecretAccessKey": SECRET_KEY,
                "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
                "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec",
                "mapreduce.job.classloader": "true",
                "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
                "mapreduce.map.java.opts": "-server -Xms1024m -Xmx1228m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
                "mapreduce.reduce.java.opts": "-server -Xms2048m -Xmx6553m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
                "mapreduce.job.jvm.numtasks": -1,
                "mapreduce.map.memory.mb": 1536,
                "mapreduce.client.submit.file.replication": 3,
                "mapreduce.reduce.memory.mb": 8192,
                "yarn.app.mapreduce.am.resource.mb": 2048,
                "yarn.app.mapreduce.am.command-opts": "-Xmx1536m",
                "mapreduce.input.fileinputformat.split.maxsize": 31457280
            }
        }

It looks like adding more shards (and hence running more reduce tasks) actually correlates with higher memory consumption per reduce task!

Because of this, we can’t figure out how much memory to allocate per reduce task.

Any idea why we’re seeing this behavior? Any insight into the reduce task’s internal logic and memory management would be appreciated.

Will setting maxRowsInMemory help?
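For example, we were thinking of adding something like this to the tuningConfig (assuming maxRowsInMemory is the right property name; the value below is just a guess, and the elided sections are the same as above):

        "tuningConfig": {
            "type": "hadoop",
            "maxRowsInMemory": 25000,
            "partitionsSpec": { ... },
            "jobProperties": { ... }
        }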

Thanks,

Yuval