Hadoop indexer path issue

Hi all,

I'm trying the Hadoop indexer with HOUR granularity. Here is my granularity input spec:

"inputSpec": {

"type": “granularity”,

"dataGranularity": “HOUR”,

"inputPath": “s3n://druid-dev-test/json/click”,

"filePattern": "\\

Here is my data in S3:

aws s3 ls s3://druid-dev-test/json/click/y=2016/m=04/d=28/H=00/

2016-05-09 18:00:22 0 _SUCCESS

2016-05-09 17:59:24 84824694 part-r-00000-8b75f689-9a2c-4757-8f6d-28e24cfc9adf

2016-05-09 17:59:27 697737515 part-r-00001-8b75f689-9a2c-4757-8f6d-28e24cfc9adf

2016-05-09 17:59:42 700122162 part-r-00002-8b75f689-9a2c-4757-8f6d-28e24cfc9adf

2016-05-09 18:00:05 701576673 part-r-00003-8b75f689-9a2c-4757-8f6d-28e24cfc9adf

In the indexer log I see a path error; I'm not sure if I'm missing anything obvious here.

**2016-05-10T22:45:28,735 INFO [task-runner-0] io.druid.indexer.path.GranularityPathSpec - Checking path[s3n://druid-dev-test/json/click/y=2016/m=04/d=28/H=00]
2016-05-10T22:45:29,961 INFO [task-runner-0] io.druid.indexer.path.GranularityPathSpec - Checking path[s3n://druid-dev-test/json/click/y=2016/m=04/d=28/H=00]**
2016-05-10T22:45:30,104 INFO [task-runner-0] org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032
2016-05-10T22:45:30,211 WARN [task-runner-0] org.apache.hadoop.mapreduce.JobSubmitter - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2016-05-10T22:45:30,216 WARN [task-runner-0] org.apache.hadoop.mapreduce.JobSubmitter - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2016-05-10T22:45:30,351 INFO [task-runner-0] org.apache.hadoop.mapreduce.JobSubmitter - Cleaning up the staging area /tmp/hadoop-yarn/staging/biswajit/.staging/job_1462902125229_0023
2016-05-10T22:45:30,355 WARN [task-runner-0] org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:biswajit (auth:SIMPLE) cause:java.io.IOException: No input paths specified in job
2016-05-10T22:45:30,356 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_click_2016-05-10T22:45:20.668Z, type=index_hadoop, dataSource=click}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:138) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:173) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:285) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:265) [druid-indexing-service-0.8.3.jar:0.8.3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_73]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73]
	at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:135) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	... 7 more
**Caused by: java.lang.RuntimeException: java.io.IOException: No input paths specified in job**

Please share your ingestion spec.

Also, I don’t see a pathFormat in your inputSpec.

http://druid.io/docs/0.9.0/ingestion/batch-ingestion.html
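For reference, a granularity inputSpec with an explicit pathFormat might look roughly like the sketch below (plain Python just to print the JSON; the Joda pattern shown is an assumption based on the y=/m=/d=/H= directory layout above and the documented default, so double-check it against the docs for your version):

import json

# Sketch only: granularity inputSpec with an explicit pathFormat (Joda-time pattern).
# The pattern mirrors the y=2016/m=04/d=28/H=00 layout shown earlier in this thread.
input_spec = {
    "type": "granularity",
    "dataGranularity": "HOUR",
    "inputPath": "s3n://druid-dev-test/json/click",
    "pathFormat": "'y'=yyyy/'m'=MM/'d'=dd/'H'=HH",
    "filePattern": ".*"
}
print(json.dumps({"inputSpec": input_spec}, indent=2))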

Hi Fengjin,

Thank you for the reply. After going through the source code I got a clue and was able to make it work; it seems the input splitter does not exactly take a Hadoop globbing pattern.

Once I added "filePattern": ".*" it started working. I have enclosed below the spec which I'm submitting as a task.

I have a few more questions. They may be naive, so please bear with me :slight_smile:

  1. My batch would run hourly, and I'm a little confused about what should go into the hadoop_config and what should be supplied at task submission. What I understood so far: the hadoop config is what I use at Hadoop indexer startup, is kind of a superset and the main spec definition, and I can keep it long running, like below:

io.druid.cli.Main index hadoop hadoop_batch_config.json

  2. Every hour when I submit a new task I have to submit it with that hour's interval, like below, so the task submission interval is dynamic based on the particular hour (see the sketch after this list). Can I avoid passing all the dimensions and metrics again with each task submission?

"intervals": ["2016-04-28T00:00:00Z/2016-04-28T00:00:05Z"]

  3. I'm planning to use S3 as the input source path and HDFS as the "workingPath". Do you see any issue? Does s3a work? For me it was giving some weird error; I was able to make it work with s3n.
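For question 2, a minimal sketch (plain Python, nothing Druid-specific; the function name is just for illustration) of how the per-hour interval string could be generated before each submission:

from datetime import datetime, timedelta

def hour_interval(hour_start):
    # Build an ISO-8601 "start/end" interval string covering one hour (UTC assumed).
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    end = hour_start + timedelta(hours=1)
    return f"{hour_start.strftime(fmt)}/{end.strftime(fmt)}"

# Example: the hour starting 2016-04-28T00:00:00Z
print(hour_interval(datetime(2016, 4, 28, 0)))
# -> 2016-04-28T00:00:00Z/2016-04-28T01:00:00Z

The resulting string can then be dropped into granularitySpec.intervals in the task spec for that hour.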

hadoop_config.json

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "click",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "total_count",
          "fieldName": "total_count"
        },
        {
          "type": "hyperUnique",
          "name": "unique_count",
          "fieldName": "fingerprint_id"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "HOUR",
        "queryGranularity": "HOUR"
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "granularity",
        "dataGranularity": "HOUR",
        "inputPath": "s3://druid-dev-test/json/click",
        "filePattern": ".*"
      },
      "metadataUpdateSpec": {
        "type": "mysql",
        "connectURI": ,
        "user": "xxx",
        "password": "xx",
        "segmentTable": "druid_segments"
      },
      "segmentOutputPath": "/tmp/segments"
    },
    "tuningConfig": {
      "type": "hadoop",
      "workingPath": "hdfs://localhost:9000/tmp/segments",
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": 5000000
      },
      "jobProperties": {
        "fs.s3.awsAccessKeyId": "xx",
        "fs.s3.awsSecretAccessKey": "jxx",
        "fs.s3.impl": "xx",
        "fs.s3n.awsAccessKeyId": "xxx",
        "fs.s3n.awsSecretAccessKey": "xxxx",
        "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
      }
    },
    "hadoopDependencyCoordinates": {}
  }
}

Hadoop_Task.json

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "click",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "event_name",
              "xxx",
              "yyy"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "total_count",
          "fieldName": "total_count"
        },
        {
          "type": "hyperUnique",
          "name": "unique_count",
          "fieldName": "fingerprint_id"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "intervals": ["2016-04-28T00:00:00Z/2016-04-28T00:00:05Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "granularity",
        "dataGranularity": "HOUR",
        "inputPath": "s3://druid-dev-test/json/click",
        "filePattern": ".*"
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      }
    },
    "jobProperties": {
      "fs.s3.awsAccessKeyId": "xx",
      "fs.s3.awsSecretAccessKey": "xx",
      "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
      "fs.s3n.awsAccessKeyId": "xx",
      "fs.s3n.awsSecretAccessKey": "xx",
      "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
      "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
    }
  }
}

Hi Biswajit,

Have you tried reading:

http://druid.io/docs/0.9.0/tutorials/tutorial-batch.html

It has some good information about what you need to update for batch ingestion.

Thank you for the reply. I was able to build the task and run the Hadoop indexer task for different time granularities. I was wondering if there is any API to create a task; I want to create tasks as part of my batch pipeline code flow. I have noticed Tranquility uses some classes to build tasks in code, so maybe I'll borrow the same.

~ Biswajit

Also, although I see the index status as SUCCESS and I can see data in deep storage, I'm seeing an error like the one below in the indexer log. Can I ignore this error, and how do I fix it?

 { "id" : "index_hadoop_event_2016-05-17T07:44:19.453Z",
  "status" : "SUCCESS",
  "duration" : 110748
}

**index error**

io.druid.indexing.worker.executor.ExecutorLifecycle.stop() throws java.lang.Exception] on object[io.druid.indexing.worker.executor.ExecutorLifecycle@5fe5c68b]
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_91]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_91]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_91]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]
	at com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler.stop(Lifecycle.java:337) [java-util-0.27.4.jar:?]
	at com.metamx.common.lifecycle.Lifecycle.stop(Lifecycle.java:261) [java-util-0.27.4.jar:?]
	at io.druid.cli.CliPeon$2.run(CliPeon.java:241) [druid-services-0.8.3.jar:0.8.3]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.nio.channels.ClosedChannelException
	at sun.nio.ch.FileLockImpl.release(FileLockImpl.java:58) ~[?:1.8.0_91]
	at io.druid.indexing.worker.executor.ExecutorLifecycle.stop(ExecutorLifecycle.java:220) ~[druid-indexing-service-0.8.3.jar:0.8.3]
	... 8 more
2016-05-17T07:46:20,647 INFO [Thread-30] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.indexing.overlord.ThreadPoolTaskRunner.stop()] on object[io.druid.indexing.overlord.ThreadPoolTaskRunner@7c3c3d67].
2016-05-17T07:46:20,647 INFO [Thread-30] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.curator.discovery.ServerDiscoverySelector.stop() throws java.io.IOException] on object[io.druid.curator.discovery.ServerDiscoverySelector@3ea48c37].
2016-05-17T07:46:20,647 ERROR [Thread-30] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Exception when stopping method[public void io.druid.curator.discovery.ServerDiscoverySelector.stop() throws java.io.IOException] on object[io.druid.curator.discovery.ServerDiscoverySelector@3ea48c37]
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_91]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_91]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_91]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]
	at com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler.stop(Lifecycle.java:337) [java-util-0.27.4.jar:?]
	at com.metamx.common.lifecycle.Lifecycle.stop(Lifecycle.java:261) [java-util-0.27.4.jar:?]
	at io.druid.cli.CliPeon$2.run(CliPeon.java:241) [druid-services-0.8.3.jar:0.8.3]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.IllegalStateException: Already closed or has not been started
	at com.google.common.base.Preconditions.checkState(Preconditions.java:176) ~[guava-16.0.1.jar:?]
	at org.apache.curator.x.discovery.details.ServiceCacheImpl.close(ServiceCacheImpl.java:104) ~[curator-x-discovery-2.9.1.jar:?]
	at org.apache.curator.x.discovery.details.ServiceProviderImpl.close(ServiceProviderImpl.java:78) ~[curator-x-discovery-2.9.1.jar:?]
	at io.druid.curator.discovery.ServerDiscoverySelector.stop(ServerDiscoverySelector.java:122) ~[druid-server-0.8.3.jar:0.8.3]
	... 8 more
2016-05-17T07:46:20,648 INFO [Thread-30] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.curator.announcement.Announcer.stop()] on object[io.druid.curator.announcement.Announcer@18e4551].

Hey Biswajit,

If you use the indexing service for your batch ingestion, the overlord has an HTTP API that you can use to create your indexing tasks; there are details here: http://druid.io/docs/latest/design/indexing-service.html
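For what it's worth, a minimal sketch of submitting a task spec to that HTTP API (host, port, and file name are assumptions for illustration; the overlord listens on port 8090 by default):

import urllib.request

# Assumption: overlord running at localhost:8090, task spec saved as hadoop_task.json.
with open("hadoop_task.json", "rb") as f:
    body = f.read()

req = urllib.request.Request(
    "http://localhost:8090/druid/indexer/v1/task",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode())  # the overlord replies with the new task id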

If the task succeeded and the data is in deep storage you should be fine. The exception is related to a not-quite-clean shutdown.

Hi David,

Thank you for the reply.

By API I mean a task creation API; I have built it using indexing-service.jar. I want to create indexing tasks programmatically from my data pipeline.

Is there any suggestion or better approach for backfill? Say I have to load the last one year of data via the Hadoop indexer with HOUR data granularity.

~Biswajit

Hi Biswajit,
There is no explicit API jar for tasks at present; you can either create your task JSON directly or add a dependency on the Druid jars and use the task classes.
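Following that suggestion, one possible sketch for the backfill case ("create your task json directly"); the file name and the one-task-per-day chunking are illustrative choices, not something prescribed by the docs: load the task JSON shown earlier as a template, fill in one interval per chunk of the backfill range, and submit each one.

import copy
import json
from datetime import datetime, timedelta

def daily_intervals(start, end):
    # Yield ISO-8601 "start/end" interval strings, one per day, covering [start, end).
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=1), end)
        yield f"{cur.strftime(fmt)}/{nxt.strftime(fmt)}"
        cur = nxt

# Hypothetical template: the Hadoop_Task.json from earlier in this thread.
with open("hadoop_task.json") as f:
    template = json.load(f)

for interval in daily_intervals(datetime(2015, 5, 1), datetime(2016, 5, 1)):
    task = copy.deepcopy(template)
    task["spec"]["dataSchema"]["granularitySpec"]["intervals"] = [interval]
    print(json.dumps(task))  # POST each task to the overlord as in the earlier sketch

With segmentGranularity set to HOUR this still produces hourly segments; the interval only bounds which data each task reads.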