Running Druid batch ingestion tasks on a remote Google Dataproc cluster

Hi all,

I currently have a Druid cluster (Overlord, MiddleManagers, Coordinator and Brokers) that successfully performs local batch ingestion of files located in a Google Storage bucket and places the produced index segments in another Google Storage bucket. I'd now like to implement a new solution based on remote execution on a Hadoop cluster provided by Google Dataproc (https://cloud.google.com/dataproc/docs/). The Hadoop version on the Dataproc nodes is 2.3.0, and the Druid version is 0.9.2.

Currently, batch ingestion of files is done through the "druid-google-extensions" extension, and the produced segments are placed in a Google bucket as specified in the common.runtime.properties configuration:

# Deep storage
# Cloudfiles storage configuration
druid.storage.type=google
druid.google.bucket=the-bucket
druid.google.prefix=the-path

and the JSON file for the batch ingestion task is configured like:

"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "ingestSegment",
    "paths" : "gs://another-bucket/file.gz"
  }
}

Now I'd like to obtain the same result while executing the tasks on a remote Hadoop cluster. The idea is to use Google Dataproc clusters because it is relatively easy (just a command) to create a Hadoop cluster that is already configured and ready to execute Hadoop jobs, and (of course) because our data lives in Google Storage.
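For example, spinning up such a cluster is basically a single command along these lines (cluster name, zone and machine sizes below are just placeholders, not our real setup):

gcloud dataproc clusters create druid-indexing \
    --zone europe-west1-b \
    --num-workers 2 \
    --master-machine-type n1-standard-4 \
    --worker-machine-type n1-standard-4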

I've already studied some strategies for remote execution of batch ingestion with Hadoop, and as far as I understand I should follow the instructions in http://druid.io/docs/latest/ingestion/batch-ingestion.html, in particular the section "Remote Hadoop Cluster". So the idea is to copy all the XML configuration of the Dataproc Hadoop cluster into _common/ and to adapt the JSON file for batch ingestion accordingly.

What I'd like to ask anyone who has experience integrating Druid with remote Google Dataproc clusters is: does druid.storage.type have to be set to 'hdfs', with the Google bucket that holds the index segments made visible to the Hadoop FS, and can the Google firehose still be used in the ioConfig? If not, is the alternative to import the files locally into the Druid cluster, or to use a single bucket as HDFS deep storage?

I hope to find someone who has already been able to implement this :)

Regards,
Giuseppe.

We have been using Google Cloud Storage (GCS) and Dataproc for more than a year now. This was exactly the reason why I wrote the GCS support.

We have all of our raw CSV files and segments in GCS buckets. Once a day we programmatically create a new Dataproc cluster, index all of the previous day's CSV files, and then kill the Dataproc cluster again. This way we only pay for the minutes we actually use the cluster.
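In pseudo-shell the daily cycle is roughly this (cluster name, Overlord host and task file are placeholders, not our actual setup; 8090 is the default Overlord port):

# spin up a short-lived Dataproc cluster (options omitted)
gcloud dataproc clusters create druid-daily-index --num-workers 2

# submit the Hadoop index task to the Druid Overlord
curl -X POST -H 'Content-Type: application/json' \
  --data @hadoop-index-task.json \
  http://overlord-host:8090/druid/indexer/v1/task

# ... poll http://overlord-host:8090/druid/indexer/v1/task/<taskId>/status until it completes ...

# tear the cluster down again
gcloud dataproc clusters delete druid-daily-index --quiet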

We always keep druid.storage.type set to google.

When building our Druid version we copied the Hadoop config from a Dataproc cluster into the Druid config (/etc/hadoop/conf/* from the Dataproc master to druid/config/hadoop/) and modified it to point to the hostname we always use for our Dataproc cluster (mapreduce.jobhistory.address). I guess this could also be set as an argument to the Hadoop job you create.
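The relevant part of mapred-site.xml ends up looking something like this (the hostname is a placeholder for whatever fixed name you give your Dataproc master; 10020 is the default job history port):

<property>
  <name>mapreduce.jobhistory.address</name>
  <value>my-dataproc-m:10020</value>
</property>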

To fix compatibility issues we actually have Dataproc use the Jackson libs from Druid. We use a script similar to this: https://gist.github.com/erikdubbelboer/196f28e274ed7363858d8e6b8d4a5356
So we have a Dataproc initialization action that simply replaces the Dataproc Jackson libs with the ones our Druid version was built with.
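Conceptually the initialization action does something like this (the gist has the real script; the GCS path and the Hadoop lib directory are placeholders and can differ per Dataproc image version):

#!/bin/bash
# Initialization action sketch: swap Hadoop's Jackson jars for the ones Druid was built against.
set -e
HADOOP_LIB=/usr/lib/hadoop/lib            # may differ per Dataproc image
rm -f ${HADOOP_LIB}/jackson-*.jar
gsutil cp gs://my-bucket/druid-jackson/jackson-*.jar ${HADOOP_LIB}/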

In the Druid Hadoop job we submit we set mapreduce.job.user.classpath.first = true, and we set hadoopDependencyCoordinates to ["org.apache.hadoop:hadoop-client:2.7.3"].
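In the task JSON those two settings live in different places; stripped down to the relevant fields it looks roughly like this:

{
  "type": "index_hadoop",
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"],
  "spec": {
    "ioConfig": { ... },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "mapreduce.job.user.classpath.first": "true"
      }
    }
  }
}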

Since all our raw CSV files are also stored in GCS, we use the following ioConfig:

"ioConfig" : {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "paths": "gs://another-bucket/foo.gz,gs://another-bucket/bar.gz"
  }
}

This has been possible since https://github.com/druid-io/druid/pull/2645

Let me know if you need to know more.

Cheers,
Erik

Hi Erik,

First of all: thanks for your reply, really appreciated! :)

Hi all,

I have finally been able to fix the issue, and to properly submit the job to the Dataproc cluster!

The procedure reported in my previous message works, but

Great that you got it working.

We do:

"tuningConfig" : {
  "type" : "hadoop",
  "jobProperties": {
    "mapreduce.job.user.classpath.first": "true"
  }
}

And we leave mapreduce.job.classloader at its default. I also tried what you are doing but somehow never got it working; we use the default Hadoop version in pom.xml, so that might have something to do with it.
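For reference, the classloader-isolation alternative (the one I never got working with our setup) would be something like this in jobProperties instead:

"jobProperties": {
  "mapreduce.job.classloader": "true"
}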

Hi Erik,