Question about the internal Druid pipeline

Hi everyone,

We’re having trouble finding in the docs exactly what path the data takes when using the Hadoop indexing task.

From what we understand, and please correct me if I’m wrong: when we submit a task to the Overlord, this node hands the task to a Middle Manager, which spawns a Peon that submits the job to a Hadoop cluster.
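
For context, a minimal sketch of how such a task gets submitted to the Overlord (host, datasource, and input paths are placeholders, and the spec is trimmed to the fields relevant here; parser, dimensions, and metrics are omitted):

```python
# Minimal sketch: submit an "index_hadoop" task to the Overlord's task endpoint.
# Host, datasource, and input paths are placeholders; the spec is abbreviated.
import json
import urllib.request

OVERLORD = "http://overlord.example.com:8090"   # placeholder

task_spec = {
    "type": "index_hadoop",
    "spec": {
        "dataSchema": {
            "dataSource": "my_datasource",       # placeholder
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "DAY",
                "queryGranularity": "HOUR",
                "intervals": ["2016-01-01/2016-01-02"],
            },
        },
        "ioConfig": {
            "type": "hadoop",
            "inputSpec": {"type": "static", "paths": "s3://bucket/raw/2016-01-01/"},
        },
        "tuningConfig": {"type": "hadoop"},
    },
}

req = urllib.request.Request(
    OVERLORD + "/druid/indexer/v1/task",          # Overlord task submission endpoint
    data=json.dumps(task_spec).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
print(urllib.request.urlopen(req).read().decode())  # -> {"task":"<task id>"}
```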

In the Hadoop cluster, there are two MapReduce jobs:

  1. Determine partitions (hashed)
  2. Index generator

What do these jobs actually do internally?

What is the output of these jobs, and where is it stored?

From what we’ve gathered, these jobs perform the rollup of the original data, create the segments, and push them into our deep storage (S3). Is this correct?

From here, how do the Historicals know which segments to download and keep locally? I understand this is the Coordinator’s job, driven by the set of rules we’ve decided to configure.
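
For context, the rules we are referring to are just an ordered list kept per datasource on the Coordinator; a hedged sketch of what setting them looks like (host, datasource, and tier names are placeholders):

```python
# Sketch: setting load/retention rules for a datasource on the Coordinator.
# Rules are evaluated in order; host, datasource, and tier names are placeholders.
import json
import urllib.request

COORDINATOR = "http://coordinator.example.com:8081"   # placeholder
DATASOURCE = "my_datasource"                          # placeholder

rules = [
    # keep the last 30 days on a "hot" tier with 2 replicas
    {"type": "loadByPeriod", "period": "P30D", "tieredReplicants": {"hot": 2}},
    # everything else: 1 replica on the default tier
    {"type": "loadForever", "tieredReplicants": {"_default_tier": 1}},
]

req = urllib.request.Request(
    "%s/druid/coordinator/v1/rules/%s" % (COORDINATOR, DATASOURCE),
    data=json.dumps(rules).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req)
```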

And finally, queries can be issued to the Broker, which will look for the appropriate segments in its cache and, in case they cannot be found, will route the query to the Historicals. The Broker merges the results and presents them to the client. Is this correct as well?

Thank you very much!

Joan

Hi everyone,

We’re having trouble finding in the docs exactly what path the data takes when using the Hadoop indexing task.

From what we understand, and please correct me if I’m wrong: when we submit a task to the Overlord, this node hands the task to a Middle Manager, which spawns a Peon that submits the job to a Hadoop cluster.

In the Hadoop cluster, there are two MapReduce jobs:

  1. Determine partitions (hashed)
  2. Index generator

What do these jobs actually do internally?

Job one is used to figure out the partitioning and bucketing strategy. In a nutshell, it first computes the number of rows that will remain after rollup, and based on that number it computes a pseudo-optimal number of mappers for the next job.
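
To make that concrete, a purely illustrative sketch of the computation (not the actual MapReduce code; the real job estimates the post-rollup cardinality with HyperLogLog rather than an exact set, and the settings below are made up):

```python
# Purely illustrative: what the "determine partitions" job computes.
# Real Druid estimates the post-rollup row count with HyperLogLog per interval;
# an exact set is used here just to show the idea. Settings are made up.
import math

TARGET_ROWS_PER_SEGMENT = 5_000_000   # plays the role of a target partition size

def truncate(ts_millis, granularity_ms=3600 * 1000):
    """Truncate a timestamp to the query granularity (HOUR here)."""
    return ts_millis - (ts_millis % granularity_ms)

def estimate_shards(rows, dimensions):
    """rows: dicts with a 'timestamp' (millis) plus dimension values."""
    # Rollup collapses rows sharing (truncated timestamp, dimension values),
    # so the post-rollup cardinality is what determines segment sizing.
    seen = set()
    for row in rows:
        seen.add((truncate(row["timestamp"]),
                  tuple(row.get(d) for d in dimensions)))
    rolled_up_rows = len(seen)
    # The shard count sizes the hashed partitioning (and hence the work)
    # handed to the index-generator job.
    num_shards = max(1, math.ceil(rolled_up_rows / TARGET_ROWS_PER_SEGMENT))
    return rolled_up_rows, num_shards
```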

Job two creates the Druid segments.
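
Illustratively, the rollup that happens while those segments are built looks something like this (a conceptual sketch, not Druid’s segment-writing code; the column names are made up):

```python
# Conceptual sketch of rollup while a segment is built: rows sharing
# (truncated timestamp, dimension values) collapse into one row and the
# metric columns are aggregated. Not Druid's actual segment-writing code.
from collections import defaultdict

def rollup(rows, dimensions, query_granularity_ms=3600 * 1000):
    aggregated = defaultdict(lambda: {"count": 0, "value_sum": 0.0})
    for row in rows:
        ts = row["timestamp"] - (row["timestamp"] % query_granularity_ms)
        key = (ts,) + tuple(row.get(d) for d in dimensions)
        aggregated[key]["count"] += 1                          # longSum-style metric
        aggregated[key]["value_sum"] += row.get("value", 0.0)  # doubleSum-style metric
    return dict(aggregated)

raw = [
    {"timestamp": 1, "country": "ES", "value": 2.0},
    {"timestamp": 2, "country": "ES", "value": 3.0},   # same hour + dims -> merged
    {"timestamp": 3, "country": "FR", "value": 1.0},
]
print(rollup(raw, ["country"]))
# -> two rolled-up rows instead of three; each (interval, shard) bucket of such
#    rows is written out as one immutable segment and pushed to deep storage.
```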

What is the output of these jobs, and where is it stored?

The segments will be stored in deep storage.
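
For example, with S3 as deep storage a segment typically ends up under a key like the one below (bucket and prefix are placeholders, and the exact layout can vary with the Druid version and deep-storage configuration):

```python
# Hedged sketch of where one segment typically lands in S3-based deep storage.
# Prefix is a placeholder; the exact layout can vary by version/configuration.
def s3_segment_key(prefix, data_source, interval_start, interval_end,
                   version, partition_num):
    return "%s/%s/%s_%s/%s/%d/index.zip" % (
        prefix, data_source, interval_start, interval_end, version, partition_num)

print(s3_segment_key(
    "druid/segments",                 # druid.storage.baseKey-style prefix (placeholder)
    "my_datasource",
    "2016-01-01T00:00:00.000Z", "2016-01-02T00:00:00.000Z",
    "2016-02-01T10:00:00.000Z",       # version: when the indexing task locked the interval
    0))                               # partitionNum: shard number within the interval
```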

From what we’ve gathered, these jobs perform the rollup of the original data, create the segments, and push them into our deep storage (S3). Is this correct?

Yes.

From here, how do the Historicals know which segments to download and keep locally? I understand this is the Coordinator’s job, driven by the set of rules we’ve decided to configure.

As soon as the second job is done, metadata about the newly created segments is committed to the metadata storage (e.g. MySQL). The Coordinator watches that table, and as soon as new rows appear it tells the Historicals to load those segments via ZooKeeper callbacks.
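
To show where that bookkeeping lives, a hedged sketch of the segment table the Coordinator is effectively watching (the Coordinator does this internally; table and column names below are the defaults for the druid_segments table, and connection details are placeholders):

```python
# Hedged sketch of the bookkeeping the Coordinator is effectively watching.
# Table/column names are the defaults for the druid_segments table;
# connection details are placeholders.
import pymysql  # any MySQL client works; this one is only an example

conn = pymysql.connect(host="metadata.example.com", user="druid",
                       password="druid_password", database="druid")
with conn.cursor() as cur:
    cur.execute(
        "SELECT id, dataSource, version, used FROM druid_segments "
        "WHERE used = 1 AND dataSource = %s",
        ("my_datasource",))
    for row in cur.fetchall():
        # Each row describes one published segment; the payload column
        # (not selected here) holds the full segment descriptor.
        print(row)
```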

And finally, queries can be issued to the Broker, which will look for the appropriate segments in its cache and, in case they cannot be found, will route the query to the Historicals. The Broker merges the results and presents them to the client. Is this correct as well?

The Broker does not really cache segments, only some query results. The Broker will always query the Historicals unless it already has a cached result (not a segment).
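
For completeness, queries go to the Broker’s native query endpoint; a minimal sketch (host and datasource are placeholders):

```python
# Sketch: issuing a query to the Broker. The Broker fans it out to Historicals,
# reusing its per-segment result cache where it can, then merges the partial
# results. Host and datasource are placeholders.
import json
import urllib.request

BROKER = "http://broker.example.com:8082"   # placeholder

query = {
    "queryType": "timeseries",
    "dataSource": "my_datasource",
    "intervals": ["2016-01-01/2016-01-02"],
    "granularity": "hour",
    "aggregations": [{"type": "longSum", "name": "events", "fieldName": "count"}],
}

req = urllib.request.Request(
    BROKER + "/druid/v2/",                  # native query endpoint
    data=json.dumps(query).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
print(urllib.request.urlopen(req).read().decode())
```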

Thanks Slim, these answers are very helpful for optimizing the Hadoop cluster.

Regards,

Joan

By the way, regarding how data is stored in Deep Storage:

Is data always sent from Hadoop to Deep Storage in the form of immutable shards?

What I mean is: when a segment/shard is uploaded to Deep Storage, is there some kind of merging process that modifies these files, or are they always new, distinct files as they are uploaded?

If there is some kind of post-processing, on which node does it happen?

Thanks!

Joan

By the way, regarding how data is stored in Deep Storage:

Is data always sent from Hadoop to Deep Storage in the form of immutable shards?

Yes, the segments are immutable.

What I mean is: when a segment/shard is uploaded to Deep Storage, is there some kind of merging process that modifies these files, or are they always new, distinct files as they are uploaded?

Every job produces a new and unique set of segments, and every segment has a version that can be used to determine which one is the most recent.
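
To make the versioning rule concrete, a small illustrative sketch of how overlapping segments shadow each other: for a given interval, only segments carrying the highest version are served (this is just the idea, not Druid’s timeline code, and the intervals/versions are made up):

```python
# Illustrative sketch of version "shadowing": for each interval, only segments
# carrying the highest version are served; older versions remain in deep
# storage but are no longer used. Not Druid's actual timeline code.
from collections import defaultdict

segments = [
    # (interval, version, partition_num)
    ("2016-01-01/2016-01-02", "2016-02-01T10:00:00Z", 0),  # original batch run
    ("2016-01-01/2016-01-02", "2016-02-01T10:00:00Z", 1),
    ("2016-01-01/2016-01-02", "2016-03-01T09:00:00Z", 0),  # later re-index / delta run
    ("2016-01-01/2016-01-02", "2016-03-01T09:00:00Z", 1),
    ("2016-01-01/2016-01-02", "2016-03-01T09:00:00Z", 2),
]

by_interval = defaultdict(list)
for interval, version, partition in segments:
    by_interval[interval].append((version, partition))

for interval, entries in by_interval.items():
    newest = max(v for v, _ in entries)          # versions sort as timestamps
    served = [(v, p) for v, p in entries if v == newest]
    print(interval, "-> served:", served)        # only the 2016-03-01 set
```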

If there is some kind of post-processing, on which node does it happen?

Not sure what you mean by post-processing.

What I mean by post-processing is: if I initially run a batch indexing task, this will create a number of segments. Later, when I run a delta task, the new data is appended to the previous data, but how is this reflected in Deep Storage? Are new segments created with a new version?

Thanks!

Joan

Hi Slim,

By the way, does this mean we must delete the old segments from deep storage manually?

Joan