Ingest Data from Google Cloud Bucket

Hello,

I'm having trouble getting data from a Google Cloud bucket into my quickstart Druid setup.

I have successfully run pull-deps for druid-google-extensions and added the GCS Hadoop connector to the classpath. In my common settings I have:

druid.storage.type=google
druid.google.bucket=BUCKET_NAME
druid.google.prefix=SUB_BUCKET/segments


When I try to ingest some data using batch ingestion:

"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "paths": "gs://BUCKET_NAME/SUB_BUCKET/test.json"
  }
}


I get the following error:

Caused by: java.lang.RuntimeException: java.io.IOException: Must supply a value for configuration setting: fs.gs.project.id

Any ideas? I've read up on core-site.xml but I'm unsure how to set it up - what value do I use for fs.default.name? I haven't spun up a cluster...

Thanks!

The Google Cloud Storage adapter for Hadoop expects you to set certain properties in your Hadoop config.

We use something like this: https://gist.github.com/erikdubbelboer/ef724d68b77c234a913058d903d2bcf9
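
For anyone who can't open the gist: a minimal core-site.xml for the GCS connector generally looks something like the sketch below. It is not necessarily identical to the gist; the property names come from the connector itself, while PROJECT_ID and the service-account setting are placeholders you'd adapt:

<configuration>
  <property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>FileSystem implementation used for gs:// paths.</description>
  </property>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>AbstractFileSystem implementation used for gs:// paths.</description>
  </property>
  <property>
    <name>fs.gs.project.id</name>
    <value>PROJECT_ID</value>
    <description>Your GCP project ID - the setting the error above complains about.</description>
  </property>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
    <description>Authenticate via a service account (assumes default credentials are available on the machine).</description>
  </property>
</configuration>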

Hello,

Thanks! I'm not sure where to set the following parameters, or what values to use:
fs.default.name
fs.defaultFS
fs.gs.system.bucket

I have not spun up a separate hadoop cluster - is this needed?

Hello,

I wanted to thank you for your help - I got it to index a small JSON file sitting in my bucket. I simply excluded those fields from my core-site.xml and it worked - though it's taking a long time, so I'll report back!

Hello,

I am struggling to understand where the Hadoop process for the indexing job is actually running. Is it running alongside Druid on my single machine? Or is it being spun up with Dataproc and running on Google's side?

I ask because my data is large and I'm about to start optimizing the indexing/ingestion process, but I'm a bit unclear about what is actually happening!

Thanks!

Clark

The Hadoop process runs on your Hadoop cluster, so that depends on where you set it up. Druid doesn't have the ability to start a Dataproc cluster itself, but if you have a Dataproc cluster you can have Druid run its indexing tasks there. We start a new Dataproc cluster each time we want to run an index task and delete the cluster again when the task is done (unless we immediately start another task). This is done by a process outside of Druid that isn't open source, but Google has a good API so it's really not that hard to reproduce.
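
As a rough sketch of that create-and-delete lifecycle with the gcloud CLI (this is not our actual tooling; the cluster name, zone, and sizes are placeholders, and the exact flags depend on your gcloud version):

#!/bin/bash
set -e

# Spin up a short-lived Dataproc cluster for one indexing run.
gcloud dataproc clusters create druid-index-1 \
  --zone europe-west1-b \
  --num-workers 8 \
  --image-version 1.1

# ... submit the index_hadoop task to the Druid overlord and wait for it to finish ...

# Tear the cluster down again so it only exists while the task runs.
gcloud dataproc clusters delete druid-index-1 --quiet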

Thanks Erik! I've spun up a Dataproc cluster, initialized it with the Jackson jars, and copied the Dataproc .xmls to the Druid _common directory. When I attempt to ingest data I get the following error:

java.net.UnknownHostException: spark-cluster-m

I can ssh/ping that host using the IP but not with the hostname - I'm not very experienced with Google Cloud - is there some issue with the IP/hostname setup for the Dataproc cluster?
My Druid instances are also on Google Cloud and I have no problem ssh-ing/pinging them with both IP and hostname.

Thanks!

Clark

I've managed to fix my previous issue by adding the IP of the master node to my /etc/hosts file. Now when I run the job I get the following error:

Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;

Any ideas?

I remember having that error as well when some of the Druid and Hadoop library versions mismatched. What we do is provide Dataproc with an initialization script that copies some of the Druid jars over to Hadoop.
The gist of it can be found here: https://gist.github.com/erikdubbelboer/9c0e5ac6465c12f7925f823a2b110c10
The build.sh script builds a tar with Druid and all the libs and config we require to distribute to all our servers.
The dataproc.sh script is an example of how we used to start Dataproc. At the moment we use the HTTP API for this but it’s not really any different.
We always build our own Druid jars because we sometimes do modifications but at the moment we are running the exact 0.11.0 release.
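
As a rough illustration of the HTTP API route (not our exact request - the project, region, bucket, cluster name and machine sizes are placeholders, and only a subset of the available fields is shown):

# Create a Dataproc cluster through the REST API; the token comes from gcloud.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/global/clusters" \
  --data '{
    "clusterName": "druid-index-1",
    "config": {
      "gceClusterConfig": { "zoneUri": "europe-west1-b" },
      "masterConfig": { "machineTypeUri": "n1-standard-8" },
      "workerConfig": { "numInstances": 8, "machineTypeUri": "n1-standard-8" },
      "softwareConfig": { "imageVersion": "1.2" },
      "initializationActions": [
        { "executableFile": "gs://BUCKET_NAME/druid-hadoop/druid-libs.sh" }
      ]
    }
  }'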

Thanks again Erik!

I've made the changes you suggested - now I get a very odd error:

Caused by: java.lang.RuntimeException: java.nio.file.AccessDeniedException: /hadoop_gcs_connector_metadata_cache

Any ideas? I've tried giving unlimited permissions to that folder on my Dataproc cluster.

Thanks!

How did you start your Dataproc cluster? When I start one, that directory is created automatically:

erik@hadoop-eu-1-w-0:~$ ls -hal /hadoop_gcs_connector_metadata_cache
total 8.0K
drwxrwxr-x  2 gcsadmin hadoop 4.0K Dec 23 03:14 .
drwxr-xr-x 24 root     root   4.0K Dec 23 03:15 ..

Same, it’s a curious error!

How are you submitting your job exactly?

druid-libs.sh:

mkdir /tmp/initialization
gsutil -m rsync -r -d gs://BUCKET_NAME/druid-hadoop/ /tmp/initialization/
rm /usr/lib/hadoop-mapreduce/jackson-*-2.*.jar
cp /tmp/initialization/* /usr/lib/hadoop-mapreduce/

Provision script:

#!/bin/bash
source config.sh

set -x
set -e

$GCLOUD dataproc clusters create $CLUSTER_NAME \
  --worker-machine-type n1-standard-8 \
  --master-machine-type n1-standard-8 \
  --num-workers 8 \
  -z $ZONE \
  --image-version 1.1 \
  --bucket=BUCKET_NAME \
  --initialization-actions gs://BUCKET_NAME/druid-hadoop/druid-libs.sh

And how do you start the Hadoop Job?

curl -X 'POST' OVERLORDNODE:OVERLORDPORT/druid/indexer/v1/task \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data @<(cat <<EOF
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "${dirContents}"
      }
    },
    "dataSchema": {
      "dataSource": "timeseries",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "format": "yyyy-MM-dd",
            "column": "date"
          },
          "dimensionsSpec": {
            "dimensions": ["id", "country"],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        { "type": "doubleSum", "name": "streams",       "fieldName": "streams" },
        { "type": "doubleSum", "name": "streams_30s",   "fieldName": "streams_30s" },
        { "type": "doubleSum", "name": "streams_50pct", "fieldName": "streams_50pct" },
        { "type": "doubleSum", "name": "streams_75pct", "fieldName": "streams_75pct" }
      ],
      "granularitySpec": {
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "intervals": ["2017-11-23/2017-11-26"]
      },
      "tuningConfig": {
        "type": "hadoop",
        "partitionsSpec": {
          "type": "hashed",
          "targetPartitionSize": 5000000
        },
        "jobProperties": { "mapreduce.job.user.classpath.first": "true" }
      }
    },
    "hadoopDependencyCoordinates": [
      "org.apache.hadoop:hadoop-client:2.7.3",
      "org.apache.hadoop:hadoop-hdfs:2.7.3"
    ]
  }
}
EOF
)

And does your MiddleManager instance also have all the correct Hadoop settings? What we did was copy the settings from a running Dataproc instance into the Druid configuration: copy /etc/hadoop/conf/* from a Dataproc master instance to config/hadoop/ in Druid. Also, maybe you can try Dataproc image-version 1.2?
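
One way to do that copy, as a sketch (spark-cluster-m is the master hostname from earlier in the thread; the zone and the target directory - whichever directory is on your Druid classpath - are placeholders):

# Copy the Hadoop client configuration from the Dataproc master into Druid's config directory.
gcloud compute scp --zone europe-west1-b \
  "spark-cluster-m:/etc/hadoop/conf/*" \
  conf/druid/_common/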

Thanks Erik, I'll try that and report back.

I originally had Dataproc version 1.2 but was under the impression that the Hadoop version in Dataproc 1.2 is newer than 2.7.3. Is there a way to get Druid to use a newer version of Hadoop?

We use Dataproc 1.2 with Druid 0.11.0 without any modifications.

Hi,
I've gone to Dataproc 1.2 with Druid 0.11.0 and I'm back to my earlier error:

Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2018-01-04T23:29:45,712 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1515085861113_0001_m_000026_2, Status : FAILED

This error repeats many times and then the whole job fails. Starting to question my sanity...

Here's what I've done:
Provisioned the Dataproc cluster as described above (now version 1.2) and copied over Druid's Jackson libs.
Installed Druid 0.11.0 (straight off the website, no rebuild!) on my Druid cluster machines, copied over the Hadoop connector, then ran pull-deps for the Google extensions (see the sketch after this list).
Copied over the Dataproc Hadoop config files (/etc/hadoop/conf on Dataproc to druid/conf/_common on Druid).
Started ZooKeeper and Druid.
Attempted to ingest data and got the error.
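
(For reference, the pull-deps step above was roughly the following - treat it as a sketch, since the contrib coordinate and version are what I believe apply to the 0.11.0 release:)

# Run from the Druid installation root: fetch the Google extension and its dependencies.
java -classpath "lib/*" io.druid.cli.Main tools pull-deps \
  --no-default-hadoop \
  -c io.druid.extensions.contrib:druid-google-extensions:0.11.0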

Here's my _common/common.runtime.properties in case that helps.

druid.extensions.loadList=["druid-google-extensions"]

druid.startup.logging.logProperties=true

druid.zk.service.host=HOST_IP
druid.zk.paths.base=/druid

druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://HOST_IP:1527/var/druid/metadata.db;create=true
druid.metadata.storage.connector.host=HOST_IP
druid.metadata.storage.connector.port=1527

# For GCP:
druid.storage.type=google
druid.google.bucket=BUCKET_NAME
druid.google.prefix=timeseries-test/segments
mapreduce.job.user.classpath.first=true

druid.indexer.logs.type=google
druid.indexer.logs.bucket=BUCKET_NAME
druid.indexer.logs.prefix=timeseries-test/indexing-logs

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

druid.emitter=logging
druid.emitter.logging.logLevel=info

druid.indexing.doubleStorage=double