Batch Ingestion - Performance of inputSpec static vs inputSpec granularity

I was wondering about the batch ingestion performance difference between the two inputSpec types (static vs. granularity).

Case: 16800MB of CSV event data over 7 days.

Option 1: Druid Ingestion Index Task with static inputSpec

  • One 16800MB CSV file containing all 7 days' worth of data.

```json
"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "paths": "${csvTempDir}",
    "filter": "*.csv"
  }
},
```
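The granularity inputSpec (the second option being compared) is not shown above. For reference, a rough sketch of what it might look like; the inputPath, filePattern and pathFormat values here are illustrative, and this inputSpec type expects the data to already be laid out in time-partitioned directories such as y=2016/m=06/d=22:

```json
"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "granularity",
    "dataGranularity": "day",
    "inputPath": "/tmp/report-data",
    "filePattern": ".*\\.csv",
    "pathFormat": "'y'=yyyy/'m'=MM/'d'=dd"
  }
},
```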

Hi Mark, there should be virtually no difference in performance between these two methods, as most of the indexing time is spent in segment creation.

It seems I have performance issues when creating a Druid Ingestion Index Task over a large period. Note that I am currently using local Hadoop ingestion due to library incompatibilities with the old Hadoop distribution (2.3) that Druid is built against.

The same data indexed with one Druid Indexer Task per hour where data exists, or one Druid Indexer Task per month where data exists, versus one Druid Indexer Task for the whole period shows a huge performance difference. I have 3.2 GB of data spread unevenly over two years. Any ideas as to why there is such a big performance difference (18-19 hours vs. ~1 hour)? Is there any setting or configuration that could be modified to address this?
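For context, the only part of the task spec that changes between the per-hour/per-month runs and the whole-period run is the intervals field of the granularitySpec. A rough sketch of the whole-period version (the segmentGranularity value is an assumption, since it does not appear in the spec excerpts below):

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "intervals": ["2014-06-01T00:00:00.000Z/2016-06-22T00:00:00.000Z"]
}
```

If segmentGranularity really is HOUR, that single interval covers roughly 18,000 hourly buckets, most of them empty, which would fit the note under the results that only the "By None" run indexes segments without data.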

Results:

Option 1: Druid Ingestion Index Task with static inputSpec.

Interval Period - 2014-06-01T00:00:00Z to 2016-06-22T00:00:00Z

By Month - 8 Druid Ingestion Index Tasks executed one at a time for each month of data (Interval Period Total: 8 months) - Time Taken: 50 minutes

By Hour - 99 Druid Ingestion Index Tasks executed one at a time for each hour of data (Interval Period Total: 99 hours) - Time Taken: 1 hour 20 minutes

By None - 1 Druid Ingestion Index Task executed over the whole period (Interval Period Total: 2 years and 21 days) - Time Taken: 18-19 hours

  • Please note that except for the file granularity "By None", only segments with data get indexed.

Performance Test Configuration:

I did some additional research using a program I wrote based on “Option 1: Druid Ingestion Index Task with static inputSpec.”

The program:

  1. Runs a SQL query for events from 2014-06-01T00:00:00Z to 2016-06-22T00:00:00Z in ascending order.
  2. Writes the SQL query results to HDFS *.csv files, split by a choice of HOUR/MONTH/YEAR/NONE.
  3. Sequentially creates and executes a Druid Indexer Task (local "index_hadoop" type) for each report-data CSV file (a skeleton of one such task is sketched below).
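For illustration, each submitted task would have looked roughly like the skeleton below. The dataSource name and segmentGranularity are assumptions, the parser, dimensions and metrics are omitted, and the path and interval are taken from the month-level samples further down; only the shape of the per-file task is the point here:

```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "report_data",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "intervals": ["2016-05-01T00:00:00.000Z/2016-06-01T00:00:00.000Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "hdfs://hadoopc/tmp/report-data/rebuild-2016_05.csv"
      }
    },
    "tuningConfig": {
      "type": "hadoop"
    }
  }
}
```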

The following is my tuning config:

```json
"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "targetPartitionSize": 0
  },
  "numBackgroundPersistThreads": 1,
  "useCombiner": true,
  "jobProperties": {
    "mapreduce.map.memory.mb": 2048,
    "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.mb": 6144,
    "mapreduce.reduce.java.opts": "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.job.reduces": 21,
    "mapreduce.job.jvm.numtasks": 20,
    "mapreduce.reduce.shuffle.parallelcopies": 50,
    "mapreduce.reduce.shuffle.input.buffer.percent": 0.5,
    "mapreduce.task.io.sort.mb": 250,
    "mapreduce.task.io.sort.factor": 100,
    "mapreduce.jobtracker.handler.count": 64,
    "mapreduce.tasktracker.http.threads": 20,
    "mapreduce.output.fileoutputformat.compress": false,
    "mapreduce.output.fileoutputformat.compress.type": "BLOCK",
    "mapreduce.output.fileoutputformat.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
    "mapreduce.map.output.compress": true,
    "mapreduce.map.output.compress.codec": "org.apache.hadoop.io.compress.DefaultCodec",
    "mapreduce.map.speculative": false,
    "mapreduce.reduce.speculative": false,
    "mapreduce.task.timeout": 1800000
  }
}
```
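As an aside on the partitionsSpec above: if explicit control over segment size is wanted, the Hadoop indexer also accepts a hashed partitionsSpec with an explicit target row count, roughly like the sketch below (the 5,000,000 figure is the commonly documented default target, not a value taken from this test):

```json
"partitionsSpec": {
  "type": "hashed",
  "targetPartitionSize": 5000000
}
```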

Sample Data by Month (~ 3.2 GB):

```
Found 8 items
-rw-r--r--   1 reporting hdfs    114    2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2015_11.csv
-rw-r--r--   1 reporting hdfs    7.0 M  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2015_12.csv
-rw-r--r--   1 reporting hdfs    7.0 M  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_01.csv
-rw-r--r--   1 reporting hdfs    2.7 K  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_02.csv
-rw-r--r--   1 reporting hdfs    7.0 M  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_03.csv
-rw-r--r--   1 reporting hdfs    847    2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_04.csv
-rw-r--r--   1 reporting hdfs    3.2 G  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_05.csv
-rw-r--r--   1 reporting hdfs    7.0 M  2016-06-22 13:50 hdfs://hadoopc/tmp/report-data/rebuild-2016_06.csv
```

Sample Intervals by Month:

```json
"intervals": ["2015-11-01T00:00:00.000Z/2015-12-01T00:00:00.000Z"]
"intervals": ["2015-12-01T00:00:00.000Z/2016-01-01T00:00:00.000Z"]
"intervals": ["2016-01-01T00:00:00.000Z/2016-02-01T00:00:00.000Z"]
"intervals": ["2016-02-01T00:00:00.000Z/2016-03-01T00:00:00.000Z"]
"intervals": ["2016-03-01T00:00:00.000Z/2016-04-01T00:00:00.000Z"]
"intervals": ["2016-04-01T00:00:00.000Z/2016-05-01T00:00:00.000Z"]
"intervals": ["2016-05-01T00:00:00.000Z/2016-06-01T00:00:00.000Z"]
"intervals": ["2016-06-01T00:00:00.000Z/2016-07-01T00:00:00.000Z"]
```

The local Hadoop task is only meant for quickstarts and PoCs; it is not designed to be performant at all. For ingestion of large static batch files, we recommend using a remote Hadoop cluster, or, if you have your data in Kafka and are using Kafka 0.9.1, you can stream your data via the new Kafka indexing task.
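For reference, the Kafka indexing service is driven by a supervisor spec submitted to the Overlord (POST to /druid/indexer/v1/supervisor). A minimal sketch, with a hypothetical topic name and broker address; the dataSchema is reduced to a stub here and would in practice carry the same parser, dimensions and metrics as the batch spec:

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "report_data"
  },
  "tuningConfig": {
    "type": "kafka"
  },
  "ioConfig": {
    "topic": "report-events",
    "consumerProperties": {
      "bootstrap.servers": "kafka01:9092"
    },
    "taskCount": 1,
    "taskDuration": "PT1H"
  }
}
```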