Beginner question: Is it possible to update an existing segment with new events (delta loading)?

Hi,
I just started working with Druid and have a question about updating an existing segment.

I’d like to build a daily aggregation for some events in batch mode. I’m not interested in granularity lower than a day, but my events arrive more frequently and can be delayed by a day or two.

Assume my data source is described as follows:

"dataSchema" : {
  "dataSource" : "testSource2",
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "day",
    "queryGranularity" : "day",
    "intervals" : ["2018-09-12/2018-09-13"]
  },
...
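
The metricsSpec is cut off above; for the rollup results shown below, assume a simple longSum over "amount", roughly:

"metricsSpec" : [
  { "type" : "longSum", "name" : "amount", "fieldName" : "amount" }
]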

and some dummy data records indexed with the Hadoop indexer:

{"dt":"2018-09-12T03:43:00.000Z", "dim1" : "D1_1",  "amount" : 4}
{"dt":"2018-09-12T03:45:00.000Z", "dim1" : "D1_1",  "amout" : 2}
{"dt":"2018-09-12T03:47:33.333Z", "dim1" : "D1_1",  "amout" : 2}

My select query returns something like this:

[ {
  "timestamp" : "2018-09-12T00:00:00.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "testSource2_2018-09-12T00:00:00.000Z_2018-09-13T00:00:00.000Z_2018-07-03T18:51:25.855Z" : 0
    },
    "dimensions" : [ "dim1" ],
    "metrics" : [ "amount" ],
    "events" : [ {
      "segmentId" : "testSource2_2018-09-12T00:00:00.000Z_2018-09-13T00:00:00.000Z_2018-07-03T18:51:25.855Z",
      "offset" : 0,
      "event" : {
        "timestamp" : "2018-09-12T00:00:00.000Z",
        "dim1" : "D1_1",
        "amount" : 8
      }
    } ]
  }
} ]

Now I’d like to append some additional “delta” events for the next hour, e.g. they arrived later in my processing pipeline from the data provider:

{"dt":"2018-09-12T04:23:00.000Z", "dim1" : "D1_1", "amount" : 3}
{"dt":"2018-09-12T04:25:00.000Z", "dim1" : "D1_1", "amount" : 1}

If I use the same indexing technique, loading them from a file using Hadoop, those 2 records replace everything in the already existing segment, so the result looks like this:

[ {
  "timestamp" : "2018-09-12T00:00:00.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "testSource2_2018-09-12T00:00:00.000Z_2018-09-13T00:00:00.000Z_2018-07-03T18:51:25.855Z" : 0
    },
    "dimensions" : [ "dim1" ],
    "metrics" : [ "amount" ],
    "events" : [ {
      "segmentId" : "testSource2_2018-09-12T00:00:00.000Z_2018-09-13T00:00:00.000Z_2018-07-03T18:51:25.855Z",
      "offset" : 0,
      "event" : {
        "timestamp" : "2018-09-12T00:00:00.000Z",
        "dim1" : "D1_1",
        "amount" : 4       #<-------- I would expect to have 12 (8+4) here
      }
    } ]
  }
} ]

What is the proper way of “appending” data?

Should I create a separate dataSource to store those 2 records and then use it in the “ioConfig”?

Should I just load the 2 new records and then somehow rebuild the whole segment?

Or should I keep all 5 records inside the same “partition” file/directory on the file system and then reload them all together?

There is documentation at http://druid.io/docs/latest/ingestion/update-existing-data.html, but it is not clear to me how to use the information provided.

Most likely I’m doing something wrong.

I’d appreciate any suggestions/advice.

Alexey

There is documentation at http://druid.io/docs/latest/ingestion/update-existing-data.html, but it is not clear to me how to use the information provided.

You’ll want to use a “multi” type inputSpec, as shown in the example there: one of the subspecs is the existing datasource you wish to append to, and the other subspec points to the file(s) with the new data you want to append. This has the effect of combining the two input sources.
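
For the testSource2 example above, the ioConfig would look roughly like this (the "paths" value is just a placeholder for wherever the file with the delta events lives):

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "multi",
    "children" : [
      {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource" : "testSource2",
          "intervals" : ["2018-09-12/2018-09-13"]
        }
      },
      {
        "type" : "static",
        "paths" : "/path/to/delta-events.json"
      }
    ]
  }
}

The reindexing task reads the existing testSource2 data for that interval together with the new file and writes a new segment version for 2018-09-12, so the rolled-up "amount" should then come out as 12 (8 + 3 + 1).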

  • Jon