Druid 0.11.0 ORC Indexing Issue

Hi,

My Druid version: 0.11.0

I am trying to run an indexing task on ORC data.

The ORC data is generated through CDH 5.12.0 Hive with:

```sql
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
```

Ingestion spec:

```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "data-source",
      "parser": {
        "type": "orc",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "timestamp",
            "format": "millis"
          },
          "dimensionsSpec": {
            "dimensions": [
              "dim1",
              "dim2"
            ]
          }
        },
        "typeString": "struct<dim1:string,dim2:string,timestamp:bigint>"
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "fifteen_minute",
        "queryGranularity": "second",
        "intervals": [
          "2019-10-03T00:00:00.000Z/2019-10-04T00:00:00.000Z"
        ]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
        "paths": "s3n://s3-bucket/key/"
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "fs.defaultFS": "hdfs://ip-x-x-x-x.ap-southeast-1.compute.internal:8020",
        "mapreduce.jobhistory.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:10020",
        "mapreduce.jobhistory.webapp.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:19888",
        "yarn.resourcemanager.ha.enabled": false,
        "yarn.web-proxy.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:20888",
        "yarn.resourcemanager.resource-tracker.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:8025",
        "yarn.resourcemanager.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:8032",
        "yarn.resourcemanager.scheduler.address": "ip-x-x-x-x.ap-southeast-1.compute.internal:8030"
      }
    }
  },
  "hadoopDependencyCoordinates": [
    "org.apache.hadoop:hadoop-client:x.x.x-amzn-x"
  ]
}
```

The issue I am facing is that the mappers are getting timed out (at the cluster-default timeout of 10 minutes) after these logs:

```
2019-10-18 10:58:35,898 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Opening 's3n://s3-bucket/key/000000_0' for reading
2019-10-18 10:58:35,644 INFO [main] org.apache.hadoop.hive.ql.io.orc.ReaderImpl: Reading ORC rows from s3n://s3-bucket/key/000000_0 with {include: null, offset: 3, length: 30380290}
2019-10-18 10:58:35,898 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Opening 's3n://s3-bucket/key/000000_0' for reading
2019-10-18 10:58:35,898 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Opening 's3n://s3-bucket/key/000000_0' for reading
```

What's weird is that it works for files smaller than 20 MB; only bigger files are timing out.
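(As a side note: I suppose I could paper over the watchdog itself by raising the mapper timeout via jobProperties, roughly like the sketch below, where mapreduce.task.timeout is the standard Hadoop property in milliseconds. But that would only hide whatever is making the reads this slow, so I have not pursued it.)

```json
"tuningConfig": {
  "type": "hadoop",
  "jobProperties": {
    "mapreduce.task.timeout": "1800000"
  }
}
```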

So my guess is that something inside org.apache.hadoop.fs.s3native.NativeS3FileSystem is slowing down reads for ORC ingestion in particular, because all other ingestion types are working. I added some extra logging to a few org.apache.hadoop.fs.s3native classes, and here is what I observed:
```
2019-10-18 10:58:41,009 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:58:41,009 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:58:41,009 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:58:41,010 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:58:41,010 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:58:41,010 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:58:46,220 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:58:46,220 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '24131
2019-10-18 10:58:46,220 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:58:46,220 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 24131
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:58:51,363 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:58:56,644 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:58:56,644 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '40478
2019-10-18 10:58:56,644 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:58:56,644 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 40478
2019-10-18 10:59:00,947 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:59:00,947 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:59:00,948 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:00,948 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:59:00,948 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:00,948 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:59:03,848 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:03,849 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '56825
2019-10-18 10:59:03,849 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:03,849 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 56825
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:07,221 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:59:11,326 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:11,326 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '73172
2019-10-18 10:59:11,326 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:11,326 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 73172
2019-10-18 10:59:15,400 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:59:15,400 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:59:15,401 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:15,401 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:59:15,401 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:15,401 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:59:19,304 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:19,304 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '89519
2019-10-18 10:59:19,305 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:19,305 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 89519
2019-10-18 10:59:24,133 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:59:24,153 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:59:24,153 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:24,153 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:59:24,154 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:24,155 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:59:29,954 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:29,954 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '105866
2019-10-18 10:59:29,955 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:29,955 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 105866
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> From read(byte b, int off, int len)
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ---------> Reading...
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '0
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:35,456 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 0
2019-10-18 10:59:38,614 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: ----------> From Seek
2019-10-18 10:59:38,614 INFO [main] org.apache.hadoop.fs.s3native.NativeS3FileSystem: Reopening key 'key/000000_0' for reading at position '122213
2019-10-18 10:59:38,614 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: ------------> Using Jets3tNativeFileSystemStore
2019-10-18 10:59:38,614 INFO [main] org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore: Getting key: key/000000_0 from bucket: s3-bucket with byteRangeStart: 122213
```

What I observe is that the read(byte[] b, int off, int len) method of org.apache.hadoop.fs.s3native.NativeS3FileSystem is calling seek again and again, and seek in turn is calling reopen again and again.

If you look at every alternate reopening position (24131, 40478, 56825, 73172, 89519, 105866, 122213), each is exactly 16,347 bytes past the previous one. So something in NativeS3FileSystem, in combination with the ORC ingestion spec, is causing this issue. I don't have much knowledge of NativeS3FileSystem or org.apache.hadoop.hive.ql.io.orc, so I am just observing through a few logs and speculating.
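To make the pattern concrete, here is a minimal sketch of the behaviour I think I am hitting, paraphrased from my reading of the Hadoop 2.x NativeS3FileSystem source. Treat it as a sketch only: S3nSeekSketch and Store are my stand-in names, not real Hadoop classes.

```java
import java.io.IOException;
import java.io.InputStream;

// A minimal sketch of the seek/reopen logic in Hadoop 2.x
// NativeS3FileSystem.NativeS3FsInputStream, paraphrased from memory.
class S3nSeekSketch {

  // Stand-in for Hadoop's NativeFileSystemStore: one ranged GET per call.
  interface Store {
    InputStream retrieve(String key, long byteRangeStart) throws IOException;
  }

  private final Store store;
  private final String key;
  private InputStream in;
  private long pos;

  S3nSeekSketch(Store store, String key) throws IOException {
    this.store = store;
    this.key = key;
    this.in = store.retrieve(key, 0L); // initial GET from offset 0
    this.pos = 0L;
  }

  // Any seek to a different offset, forward or backward, discards the
  // currently open HTTP stream and issues a brand-new ranged GET against S3.
  synchronized void seek(long newpos) throws IOException {
    if (pos != newpos) {
      reopen(newpos);
    }
  }

  private synchronized void reopen(long newpos) throws IOException {
    in.close();                       // drop the current connection
    in = store.retrieve(key, newpos); // "Getting key ... byteRangeStart: <newpos>"
    pos = newpos;
  }
}
```

If this paraphrase is right, every time the ORC reader alternates between offset 0 and the next stripe position, s3n pays a full connection teardown plus a fresh ranged GET. Each of those round trips takes roughly 4 to 5 seconds in my logs, and at 16,347 bytes per alternation a 30,380,290-byte file needs on the order of 1,800 alternations (double that in reopens), which is hours of reading for a single file and blows well past the 10-minute mapper timeout.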

Any help would be appreciated.

Point to note:

The data is written using OrcInputFormat but read using **OrcNewInputFormat**. Still, ingestion works for significantly smaller files, and the ingested data is also correct.

I tried setting ioConfig.inputSpec.inputFormat to org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, but that gave the error below before the job was even submitted to the RM. In hindsight I believe this is expected: OrcInputFormat implements the old org.apache.hadoop.mapred.InputFormat API, while Druid's Hadoop indexer (via MultipleInputs) requires the new org.apache.hadoop.mapreduce.InputFormat API, which is what OrcNewInputFormat extends.

```
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat cannot be cast to org.apache.hadoop.mapreduce.InputFormat
	at org.apache.hadoop.mapreduce.lib.input.MultipleInputs.getInputFormatMap(MultipleInputs.java:109) ~[?:?]
	at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:59) ~[?:?]
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:303) ~[?:?]
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320) ~[?:?]
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:198) ~[?:?]
	at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1341) ~[?:?]
	at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1338) ~[?:?]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_151]
	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_151]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840) ~[?:?]
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1338) ~[?:?]
	at io.druid.indexer.IndexGeneratorJob.run(IndexGeneratorJob.java:208) ~[druid-indexing-hadoop-0.11.0.jar:0.11.0]
	at io.druid.indexer.JobHelper.runJobs(JobHelper.java:368) ~[druid-indexing-hadoop-0.11.0.jar:0.11.0]
	at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.11.0.jar:0.11.0]
	at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:279) ~[druid-indexing-service-0.11.0.jar:0.11.0]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:215) ~[druid-indexing-service-0.11.0.jar:0.11.0]
	... 7 more
```

Thanks.

Hi Abhi,
Even though I am not 100% sure, I would like to rule out one suspect: using s3a instead of s3n.

Can you try setting druid.storage.useS3aSchema=true in the common.runtime.properties file on all the nodes of your Imply cluster, restart the cluster, and retry the ingestion?

Because you said it works for smaller files but fails for bigger ones, I am thinking s3a is worth trying instead of s3n.
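If it helps, the change on the ingestion side would look roughly like the sketch below. A sketch only: the fs.s3a.* keys are the standard Hadoop S3A properties (merged into your existing jobProperties), and I am assuming your EMR Hadoop build ships the S3A filesystem and a matching AWS SDK jar, which you would need to verify.

```json
"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
    "paths": "s3a://s3-bucket/key/"
  }
},
"tuningConfig": {
  "type": "hadoop",
  "jobProperties": {
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.access.key": "<YOUR_ACCESS_KEY>",
    "fs.s3a.secret.key": "<YOUR_SECRET_KEY>"
  }
}
```

As far as I know, S3A also seeks lazily (and newer Hadoop versions add a "random" fadvise input policy), so it should not have to reopen the HTTP connection on every backward seek, which is exactly the pattern your logs show.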

Thank you.

–siva