Loading two batch files with the same date

I've loaded two files in two separate tasks. The timestamp is the same in both files.
Why didn't Druid pool the data? Instead, it created one segment with "used": false and one segment with "used": true. Thanks.

timestamp dim1 metric2 metric3

first file:
1451260800 1 23 1 0
1451260800 2 23 1 1

second file:
1451260800 3 24 1 0
1451260800 4 26 1 1

first task:

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "test",
      "parser": {…},
      "metricsSpec": […],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "DAY",
        "intervals": ["2015-01-01/2016-12-31"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/dir",
        "filter": "first_file"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 0,
      "rowFlushBoundary": 0
    }
  }
}

second task:

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "test",
      "parser": {…},
      "metricsSpec": […],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "DAY",
        "intervals": ["2015-01-01/2016-12-31"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/dir",
        "filter": "first_file"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 0,
      "rowFlushBoundary": 0
    }
  }
}

Hi,

When you run multiple tasks for the same interval, it is treated as if you are re-indexing your data, and the segments from the second job override the segments generated by the first job.

If you want to index the data in both files, you need to run a single index or index_hadoop task that ingests data from both files.

Batch ingestion is normally a “replace-by-interval” operation, so whenever you load data for a particular interval, it replaces any other data for that interval currently in the system. If you want to load both files, you can specify a wildcard for the “filter”, or you can use the “index_hadoop” task and specify any path recognized by Hadoop (including comma-separated paths or wildcards). The Hadoop task can work locally without a Hadoop cluster; it just runs in-process.
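For example, to pick up both files in one index task, the ioConfig from your spec could use a wildcard filter. The “*_file” pattern below assumes the files are literally named first_file and second_file under /dir; adjust it to your real filenames:

  "ioConfig": {
    "type": "index",
    "firehose": {
      "type": "local",
      "baseDir": "/dir",
      "filter": "*_file"
    }
  }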

This thread is from a while ago, so I am not sure if appending functionality is available for batch ingestion yet. However, I am doing batch ingestion right now using Hadoop indexing, where data for the same interval arrives in two different ingestion tasks and gets overwritten. After reading some posts here, it seems like that’s the default behavior. However, going through the Druid docs today, I observed that for Hadoop ingestion both append and overwrite can be done. I didn’t find any more info on how to append using Hadoop batch ingestion, though.

http://druid.io/docs/latest/ingestion/index.html

Hi,

The Hadoop batch task will always overwrite; to do an “append” you can use a “multi” type inputSpec, where one input is the existing Druid datasource and the other input contains the data to be appended.

http://druid.io/docs/latest/ingestion/update-existing-data.html
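As a rough sketch of what that looks like (reusing the “test” datasource and interval from the specs above; the path to the file being appended is a placeholder), the Hadoop task’s ioConfig would be along these lines:

  "ioConfig": {
    "type": "hadoop",
    "inputSpec": {
      "type": "multi",
      "children": [
        {
          "type": "dataSource",
          "ingestionSpec": {
            "dataSource": "test",
            "intervals": ["2015-01-01/2016-12-31"]
          }
        },
        {
          "type": "static",
          "paths": "/dir/new_file"
        }
      ]
    }
  }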

  • Jon

That’s what my understanding was. However, I saw the table at http://druid.io/docs/latest/ingestion/index.html, so I was a bit confused!

But now that I think of it, ‘multi’ is just a type of inputSpec for Hadoop batch ingestion, so it does make sense.

Since I have you here, I am stuck on a problem.

My aim: I have to ingest 130 GB of data spread across 1300 files. The data timestamps range from Jan 1, 2017 to Dec 31, 2017. The data is skewed, so the 1300th file could have data for Jan 1; data for any timestamp could be in any file, basically.

What I did: I simply used Hadoop ingestion with the hadoop inputSpec, on an 8-core MiddleManager. I had limited heap and disk space for the Hadoop map/reduce tmp directory, so I ingested one file per indexing task and wrote a bash script to submit 1300 indexing tasks to Druid. Of course this is not a viable way to load data: when I queried the data, I realized everything had been overwritten! Lesson learned!

My options:

1. Use Delta ingestion:

- Use the ‘multi’ inputSpec: put the existing dataSource in one “child” type and the Hadoop static input in another child type.

  • Following are my two doubts about this technique:

a. For the very first file to be ingested, there will be no datasource (this is an assumption, might be wrong). So how will Druid tackle that?

b. Since I will not know the interval to be specified in the ‘dataSource’ ingestionSpec, can I set it to the min/max values of my data? Will this result in poor ingestion performance?

2. Ingesting all 1300 files using only 1 indexing task:

- This is probably not right, but what if I try to ingest all 1300 files using only one ingestion task? My understanding is that Druid will assign only one peon to it, and hence it will be extremely slow?

Thanks!

b. Since I will not know the interval to be specified in the ‘dataSource’ ingestionSpec, can I set it to the min/max values of my data? Will this result in poor ingestion performance?

Since the append data has arbitrarily distributed timestamps, you would need to set it to min/max values, which means using delta ingestion to pull in the files one by one would require reading the entire existing dataset for each append, so I wouldn’t recommend this approach.

a. For the very first file to be ingested, there will be no datasource (this is an assumption, might be wrong). So how will Druid tackle that?

You would just have to do a simple non-multi ingestion for the first file.

2. Ingesting all 1300 files using only 1 indexing task:

I would recommend trying this; the peon itself doesn’t have to do that much work if you’re using a Hadoop cluster. I think most of the workload will be on your mappers and reducers.
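If you try that, the single task’s inputSpec can point at all 1300 files at once, and the granularitySpec intervals can cover all of 2017. A sketch, with a placeholder path (comma-separated paths or Hadoop globs both work here):

  "ioConfig": {
    "type": "hadoop",
    "inputSpec": {
      "type": "static",
      "paths": "/data/2017/*"
    }
  }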

Thanks,

Jon

Alternatively, you could try the native batch ingestion task, which can append without reading the existing data: http://druid.io/docs/latest/ingestion/native-batch.html
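A minimal sketch of a per-file append with the native task, assuming local files (the baseDir and filter values below are placeholders); the key piece is “appendToExisting”: true in the ioConfig, which adds new segments to the interval instead of replacing it:

  {
    "type": "index",
    "spec": {
      "dataSchema": {…},
      "ioConfig": {
        "type": "index",
        "firehose": {
          "type": "local",
          "baseDir": "/data/2017",
          "filter": "file_0001"
        },
        "appendToExisting": true
      },
      "tuningConfig": {
        "type": "index"
      }
    }
  }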

  • Jon

Thanks for sharing that. Will appending using native batch ingestion be better, performance-wise, than Hadoop, considering I will be specifying the min/max timestamp of the entire dataset in the intervals for the ingestSegment firehose as well?

Also, why would we not have to read the existing data in the case of native batch ingestion, but for Hadoop we do?

Will appending using native batch ingestion be better, performance-wise, than Hadoop, considering I will be specifying the min/max timestamp of the entire dataset in the intervals for the ingestSegment firehose as well?

For append, it can be faster since it only needs to read the append data and not the append data plus the original data. It probably depends on how much data is being ingested in each append.

I’m not sure how appending incrementally with the native task would compare with using Hadoop to ingest all 1300 inputs at once. In any case, I recommend trying out the various options and seeing how they perform on your cluster and data.

Also, why would we not have to read the existing data in the case of native batch ingestion, but for Hadoop we do?

Hm, I’m not sure why the Hadoop task doesn’t support a true append mode; maybe someone else can comment on that.

  • Jon

Also worth mentioning, in 0.13.0, you can run parallel native batch indexing tasks, splitting an ingestion workload across multiple peons:

https://github.com/apache/incubator-druid/blob/master/docs/content/ingestion/native_tasks.md
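A rough sketch of a parallel task spec, with placeholder paths; if I remember correctly, “maxNumSubTasks” in the tuningConfig controls how many sub-tasks (and hence peons) can run, but check the doc above for the exact parameter names in your version:

  {
    "type": "index_parallel",
    "spec": {
      "dataSchema": {…},
      "ioConfig": {
        "type": "index_parallel",
        "firehose": {
          "type": "local",
          "baseDir": "/data/2017",
          "filter": "*"
        }
      },
      "tuningConfig": {
        "type": "index_parallel",
        "maxNumSubTasks": 8
      }
    }
  }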

Thanks,

Jon

That’s all very helpful info.

Thanks for taking the time to answer my questions.