Reindexing Segments of a Realtime dataSource

Hey there,
we are using a Druid setup with an Indexing Service (Overlord and MiddleManagers), and we have set up Druid indexing tasks that index data in realtime directly via the Kafka firehose. The index task looks something like this:

```json
{
  "type": "index_realtime",
  "id": "index_task1",
  "resource": {
    "availabilityGroup": "task_group_1",
    "requiredCapacity": 1
  },
  "spec": {
    "dataSchema": {
      "dataSource": "datasource1",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "timestamp",
              "dimension1",
              "dimension2",
              "dimension3"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "longSum",
          "name": "sum_dim1",
          "fieldName": "dimension1"
        },
        {
          "type": "hyperUnique",
          "name": "unique_dim2",
          "fieldName": "dimension2"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",
        "queryGranularity": "minute"
      }
    },
    "ioConfig": {
      "type": "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "xxx:2181,xxx:2181,xxx:2181",
          "zookeeper.connection.timeout.ms": "15000",
          "zookeeper.session.timeout.ms": "15000",
          "zookeeper.sync.time.ms": "5000",
          "group.id": "druid-analyzer",
          "fetch.message.max.bytes": "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "kafka_topic"
      }
    },
    "tuningConfig": {
      "type": "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "/tmp/realtime/basePersist",
      "rejectionPolicy": {
        "type": "serverTime"
      }
    }
  }
}
```
Generally, we get the data as JSON from Kafka and use the keys in each Kafka message as dimensions in Druid. Due to a key change in the Kafka topic (let's say "dimension1" changed to "dim_1") without a corresponding change to the dimension name in the realtime indexing task, we ended up with segments that do not contain the specific dimension dim_1.
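In hindsight, the immediate fix on the realtime side would have been to update the dimensionsSpec to match the new key. A minimal sketch (listing both the old and the new key during the transition is our assumption, not something we have tested; rows from before the rename would simply carry a null dim_1):

```json
"dimensionsSpec": {
  "dimensions": [
    "timestamp",
    "dimension1",
    "dim_1",
    "dimension2",
    "dimension3"
  ],
  "dimensionExclusions": [],
  "spatialDimensions": []
}
```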

As the FAQ suggests, it seems to be possible to reindex existing data in Druid:

> **How can I Reindex existing data in Druid with schema changes?**
>
> You can use IngestSegmentFirehose with index task to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment. See Firehose for more details on IngestSegmentFirehose.

So we built a reindexing task, which looked like this:

```json
{
  "type": "index",
  "id": "reindex_task_1",
  "spec": {
    "dataSchema": {
      "dataSource": "reindexed_datasource1",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "timestamp",
              "dim_1",
              "dimension2",
              "dimension3"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "longSum",
          "name": "sum_dim1",
          "fieldName": "dim_1"
        },
        {
          "type": "hyperUnique",
          "name": "unique_dim2",
          "fieldName": "dimension2"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",
        "queryGranularity": "minute",
        "intervals": ["2015-06-26T00:00:00.000/2015-06-29T00:00:00.000"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "datasource1",
        "interval": "2015-06-26T00:00:00.000/2015-06-29T00:00:00.000",
        "filter": null,
        "dimensions": null,
        "metrics": null
      }
    }
  }
}
```

This reindexing task completed successfully. However, the new dim_1 was not correctly indexed; the task log said that the dimension is not available.

This makes sense to me, as I suppose that only the dimensions declared in the indexing task are stored in Druid segments, and nothing else that happens to be part of the data stream, right? If this is correct, however, I wonder how reindexing existing Druid data with dimension changes is supposed to work at all. Is reindexing possible for datasources that were indexed in realtime?
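One way to confirm what actually ended up in the old segments is a segmentMetadata query against the broker, which lists the columns stored per segment; if dim_1 does not appear there, the ingestSegment firehose has nothing to read. A sketch (the interval is copied from our reindex task):

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "datasource1",
  "intervals": ["2015-06-26T00:00:00.000/2015-06-29T00:00:00.000"]
}
```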

Hi,

The ingest segment firehose can create new segments from existing segments, but it can only work with the columns that are actually stored in those segments. If you want to add new columns or do more complex modifications of the data, the recommendation is to rebuild the segments directly from the raw data.
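For the rebuild-from-raw route, a batch index task reading the original JSON from disk might look like the following sketch. The baseDir and file filter are hypothetical placeholders for wherever the raw Kafka messages were archived; the dataSchema matches the reindex task above, with dim_1 as a dimension:

```json
{
  "type": "index",
  "id": "rebuild_task_1",
  "spec": {
    "dataSchema": {
      "dataSource": "datasource1",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": { "column": "timestamp", "format": "auto" },
          "dimensionsSpec": {
            "dimensions": ["timestamp", "dim_1", "dimension2", "dimension3"],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "longSum", "name": "sum_dim1", "fieldName": "dim_1" },
        { "type": "hyperUnique", "name": "unique_dim2", "fieldName": "dimension2" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",
        "queryGranularity": "minute",
        "intervals": ["2015-06-26T00:00:00.000/2015-06-29T00:00:00.000"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/data/raw/kafka_topic",
        "filter": "*.json"
      }
    }
  }
}
```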