Create new data sources with different specs based on existing data

Our cluster takes data from Kafka and puts it into one data source with 5-minute granularity.

We now want to create a couple more data sources based on the same data, but having bigger granularities, like 30m, 6h and 1d.

For new incoming data we can (at least that’s the way we see it) simply write our data to a few additional Kafka topics as well, and the ingestion spec for each data source we want will read from its topic and roll up the data according to that spec.

But how can we run through our existing 5m-granularity data to fill the historical periods for the newly introduced data sources?

Here are two options.

  1. Set up multiple Kafka indexing supervisors in Druid on the same Kafka topic, each with a different dataSource schema in its Kafka supervisor spec (a rough sketch of such a spec is below). OR
  2. Run periodic re-indexing jobs (see the re-indexing information in http://druid.io/docs/0.11.0/ingestion/update-existing-data.html ) to create dataSources with bigger granularities.
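
For (1), each additional supervisor spec would look something like the sketch below, differing mainly in the dataSource name and queryGranularity. The dataSource name, topic, broker address, dimensions and metrics here are only placeholders and would mirror your existing 5m spec:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "new-datasource-30m",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": ["dim1", "dim2"] }
      }
    },
    "metricsSpec": [
      { "name": "count", "type": "count" },
      { "name": "sum_value", "type": "doubleSum", "fieldName": "value" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "THIRTY_MINUTE",
      "rollup": true
    }
  },
  "tuningConfig": { "type": "kafka" },
  "ioConfig": {
    "topic": "your-existing-topic",
    "consumerProperties": { "bootstrap.servers": "kafka-broker:9092" }
  }
}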

hth,
Himanshu

  1. I have done it like this, but now I am facing this: https://github.com/druid-io/druid/issues/5244
  2. The docs are quite sparse and unclear here; could you describe it in a bit more detail, please?

Thank you

  1. replied on the issue
  2. if you’re familiar with batch indexing, then batch re-indexing is the same except for the part of the spec that says where the input data is stored … and in this case you basically say that the input data is stored in Druid’s xx dataSource (see the ioConfig sketch below). If you’re not doing batch indexing already, I would suggest first getting familiar with that; once that works, changing it to do re-indexing is fairly easy. If you have specific questions while trying things out, we can help.
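
Roughly speaking, only the ioConfig changes: its inputSpec is of type "dataSource" and points at the existing Druid dataSource and the intervals to read. A fragment might look like this (the dataSource name and interval are just placeholders):

"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "dataSource",
    "ingestionSpec": {
      "dataSource": "your-existing-5m-datasource",
      "intervals": ["2017-12-01/2017-12-02"]
    }
  }
}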

– Himanshu

  1. Thank you, I have posted a reply as well. Let’s discuss this issue there then :slight_smile:
  2. Well, we use the kafka-indexing-service for that, since I see a lot of tasks like “index_kafka__eba088c525316c9_efioeied”. So it seems that indexing happens automatically, without any manual task submission (besides the one-time supervisor spec submission to create a new data source). I am a bit confused here, I think. Batch indexing and batch re-indexing are a different thing entirely, right?

(1) yep
(2) yes, batch ingestion is a totally different feature. Here you submit a task JSON to Druid that describes the location of the input data, the parse specification for that input data, and the schema for the segments to be created. Druid then runs that task (locally or on Hadoop, depending on the type of task) and creates the segments. Batch re-indexing is the same, except that your input data is located within Druid itself.
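
For reference, a bare-bones Hadoop batch indexing task might look roughly like the sketch below (the dataSource name, dimensions, interval and input path are all placeholders); batch re-indexing keeps the same shape but replaces the static-paths inputSpec with a dataSource one:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "placeholder-datasource",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "timestampSpec": { "column": "timestamp", "format": "auto" },
          "dimensionsSpec": { "dimensions": ["dim1", "dim2"] }
        }
      },
      "metricsSpec": [ { "name": "count", "type": "count" } ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "hour",
        "intervals": ["2017-12-01/2017-12-02"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": { "type": "static", "paths": "hdfs://namenode/path/to/input.json" }
    },
    "tuningConfig": { "type": "hadoop" }
  }
}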
For your use case, I think (1) is a fine approach.

OK, it seems that we have moved further along in solving this.

So we created a batch indexing job (correct me if I am wrong, but this is a batch indexing job, not a batch re-indexing job, correct?) that looks like this:

{ "type": "index_hadoop", "spec": { "dataSchema": { "dataSource": "new-datasource-index-1h", "parser": { "type": "hadoopyString", "parseSpec": { "format": "json", "timestampSpec": { "column": "timestamp", "format": "auto" }, "dimensionsSpec": { "dimensions" : ["aaa", "bbb", "ccc", "ddd", "eee"], "dimensionExclusions" : ["xxx", "yyy"], "spatialDimensions" : [] } } }, "metricsSpec": [ { "name": "event_qty", "type": "longSum", "fieldName": "count" }, { "name": "sum_value", "type": "doubleSum", "fieldName": "val"}, { "name": "min_value", "type": "doubleMin", "fieldName": "val"}, { "name": "max_value", "type": "doubleMax", "fieldName": "val"} ], "granularitySpec": { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "hour", "rollup" : true, "intervals" : ["2017-12-01T00:00:00.000/2017-12-01T04:00:00.000"] } }, "ioConfig": { "type": "hadoop", "inputSpec": { "type": "dataSource", "ingestionSpec": { "dataSource" : "existing-druid-datasource", "intervals" : ["2017-12-01T00:00:00.000/2017-12-01T04:00:00.000"], "metrics" : ["count", "val"] } } }, "tuningConfig": { "type": "hadoop", "maxRowsInMemory": 15000000, "numBackgroundPersistThreads": 0, "jobProperties": { "mapreduce.job.classloader": "true" } } } }

We can confirm that this job runs OK (though sometimes it crashes with a “Java heap space” error) and indeed creates a new data source from the existing one with the new granularity settings.

But there is a problem: it seems the job cannot pick up the metrics, because after it completes, all the new metrics in the new data source are equal to 0 (as if it had not found any metrics in the existing data source, even though they are specified in the ingestionSpec under ioConfig).

For example, here’s one row from the existing-druid-datasource data source:

{ "__time" : "2017-12-01T01:05:00.000Z", "aaa" : "some-aaa-value", "bbb" : "some-bbb-value", "ccc" : "some-ccc-value", "ddd" : "some-ddd-value", "eee" : "some-eee-value", "xxx" : "151128900", "yyy" : "some irrelevant text", "count" : 1, "val" : 59287810 }

And here’s what was created in the new-datasource-index-1h data source:

{ "__time" : "2017-12-01T01:00:00.000Z", "aaa" : "some-aaa-value", "bbb" : "some-bbb-value", "ccc" : "some-ccc-value", "ddd" : "some-ddd-value", "eee" : "some-eee-value", "event_qty" : 0, "sum_value" : 0 "min_value" : 0 "max_value" : 0 }

Any clue why this happens?

Thanks in advance!