Partition Spec not working as expected?

Hello everyone,

First off I’d like to present you with my problem:

We’re trying out Druid to see if it’s capable of storing time-series data for advertising.

You might think “well, of course it can do that”, but there are some challenges.

This data can change retrospectively, for example certain metrics might be modified to adjust for impressions or clicks generated by click farms.

These changes can happen for up to 28 days after an event took place.

Additionally, our data is pulled in from external sources that, although generous, have rate limits in place that prevent us from loading data in bulk.

Data for a single campaign could look something like this:

```
{"campaign_id": 1, "adset_id": 1, "ad_id": 1, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 1, "adset_id": 1, "ad_id": 2, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 1, "adset_id": 1, "ad_id": 3, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 1, "adset_id": 2, "ad_id": 4, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 1, "adset_id": 2, "ad_id": 5, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 1, "adset_id": 2, "ad_id": 6, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
```

Once I ingest it, I can query it:

```
{
  "queryType": "topN",
  "dataSource": "insights",
  "aggregations": [
    {
      "type": "count",
      "name": "count"
    }
  ],
  "granularity": "all",
  "postAggregations": [],
  "intervals": "1901-01-01T00:00:00+00:00/2018-07-12T09:55:02+00:00",
  "threshold": 10000,
  "metric": "count",
  "dimension": "campaign_id"
}
```

```
{"campaign_id": "1", "count": 6}
```

Okay great! Now let’s say I want to ingest data from a different campaign:

```
{"campaign_id": 2, "adset_id": 3, "ad_id": 7, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 2, "adset_id": 3, "ad_id": 8, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 2, "adset_id": 3, "ad_id": 9, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 2, "adset_id": 4, "ad_id": 10, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 2, "adset_id": 4, "ad_id": 11, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
{"campaign_id": 2, "adset_id": 4, "ad_id": 12, "clicks": 2000, "spend": 400, "date": "2018-01-01T01:00:00Z"}
```

Using the same query I now get back the following, and all my data for the previous campaign is gone:

```
{
  "campaign_id": "2",
  "count": 6
}
```

Now, this is what I would expect with time-based segmentation alone; however, my ingestion spec:

```
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "playground/druid-tmp/campaign_2"
      }
    },
    "dataSchema": {
      "dataSource": "insights",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "month",
        "queryGranularity": "none",
        "intervals": [
          "2018-01-01/2019-01-01"
        ]
      },
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "dimensionsSpec": {
            "dimensions": [
              "ad_id",
              "adset_id",
              "campaign_id"
            ]
          },
          "timestampSpec": {
            "format": "auto",
            "column": "date"
          }
        }
      },
      "metricsSpec": [
        {
          "name": "clicks",
          "type": "longSum",
          "fieldName": "clicks"
        },
        {
          "name": "spend",
          "type": "doubleSum",
          "fieldName": "spend"
        }
      ]
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "type": "dimension",
        "targetPartitionSize": 5000000,
        "partitionDimension": "campaign_id"
      },
      "jobProperties": {}
    }
  }
}
```

is additionally sharded by campaign_id, which I assumed would prevent data for other campaigns from being overwritten.

Is my understanding of sharding wrong? Is there another way I can solve this problem? Should I not use Druid for this?

There are additional dimensions that are shared across campaigns, for example “account_id”, which could easily span thousands of campaigns, but being able to batch-load data per campaign is an absolute must because of the rate-limiting issues with our data partners.

From the docs for partitionDimension: “The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set.”

So it looks like it will be ignored because you have targetPartitionSize set.
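
For comparison, if you did want the hash-based route, the docs suggest the dimension list only takes effect together with an explicit numShards, so the spec would look something like this (the shard count of 4 is just a placeholder):

```
"partitionsSpec": {
  "type": "hashed",
  "numShards": 4,
  "partitionDimensions": ["campaign_id"]
}
```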

Hi Zachery,

Thanks for the reply. It seems that only applies to hash-based partitioning, whereas I’m using single-dimension partitioning.

However, regardless of whether I set my partitionsSpec to

```
"partitionsSpec": {
  "type": "hashed",
  "partitionDimensions": ["campaign_id"]
}
```

or

```
"partitionsSpec": {
  "type": "dimension",
  "partitionDimension": "campaign_id"
}
```

The end result seems to be the same.

Hey Brian,

The segments generated from a batch ingestion task will have a newer version than the old segments, so they'll replace any segments that already exist for whichever intervals are being ingested.

You could look into using the Kafka Indexing Service, which can handle older events, or using delta ingestion as described here: http://druid.io/docs/latest/ingestion/update-existing-data.html
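
For the delta ingestion option, that page describes a "multi" inputSpec that reads the existing datasource alongside the new files, so both end up in the rebuilt segments. A rough sketch of what the ioConfig might look like in your case (the interval here is just an example and would need to cover whatever you're reingesting):

```
"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "multi",
    "children": [
      {
        "type": "dataSource",
        "ingestionSpec": {
          "dataSource": "insights",
          "intervals": ["2018-01-01/2018-02-01"]
        }
      },
      {
        "type": "static",
        "paths": "playground/druid-tmp/campaign_2"
      }
    ]
  }
}
```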

Cheers,
Dylan

Hey Dylan,

That explains just about everything. I’ll take a look at the options you mentioned.

Thanks!!

Brian