Does segment loading affect datasource availability for querying?


I’d like to gain a better understanding of the segment loading process in terms of the sequential steps that happen.

The context is that we seem to see that datasources become temporarily unavailable for querying while segments get loaded.

I would expect Druid to distinguish two scenarios:

1.) segments that were already loaded get lost (e.g. due to a historical node crash) and need to be re-provisioned.
In this case, I would understand and even expect that a datasource becomes unavailable for querying because showing no data is better than showing incorrect/incomplete data.

2.) newly arriving segments need to be loaded for the first time or existing segments get replaced with new versions or handed off to another tier.
In this case, I would expect Druid to load the new segments in the background while serving up queries against the timeline that doesn’t contain the new segments yet. Only after all new segments are available in Druid’s cache (i.e. have been loaded from deep storage) should Druid make them available for querying. During segment loading, I would expect the datasource to still be available for querying.

How are these two cases handled in Druid? Are my assumptions correct?

My current understanding of the segment loading process is such:
1.) Hadoop Batch Indexing job uploads newly produced Druid segments to deep storage and afterwards inserts the segment information into the Druid metastore
2.) Coordinator discovers the new metastore entries and announces them in ZooKeeper.
3.) Coordinator applies load rules to the newly arrived segments, decides which historicals are to load them, and updates this information in ZooKeeper (loadqueue section).
4.) Historicals listen to their ZooKeeper section (loadqueue), which contains directives to either load or drop segments.
5.) Historicals load segments and mark availability somewhere in ZooKeeper (segments section?).
6.) Broker keeps timeline of segments and decides which historical to query for which segments.

The part that I’m missing in my understanding is who decides whether a datasource is available for querying (I’m assuming that’s the coordinator) and which policy is applied to make this decision. How does Druid distinguish between loading new segments and compensating for previously loaded segments that have vanished?
Also, what is the meaning of these NOOP operations that appear in the historical logs? There’s LOAD/DROP and NOOP which I assume stand for NO-OPERATION.

Any enlightenment would be more than welcome.
Most important to me would be to receive confirmation that a datasource does not become temporarily unavailable for querying in situation #2, and to understand how Druid makes sure this doesn’t happen.

Thanks a lot

Yes, you are correct: the datasource remains available for querying while new segments are loaded. The steps you mentioned look correct, and segments are announced at the /druid/segments/<segment_host> path.
I did not quite get your question about who decides whether the datasource is available for querying. Segments are loaded/dropped according to the load/drop rules configured per datasource, and all loaded segments are available for querying (except when they are overshadowed by a newer version).
The Druid Coordinator has various helpers that run periodically to maintain segments in the cluster. One helper periodically polls the metadata store for information about used segments (used field set to 1). Another helper runs the user-specified rules against each polled segment in order; the first rule that the segment satisfies decides whether to load, drop or replicate the segment. Another helper looks for overshadowed and unused segments that should not exist in the cluster and drops them. The information in the metadata store is the source of truth for making these decisions.
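
To make the "first matching rule wins" behaviour concrete, here is a minimal sketch in Python. It is not Druid code: the `Segment` and `Rule` classes, the two rule factories and the 30-day period are hypothetical names chosen for illustration, loosely modelled on a period-based load rule followed by a catch-all drop rule.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Segment:
    # Hypothetical, simplified view of a used segment polled from the metadata store.
    datasource: str
    start: datetime
    end: datetime
    version: str


@dataclass(frozen=True)
class Rule:
    # Hypothetical rule: an action plus a predicate saying whether it applies.
    name: str
    action: str  # "load" or "drop"
    applies: Callable[[Segment, datetime], bool]


def load_recent(period: timedelta) -> Rule:
    """Load segments whose interval reaches into the last `period`."""
    return Rule("loadRecent", "load", lambda seg, now: seg.end > now - period)


def drop_everything() -> Rule:
    """Catch-all rule that matches every segment."""
    return Rule("dropEverything", "drop", lambda seg, now: True)


def first_matching_rule(segment: Segment, rules: List[Rule], now: datetime) -> Optional[Rule]:
    """Walk the rules in order; the first one that applies decides the outcome."""
    for rule in rules:
        if rule.applies(segment, now):
            return rule
    return None  # no rule matched this segment


if __name__ == "__main__":
    now = datetime(2016, 1, 31)
    rules = [load_recent(timedelta(days=30)), drop_everything()]
    recent = Segment("events", datetime(2016, 1, 20), datetime(2016, 1, 21), "v1")
    old = Segment("events", datetime(2015, 1, 1), datetime(2015, 1, 2), "v1")
    for seg in (recent, old):
        print(seg.start.date(), first_matching_rule(seg, rules, now).action)
```

Because the rules are evaluated in order, putting the catch-all drop rule first would drop every segment, so rule order matters in this sketch.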

The Broker maintains a VersionedIntervalTimeline that holds information about all loaded segments and knows which segment to query for which interval. This also covers the case in which a segment is only partially overshadowed by a newer version: the segment can still be queried for the non-overshadowed portion of its interval.
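
The overshadowing idea can be sketched with a heavily simplified timeline. This is an illustration in Python, not the actual VersionedIntervalTimeline implementation: `LoadedSegment`, `lookup` and the integer time axis are assumptions made for the example, and versions are assumed to be strings where a newer version compares greater. Partitions and replicas are ignored.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LoadedSegment:
    # Hypothetical record of a segment that a historical has announced as loaded.
    start: int     # inclusive
    end: int       # exclusive
    version: str   # assumed: newer versions compare greater as strings
    server: str


def lookup(segments: List[LoadedSegment], q_start: int, q_end: int) -> List[Tuple[int, int, LoadedSegment]]:
    """Return (start, end, segment) pieces to query for [q_start, q_end).

    For every slice of the query interval, the announced segment with the
    highest version wins; older versions stay visible where nothing newer
    covers them.
    """
    # Cut the query interval at every segment boundary that falls inside it.
    points = {q_start, q_end}
    for seg in segments:
        for p in (seg.start, seg.end):
            if q_start < p < q_end:
                points.add(p)
    cuts = sorted(points)

    result: List[Tuple[int, int, LoadedSegment]] = []
    for a, b in zip(cuts, cuts[1:]):
        covering = [s for s in segments if s.start <= a and b <= s.end]
        if not covering:
            continue  # nothing loaded for this slice
        winner = max(covering, key=lambda s: s.version)
        if result and result[-1][2] == winner and result[-1][1] == a:
            # Merge with the previous piece if it comes from the same segment.
            result[-1] = (result[-1][0], b, winner)
        else:
            result.append((a, b, winner))
    return result


if __name__ == "__main__":
    old = LoadedSegment(0, 10, "v1", "historical-1")
    new = LoadedSegment(5, 10, "v2", "historical-2")
    print(lookup([old], 0, 10))       # only the old version has been announced
    print(lookup([old, new], 0, 10))  # old serves [0, 5), new serves [5, 10)
```

In this sketch a new version only enters the result once it appears in the list of announced segments, so until then the whole interval keeps being served from the old version.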

I am not sure about the NOOP operations; maybe posting some log lines will help. This is my understanding of how things work, and I hope I did not confuse you.

  • Parag

Thanks a lot, Parag, for this detailed explanation. This is very helpful.

You said that you didn’t quite get my question about who decides whether the datasource is available for querying.
I did observe that Druid sometimes marks a datasource as unavailable for querying if data that should be there is actually missing. Also, just by thinking about it, this is what one would do intuitively: if one can determine that data is incomplete, it is better to fail a query than to compute and return partial (and therefore wrong) results.
So if a datasource is missing data, Druid should not respond to a query, so I wondered how Druid distinguishes between the situation in which data is actually missing (e.g. because a node crashed and a certain interval of data isn’t present) and the situation in which data is kind of missing (e.g. segments that are to be used according to the metastore, but are not yet loaded by the respective historicals). I wondered how Druid behaves in this situation.

If segments get processed by the Hadoop Indexer, they are registered with the metastore with their used field set to 1, and the used field of older segment versions is set to 0, if I’m not mistaken.
Therefore, the coordinator and broker know that this segment version is now normative and to be used, but it might take a while until a historical has loaded this new segment version. I was concerned that during this time, Druid might not serve up queries or pause queries because otherwise query results might be inconsistent: Imagine a situation in which some historicals have already loaded a new version of a segment while others haven’t.