RealtimeIndexTask & VersioningPolicy

I’m trying to use the realtime index task with the “custom” versioning policy to create multiple segments for the same interval.

As of now, the RealtimeIndexTask ignores whatever versioningPolicy you pass in the tuning config and overrides it with one that locks the interval and uses the lock version as the segment version:
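To make the override concrete, here is a minimal Java sketch of the behaviour described above. It is not Druid's code. VersioningPolicySketch, LockServiceSketch and RealtimeTaskSketch are hypothetical names, intervals are plain strings instead of Joda Intervals, and the lock service simply hands out a fixed timestamp as the lock version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a versioning policy: maps an interval to a segment version.
interface VersioningPolicySketch {
    String getVersion(String interval);
}

// Hypothetical lock service: the first lock on an interval fixes its version,
// and later requests for the same interval get that same version back.
class LockServiceSketch {
    private final Map<String, String> lockVersions = new HashMap<>();
    private final String currentTime;

    LockServiceSketch(String currentTime) {
        this.currentTime = currentTime;
    }

    String lock(String interval) {
        return lockVersions.computeIfAbsent(interval, i -> currentTime);
    }
}

class RealtimeTaskSketch {
    // The policy from the tuning config is deliberately ignored; the effective
    // policy locks the interval and returns the lock version as the segment version.
    static VersioningPolicySketch effectivePolicy(VersioningPolicySketch fromTuningConfig,
                                                  LockServiceSketch locks) {
        return locks::lock;
    }
}
```

Whatever the custom policy would have returned, the segment version ends up being the lock version.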

https://github.com/druid-io/druid/blob/druid-0.10.1/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java#L260

As far as I can see, this code is unnecessary, because the RealtimePlumber uses the “locking” data segment announcer, which locks the same interval (on behalf of the same task) that this versioning policy does:

https://github.com/druid-io/druid/blob/druid-0.10.1/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java#L727

I’ve removed this locally and it works as I expect, but I’m curious whether this code is actually needed and, if so, what its purpose is.

I’m happy to submit a PR with these changes if it’s not needed.

Thanks.

The realtime index task uses the indexing service’s locking API to determine a version, so it ignores the policy passed in by the user. The history behind this is that versioning policies were used for realtime nodes, which make all of their decisions completely independently; realtime tasks, by contrast, delegate some of their decision-making to the overlord so that they coexist better with other tasks for the same dataSource.

One important thing in the “playing nice” is that tasks are expected to use a version that is no higher than the one provided by the locking API. If they do use a higher one, it can accidentally subvert work done by other tasks.
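The rule can be expressed as a one-line check. This is a hedged sketch, assuming versions are ISO-8601 timestamp strings in a single format, so that plain string comparison orders them chronologically; VersionCeilingSketch is a hypothetical name, not a Druid class.

```java
// Hypothetical helper illustrating the "no higher than the lock version" rule.
class VersionCeilingSketch {
    // Assumes both versions are ISO-8601 timestamps in the same format, so
    // lexicographic order matches chronological order.
    static boolean isAllowed(String candidateVersion, String lockVersion) {
        return candidateVersion.compareTo(lockVersion) <= 0;
    }

    public static void main(String[] args) {
        String lockVersion = "2017-09-01T00:00:00.000Z";
        System.out.println(isAllowed("2017-08-15T12:00:00.000Z", lockVersion)); // true: older than the lock
        System.out.println(isAllowed(lockVersion, lockVersion));                // true: equal to the lock
        System.out.println(isAllowed("2017-09-02T00:00:00.000Z", lockVersion)); // false: newer than the lock
    }
}
```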

Interesting, thanks for the context.

Are you interested in changing the RealtimeIndexTask so that it uses a versioningPolicy if one is passed and otherwise defaults to the locking policy that is currently in place?

We’d like to be able to create realtime index tasks for the same segment, but the locking versioning policy produces segments with different versions, which means they overshadow one another.
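A simplified Java sketch of why differing versions are a problem: for a single interval, only segments carrying the highest version stay visible, and everything with a lower version is overshadowed. This ignores details of Druid’s real timeline (partial overlaps, incomplete partition sets), and SegmentSketch and OvershadowSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal segment descriptor; every segment is assumed to cover the same interval.
class SegmentSketch {
    final String version;
    final int partitionNum;

    SegmentSketch(String version, int partitionNum) {
        this.version = version;
        this.partitionNum = partitionNum;
    }
}

class OvershadowSketch {
    // Keeps only the segments whose version is the highest one present;
    // lower-version segments for the same interval are treated as overshadowed.
    static List<SegmentSketch> visible(List<SegmentSketch> segments) {
        String maxVersion = null;
        for (SegmentSketch s : segments) {
            if (maxVersion == null || s.version.compareTo(maxVersion) > 0) {
                maxVersion = s.version;
            }
        }
        List<SegmentSketch> result = new ArrayList<>();
        for (SegmentSketch s : segments) {
            if (s.version.equals(maxVersion)) {
                result.add(s);
            }
        }
        return result;
    }
}
```

With two tasks each writing partition 0 under different versions, only the newer task’s segment is visible; with a shared version and distinct partition numbers, both segments are.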

Are you using tranquility to create the realtime tasks, or are you doing it manually?

I think in general the way to go would be to have the realtime tasks use the SegmentAllocate API, just like the Kafka index tasks do, although I’m not sure how much effort this would be.

I’m creating the tasks manually.

If we did use the SegmentAllocate API in the RealtimeIndexTask, I’m not sure that would help with specifying a version for the segments created. If you’re saying that the “task lock” version needs to be used to prevent contention with other tasks, then wouldn’t the same issue exist no matter how the segments are provisioned?

The SegmentAllocate API will select a version that matches the other pre-existing segments, which is OK since it’s lower than the version from the lock. I guess the idea isn’t that the caller would specify a version; the idea is that the allocate API determines a version such that the new segment can be added to a pre-existing time range.
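As a rough illustration, the version-selection rule reads like this in Java. It is a simplified model of what the thread describes, not the SegmentAllocate implementation: SegmentIdSketch and AllocateSketch are hypothetical, a single interval is assumed, and versions are assumed to be same-format ISO-8601 strings compared lexicographically.

```java
import java.util.List;

// Hypothetical segment identifier: a version plus a partition number within one interval.
class SegmentIdSketch {
    final String version;
    final int partitionNum;

    SegmentIdSketch(String version, int partitionNum) {
        this.version = version;
        this.partitionNum = partitionNum;
    }
}

class AllocateSketch {
    // Simplified model (not Druid's implementation): reuse the highest existing version
    // for the interval and append a new partition, or fall back to the lock version
    // if the interval has no segments yet.
    static SegmentIdSketch allocate(List<SegmentIdSketch> existing, String lockVersion) {
        if (existing.isEmpty()) {
            return new SegmentIdSketch(lockVersion, 0);
        }
        String version = existing.get(0).version;
        for (SegmentIdSketch s : existing) {
            if (s.version.compareTo(version) > 0) {
                version = s.version;
            }
        }
        int maxPartition = -1;
        for (SegmentIdSketch s : existing) {
            if (s.version.equals(version)) {
                maxPartition = Math.max(maxPartition, s.partitionNum);
            }
        }
        // The reused version must not exceed the lock version.
        if (version.compareTo(lockVersion) > 0) {
            throw new IllegalStateException("existing version is newer than the lock version");
        }
        return new SegmentIdSketch(version, maxPartition + 1);
    }
}
```

Because a later task reuses the existing version instead of its own lock version, its segment is added alongside the earlier ones rather than overshadowing them.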

I’ll take a crack at having the RealtimePlumber use the SegmentAllocate API. At first glance, it may be a bit sticky due to the tight integration of the “segment tracking” in the FiniteAppenderatorDriver, but I think it’s doable.

What is your preferred method to collaborate on this? Keep it here on the mailing list? Submit a GH issue for further discussion? Submit a PR when ready and continue from there?

Thanks

Would you be able to shed some light on the purpose of the sequence name that is used when allocating segments? What are the assumptions around using that?

Submitted a PR for this: https://github.com/druid-io/druid/pull/4774