Reindexing data with IngestSegmentFirehose works first time, fails subsequent times

Hi,

I'm using the kafka-indexing-service to ingest data from Kafka. To delete individual records from Druid, I also fire off an indexing task that uses ingestSegmentFirehose to re-ingest the existing data minus the records to be deleted, filtered on an ID column. My observation is that the first time I fire off such an indexing job, I can see the total count successfully decreasing from, say, 150 to 100. But any subsequent indexing jobs fired to delete other IDs for the same time intervals have no effect. Any idea what might be happening?

In the logs for the second reindexing task (attached) I can see that 0 events were processed, whereas in the first task the number of events processed is 50.

log.txt (82.2 KB)

I know that the data corresponding to the second indexing job is present in Druid because I can query it.
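
For reference, this is roughly the kind of query I use to confirm the rows are still there (a native timeseries count over the same interval, filtered on the IDs I tried to delete; the datasource and dimension names here are just illustrative placeholders):

{
  "queryType": "timeseries",
  "dataSource": "testData",
  "granularity": "all",
  "intervals": ["2016-01-01T13:00:00.000Z/2019-02-21T13:56:52.889Z"],
  "filter": {
    "type": "and",
    "fields": [
      { "type": "selector", "dimension": "aId", "value": "<aId-of-second-delete>" },
      { "type": "selector", "dimension": "bId", "value": "<bId-of-second-delete>" }
    ]
  },
  "aggregations": [
    { "type": "count", "name": "count" }
  ]
}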

Hi guys,

Is this expected behaviour? Can the segments for a particular time interval only be reindexed once? In the reindexing documentation on the Druid site, it's mentioned that keeping a copy of your raw data around is recommended in case you ever need to reindex. Does that mean reindexing via ingestSegmentFirehose is unreliable?

Any help will be appreciated.

The issue lies in the filters passed to ingestSegmentFirehose.

For context, this is what I'm trying to achieve: I'm deleting data from within segments by reindexing, using an index task with IngestSegmentFirehose. My index task looks like this:

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "testData",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "intervals": ["2016-01-01T13:00:00.000Z/2019-02-21T13:56:52.889Z"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "testData",
        "interval": "2016-01-01T13:00:00.000Z/2019-02-21T13:56:52.889Z",
        "filter": {
          "type": "not",
          "field": {
            "type": "and",
            "fields": [
              {
                "type": "selector",
                "dimension": "aId",
                "value": "71F9BCD4-3DDE-48BC-84DE-89D7DB987B4E"
              },
              {
                "type": "selector",
                "dimension": "bId",
                "value": "DAB5009E-BF0E-430E-BF75-CF2E3A4FD739"
              }
            ]
          }
        }
      }
    }
  }
}

So the first time this job works, because the data I'm deleting is intermixed with other data in the segments, and the filter in IngestSegmentFirehose therefore still has rows to pick up. However, once the only remaining rows in the datasource for that interval are the single (aId, bId) combination being deleted, the filter matches nothing and there is nothing left for the firehose to reindex. Rather than dropping the old segments, the task just returns logs like these:

2019-03-07T08:57:05,700 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Dropping segments[[]]
2019-03-07T08:57:05,706 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Pushed segments[[]]
2019-03-07T08:57:05,707 INFO [publish-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Nothing to publish, skipping publish step.
2019-03-07T08:57:05,707 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Processed[0] events, unparseable[0], thrownAway[0].
2019-03-07T08:57:05,708 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Published segments[]

Now, the question is: is there an attribute in IngestSegmentFirehose to drop/delete/ignore the events specified in the filters, so that the old segments are still replaced even when nothing remains to reindex?

Is your issue that batch indexing doesn’t drop segments for intervals where there should be no data at all after the index run?

Yes, it is. I tried excluding all dimensions and metrics from the ingestSegmentFirehose while reindexing, but then all that remains is the time column. I need to drop that too.
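
For reference, this is roughly what that attempt looked like. I'm assuming here that the firehose's optional dimensions and metrics lists are the right way to exclude everything (empty lists so that no dimensions or metrics are selected); treat it as a sketch rather than a verified spec:

"firehose": {
  "type": "ingestSegment",
  "dataSource": "testData",
  "interval": "2016-01-01T13:00:00.000Z/2019-02-21T13:56:52.889Z",
  "dimensions": [],
  "metrics": []
}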

I actually added a warning about this to the docs recently, because it surprised me too: https://github.com/apache/incubator-druid/pull/7046/files#diff-4c1b0fe70b6e47e926987b1c496035b7R130

I think it would be nice to be able to pass a flag to batch ingestion that causes it to fill, with empty segments, any gaps inside the interval from your granularitySpec that aren't covered by new segments, which would achieve this goal.

For now, if you want to delete a segment that should entirely not exist, you can use the DELETE /druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId} HTTP API.
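
For example (hypothetical coordinator host and segment id; use the exact segment identifier as it appears in the coordinator console or the metadata store):

curl -X DELETE "http://<coordinator-host>:8081/druid/coordinator/v1/datasources/testData/segments/<segmentId>"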