Updating multiple historical segments without reindexing

Hi, all.

At our company, we rely heavily on Druid over Hadoop to store all of our customers’ historical data.

Our current data model is as follows:

Each datasource is divided into one-month segments, and each segment is split into shards based on its row count.

At the moment, we load new data hourly - on each load we run a re-ingestion of the last 3 months (3 segments).

We want to move to a new model and are trying to figure out whether it’s doable with Druid.

According to our new model, each batch of new data should be appended to multiple historical segments.

We don’t want to run a full historical re-ingestion on every load, since that would take too much time and too many resources, so we thought about appending small batches of data to the historical segments instead and running periodic compaction (for example, once a day).

From researching the matter, we found that there could be a few ways of doing so:

  1. Delta ingestion is a native Druid process, documented here: http://druid.io/docs/latest/ingestion/update-existing-data.html.

From the documentation, it seems like delta ingestion is supposed to do exactly what we want, but when we tried running it, the actual result was a full re-index.

Does delta ingestion really match our use case?

Below is the ingestion spec from one failed attempt (it caused a re-index of the data); we would appreciate any feedback on spec mistakes we might have overlooked:

{
    "type": "index_hadoop",
    "spec": {
        "ioConfig": {
            "type": "hadoop",
            "inputSpec": {
                "type": "multi",
                "children": [
                    {
                        "type": "dataSource",
                        "ingestionSpec": {
                            "dataSource": "test_datasource",
                            "intervals": ["2015-08-01/2015-08-31"]
                        }
                    },
                    {
                        "type": "static",
                        "paths": <S3_URLS>
                    }
                ]
            }
        },
        "dataSchema": {
            "dataSource": "test_datasource",
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "MONTH",
                "queryGranularity": "none",
                "intervals": ["2015-08-01/2015-08-31"]
            },
            "parser": {
                "type": "hadoopyString",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {
                        "column": "attribution_date",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": <DIMENSIONS>
                    }
                }
            },
            "metricsSpec": <METRICS>
        },
        "tuningConfig": {
            "type": "hadoop",
            "combineText": true,
            "buildV9Directly": true,
            "partitionsSpec": {
                "type": "hashed",
                "numShards": 6
            },
            "jobProperties": {
                "fs.s3.awsAccessKeyId": <AWS_ACCESS_KEY_ID>,
                "fs.s3.awsSecretAccessKey": <AWS_SECRET_ACCESS_KEY>,
                "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
                "fs.s3n.awsAccessKeyId": <AWS_ACCESS_KEY_ID>,
                "fs.s3n.awsSecretAccessKey": <AWS_SECRET_ACCESS_KEY>,
                "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
                "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec",
                "mapreduce.job.classloader": "true",
                "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
                "mapreduce.job.jvm.numtasks": 20,
                "mapreduce.task.io.sort.mb": "256",
                "mapreduce.reduce.shuffle.parallelcopies": "24",
                "mapreduce.client.submit.file.replication": 3,
                "yarn.app.mapreduce.am.resource.mb": 2048,
                "yarn.app.mapreduce.am.command-opts": "-Xmx1536m",
                "mapreduce.input.fileinputformat.split.maxsize": 31457280,
                "mapreduce.reduce.shuffle.input.buffer.percent": 0.6
            }
        }
    }
}

  2. Real-time ingestion over Kafka was also raised as a possibility, but since it requires a single consumer for each updated segment, this option would also take up too many resources.

  3. A third option that was raised is writing a plumber extension that would handle our data and append it the way we want - something similar to the behavior of real-time ingestion: creating new shards and adding them to existing segments without re-ingesting those segments.

Does anyone have experience with writing plumber extensions who could comment on whether this is a reasonable option, and what would be a good way to start on it?

Thanks in advance for any help and/or suggestions.

Maya and Noa.

Hey guys,

Were you able to get option #1 working? I am in the same boat. It would be great if you could share your experiences.

Hey all,

The behavior you’re looking for is actually already supported by Druid’s native indexing task. In the ioConfig section of the ingestion spec, if you set ‘appendToExisting’ to true, the output of the batch ingestion will be a set of partitions added onto the previous version of the segment set, giving you ‘append’ behavior instead of the ‘overwrite’ behavior you would normally get. See http://druid.io/docs/latest/ingestion/native-batch.html for more details.

One gotcha worth pointing out - you should run all of your batch ingestion jobs with the tuningConfig option ‘forceExtendableShardSpecs’ set to true, otherwise Druid won’t be able to extend the segment set if it contains a particular type of non-extendable shard. If you run into this issue, you may have to go back and re-index your existing data with ‘forceExtendableShardSpecs’ set to true before running append-mode indexing tasks.
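To make that concrete, here’s a rough sketch of a native index task with both settings enabled. Treat it as an illustration rather than a drop-in spec - the firehose path, dimensions, metrics, and interval below are placeholders you’d replace with your own values:

{
    "type": "index",
    "spec": {
        "dataSchema": {
            "dataSource": "test_datasource",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": { "column": "attribution_date", "format": "auto" },
                    "dimensionsSpec": { "dimensions": [] }
                }
            },
            "metricsSpec": [ { "type": "count", "name": "count" } ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "MONTH",
                "queryGranularity": "none",
                "intervals": ["2015-08-01/2015-09-01"]
            }
        },
        "ioConfig": {
            "type": "index",
            "firehose": {
                "type": "local",
                "baseDir": "/path/to/new/batch",
                "filter": "*.json"
            },
            "appendToExisting": true
        },
        "tuningConfig": {
            "type": "index",
            "forceExtendableShardSpecs": true
        }
    }
}

Each run like this adds new partitions to the existing month rather than overwriting it.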

And yes, you’re absolutely correct that you’ll get the best results by periodically compacting these segments if they come out too small (ideally they should be a few hundred MB each).
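If you end up automating the compaction (say, from a daily cron), the task spec itself is quite small - roughly something like the sketch below, with the datasource and interval being placeholders:

{
    "type": "compact",
    "dataSource": "test_datasource",
    "interval": "2015-08-01/2015-09-01"
}

Druid reads whatever segments fall inside that interval and rewrites them as a new, better-sized set.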

Hope this helps!

David

This is all great information. Thank you so much for taking the time!

Hello, David.

We are using the Parquet format for our raw files, so we can only use the Hadoop indexing task; we can’t use the native indexing task with “appendToExisting”. Am I right?

I have segmentGranularity = "hour", and I want to update the data in the current hour every 5 minutes.

Is there any difference between my options for ingesting the data:

  • re-ingest the current hour every 5 minutes;
  • use delta ingestion every 5 minutes (read the dataSource plus only the new raw files).
    It seems there is no point in doing delta ingestion in our case, but I’m not sure at all.

Hi,

Currently, HadoopIndexTask can only read the whole data set and (over)write it into the specified interval. This is true even for delta ingestion: the task reads all existing segments plus the new files you specified and reindexes them together. This is only because we haven’t implemented a real append mode yet, but yes, that’s what it does now.

To support file formats other than string-based formats in the native index task, we have an open issue (https://github.com/apache/incubator-druid/issues/5584), and I think someone is working on it.

If you want to update every 5 minutes, I think it would make more sense to use Kafka Indexing Service if possible.
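If you go that route, you would POST a supervisor spec to the Overlord that looks roughly like the sketch below. This is only an illustration - the topic, broker address, dimensions, and metrics are placeholders for your own setup:

{
    "type": "kafka",
    "dataSchema": {
        "dataSource": "test_datasource",
        "parser": {
            "type": "string",
            "parseSpec": {
                "format": "json",
                "timestampSpec": { "column": "attribution_date", "format": "auto" },
                "dimensionsSpec": { "dimensions": [] }
            }
        },
        "metricsSpec": [ { "type": "count", "name": "count" } ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "none"
        }
    },
    "tuningConfig": { "type": "kafka" },
    "ioConfig": {
        "topic": "events",
        "consumerProperties": { "bootstrap.servers": "kafka-broker:9092" },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
    }
}

With this, the indexing tasks read from Kafka continuously and data becomes queryable as it arrives, so there is no 5-minute batch job to schedule at all.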

Jihoon

Thank you for your answer!