Hadoop-based ingestion doesn't work with Google Cloud deep storage

Hi,

I am using a single-machine setup of Druid 0.12.2.

The quickstart tutorial runs perfectly. I am able to ingest and query the data.

However, when I change the deep storage from local to Google, the ingestion doesn't upload the data. Below are the changes I made to common.runtime.properties:

druid.extensions.loadList=["druid-google-extensions"]

druid.storage.type=google
druid.google.bucket=druid-deep
druid.google.prefix=test/

I made no changes to the index task's input file, wikiticker-2015-09-12-sampled.json.gz.

I have used the same Google deep storage config with native indexing and it works. Below is the error when I try to use Hadoop-based indexing.

2018-10-04T22:53:46,313 INFO [pool-29-thread-1] io.druid.segment.StringDimensionMergerV9 - Completed dim[user] inverted with cardinality[10,531] in 107 millis.
2018-10-04T22:53:46,319 INFO [pool-29-thread-1] io.druid.segment.IndexMergerV9 - Completed index.drd in 3 millis.
2018-10-04T22:53:46,331 INFO [pool-29-thread-1] io.druid.java.util.common.io.smoosh.FileSmoosher - Created smoosh file [/home/aaankitgupta/druid-0.12.2/var/tmp/base8753711025009482054flush/merged/00000.smoosh] of size [5534433] bytes.
2018-10-04T22:53:46,386 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2018-10-04T22:53:46,420 INFO [Thread-68] org.apache.hadoop.mapred.LocalJobRunner - reduce task executor complete.
2018-10-04T22:53:46,426 WARN [Thread-68] org.apache.hadoop.mapred.LocalJobRunner - job_local1003964568_0002
java.lang.Exception: java.io.IOException: No FileSystem for scheme: gs
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) ~[hadoop-mapreduce-client-common-2.7.3.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) [hadoop-mapreduce-client-common-2.7.3.jar:?]
Caused by: java.io.IOException: No FileSystem for scheme: gs
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) ~[hadoop-common-2.7.3.jar:?]
	at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:726) ~[druid-indexing-hadoop-0.12.2.jar:0.12.2]
	at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:500) ~[druid-indexing-hadoop-0.12.2.jar:0.12.2]
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) ~[hadoop-mapreduce-client-core-2.7.3.jar:?]
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627) ~[hadoop-mapreduce-client-core-2.7.3.jar:?]
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389) ~[hadoop-mapreduce-client-core-2.7.3.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319) ~[hadoop-mapreduce-client-common-2.7.3.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
2018-10-04T22:53:46,694 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job -  map 100% reduce 100%
2018-10-04T22:53:46,694 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_local1003964568_0002 failed with state FAILED due to: NA
2018-10-04T22:53:46,700 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=43256734
		FILE: Number of bytes written=51878497
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=39244
		Map output records=39244
		Map output bytes=16736001
		Map output materialized bytes=16892983
		Input split bytes=315
		Combine input records=0
		Combine output records=0
		Reduce input groups=1
		Reduce shuffle bytes=16892983
		Reduce input records=39244
		Reduce output records=0
		Spilled Records=78488
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=360
		Total committed heap usage (bytes)=2015363072
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=0
	File Output Format Counters
		Bytes Written=8
2018-10-04T22:53:46,705 INFO [task-runner-0-priority-0] io.druid.indexer.JobHelper - Deleting path[var/druid/hadoop-tmp/wikiticker/2018-10-04T225318.382Z_1ceab83a50da40168b409292ed2ba1e2]
2018-10-04T22:53:46,727 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[AbstractTask{id='index_hadoop_wikiticker_2018-10-04T22:53:18.321Z', groupId='index_hadoop_wikiticker_2018-10-04T22:53:18.321Z', taskResource=TaskResource{availabilityGroup='index_hadoop_wikiticker_2018-10-04T22:53:18.321Z', requiredCapacity=1}, dataSource='wikiticker', context={}}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:222) ~[druid-indexing-service-0.12.2.jar:0.12.2]
	at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:238) ~[druid-indexing-service-0.12.2.jar:0.12.2]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.2.jar:0.12.2]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.2.jar:0.12.2]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:219) ~[druid-indexing-service-0.12.2.jar:0.12.2]
	... 7 more
Caused by: io.druid.java.util.common.ISE: Job[class io.druid.indexer.IndexGeneratorJob] failed!
	at io.druid.indexer.JobHelper.runJobs(JobHelper.java:391) ~[druid-indexing-hadoop-0.12.2.jar:0.12.2]
	at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.12.2.jar:0.12.2]
	at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:293) ~[druid-indexing-service-0.12.2.jar:0.12.2]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:219) ~[druid-indexing-service-0.12.2.jar:0.12.2]
	... 7 more
2018-10-04T22:53:46,737 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_hadoop_wikiticker_2018-10-04T22:53:18.321Z] status changed to [FAILED].
2018-10-04T22:53:46,740 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_hadoop_wikiticker_2018-10-04T22:53:18.321Z",
  "status" : "FAILED",
  "duration" : 19899
}
2018-10-04T22:53:46,749 INFO [main] io.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.server.listener.announcer.ListenerResourceAnnouncer.stop()] on object[io.druid.query.lookup.LookupResourceListenerAnnouncer@5a30722c].
2018-10-04T22:53:46,749 INFO [main] io.druid.curator.announcement.Announcer - unannouncing [/druid/listeners/lookups/__default/http:druid-dummy.c.agupta292-terraform.internal:8100]
2018-10-04T22:53:46,768 INFO [main] io.druid.server.listener.announcer.ListenerResourceAnnouncer - Unannouncing start time on [/druid/listeners/lookups/__default/http:druid-dummy.c.agupta292-terraform.internal:8100]
2018-10-04T22:53:46,768 INFO [main] io.druid.server.initialization.jetty.JettyServerModule - Stopping Jetty Server...
2018-10-04T22:53:46,773 INFO [main] org.eclipse.jetty.server.AbstractConnector - Stopped ServerConnector@6185f44{HTTP/1.1,[http/1.1]}{0.0.0.0:8100}
2018-10-04T22:53:46,776 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Stopped o.e.j.s.ServletContextHandler@4fb64a52{/,null,UNAVAILABLE}
2018-10-04T22:53:46,779 INFO [main] io.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.indexing.worker.executor.ExecutorLifecycle.stop() throws java.lang.Exception] on object[io.druid.indexing.worker.executor.ExecutorLifecycle@1e01b133].

My task JSON is as follows:

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "quickstart/wikiticker-2015-09-12-sampled.json.gz"
      }
    },
    "dataSchema" : {
      "dataSource" : "wikiticker",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"]
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : [
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user"
            ]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "time"
          }
        }
      },
      "metricsSpec" : [
        {
          "name" : "count",
          "type" : "count"
        },
        {
          "name" : "added",
          "type" : "longSum",
          "fieldName" : "added"
        },
        {
          "name" : "deleted",
          "type" : "longSum",
          "fieldName" : "deleted"
        },
        {
          "name" : "delta",
          "type" : "longSum",
          "fieldName" : "delta"
        },
        {
          "name" : "user_unique",
          "type" : "hyperUnique",
          "fieldName" : "user"
        }
      ]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

Is the gcs-connector.jar available in your Hadoop cluster? And does core-site.xml contain the correct configuration for Hadoop? Something like https://gist.github.com/erikdubbelboer/ef724d68b77c234a913058d903d2bcf9? If you are using Google Dataproc those should all be set correctly; if not, you'll have to do this yourself.
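If you are running the indexing task on Druid's built-in local Hadoop runner rather than on a real cluster, another option is to pass the equivalent filesystem settings through the task's jobProperties instead of editing core-site.xml. A minimal sketch, assuming the gcs-connector jar is already on the peon's classpath (the two class names below are the standard ones shipped with the GCS connector; credentials still have to be configured separately):

    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {
        "fs.gs.impl" : "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "fs.AbstractFileSystem.gs.impl" : "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
      }
    }

Druid adds everything under jobProperties to the Hadoop job configuration, so this should have the same effect as declaring the gs filesystem in core-site.xml.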

Hi Erik,

Thanks for those recommendations.

However, I want to clarify that I am not running a separate Hadoop instance. I am simply using the Hadoop indexing example that ships with druid-0.12.3 and trying to read an Avro file from GCS instead of reading a JSON file from the quickstart/ directory.

Also, where can I find the gcs-connector.jar file?

Appreciate the help!!!

Thanks,

Ankit

Could it be this?

https://cloud.google.com/dataproc/docs/concepts/connectors/install-storage-connector

Yes, that is where you can get it. You do seem to be running Hadoop somehow, seeing as it is a Hadoop job and your log contains lines from Hadoop processes.

Thanks Erik. That worked. :slight_smile: