Hi, all.
At our company, we rely heavily on Druid over Hadoop to store all of our customers' historical data.
Our current data model is:
Each datasource is divided into one-month segments, which are further split into shards based on their number of rows.
At the moment, we load new data hourly - on each load we run a re-ingestion of the last 3 months (3 segments).
We want to move to a new model and are trying to figure out whether it's doable using Druid.
According to our new model, each batch of new data should be appended to multiple historical segments.
We don't want to run a full historical re-ingestion on every load, since that would take up too much time and resources, so we thought about appending small batches of data to historical segments instead and running periodic compaction (for example, once a day).
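To make this concrete, here is a rough, untested sketch of the kind of hourly append task we have in mind. It uses the native index task rather than index_hadoop, and it assumes a Druid version whose index task ioConfig supports the appendToExisting flag; the firehose type, datasource name, interval and placeholders below are just for illustration:

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "test_datasource",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "attribution_date",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": <DIMENSIONS>
          }
        }
      },
      "metricsSpec": <METRICS>,
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "queryGranularity": "none",
        "intervals": ["2015-08-01/2015-09-01"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "static-s3",
        "uris": <S3_URLS>
      },
      "appendToExisting": true
    }
  }
}

For the daily compaction, we imagine submitting one task per affected month, along the lines of the sketch below. We understand that a dedicated compact task type only exists in newer Druid releases, and that on older versions the same effect would require a re-indexing task with a dataSource inputSpec over the interval:

{
  "type": "compact",
  "dataSource": "test_datasource",
  "interval": "2015-08-01/2015-09-01"
}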
From researching the matter, we found that there could be a few ways of doing so:
- Delta ingestion is a native Druid process, documented here: http://druid.io/docs/latest/ingestion/update-existing-data.html.
From the documentation, it seems like delta ingestion is supposed to do exactly what we want, but when we tried running it, the actual result was a re-indexing of the data.
Does delta ingestion really match our use case?
Below is an example of the ingestion spec from a failed attempt (it caused a re-indexing of the data); we would appreciate any feedback on spec mistakes we might have overlooked:
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "multi",
        "children": [
          {
            "type": "dataSource",
            "ingestionSpec": {
              "dataSource": "test_datasource",
              "intervals": ["2015-08-01/2015-08-31"]
            }
          },
          {
            "type": "static",
            "paths": <S3_URLS>
          }
        ]
      }
    },
    "dataSchema": {
      "dataSource": "test_datasource",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "queryGranularity": "none",
        "intervals": ["2015-08-01/2015-08-31"]
      },
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "attribution_date",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": <DIMENSIONS>
          }
        }
      },
      "metricsSpec": <METRICS>
    },
    "tuningConfig": {
      "type": "hadoop",
      "combineText": true,
      "buildV9Directly": true,
      "partitionsSpec": {
        "type": "hashed",
        "numShards": 6
      },
      "jobProperties": {
        "fs.s3.awsAccessKeyId": <AWS_ACCESS_KEY_ID>,
        "fs.s3.awsSecretAccessKey": <AWS_SECRET_ACCESS_KEY>,
        "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.awsAccessKeyId": <AWS_ACCESS_KEY_ID>,
        "fs.s3n.awsSecretAccessKey": <AWS_SECRET_ACCESS_KEY>,
        "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec",
        "mapreduce.job.classloader": "true",
        "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
        "mapreduce.job.jvm.numtasks": 20,
        "mapreduce.task.io.sort.mb": "256",
        "mapreduce.reduce.shuffle.parallelcopies": "24",
        "mapreduce.client.submit.file.replication": 3,
        "yarn.app.mapreduce.am.resource.mb": 2048,
        "yarn.app.mapreduce.am.command-opts": "-Xmx1536m",
        "mapreduce.input.fileinputformat.split.maxsize": 31457280,
        "mapreduce.reduce.shuffle.input.buffer.percent": 0.6
      }
    }
  }
}
- Real-time ingestion over Kafka was also raised as a possibility, but since it requires a separate consumer for each updated segment, this option would also take up too many resources.
- A third option that was raised is writing a plumber extension that would handle our data and append it the way we want. We were thinking of something similar to the behavior of real-time ingestion: creating new shards and appending them to existing segments without re-ingesting those segments.
Does anyone have experience with writing plumber extensions, and could you comment on whether this seems like a reasonable option and what would be a good way to start on it?
Thanks in advance for any help and/or suggestions.
Maya and Noa.