Druid 0.14.1 - Map/Reduce indexing task fail AWS Signature Version 4

Hi,

I’m really close to getting my first successful ingestion, but here is what I hope to be my last issue :slight_smile:

My cluster is on AWS, with S3 deep storage (that bucket is SSE-encrypted).

Since ingesting Parquet files with the S3 firehose is not working for now, I’m using Hadoop.

My Parquet files are stored in an S3 bucket (unencrypted for now).

Here is my ingestion spec:

```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "tasks",
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "dimensionsSpec": {
            "dimensions": [
              { "name": "a", "type": "string" },
              { "name": "b", "type": "string" },
              { "name": "c", "type": "string" },
              { "name": "d", "type": "string" },
              { "name": "e", "type": "string" },
              { "name": "f", "type": "string" },
              { "name": "g", "type": "string" },
              { "name": "h", "type": "string" },
              { "name": "i", "type": "string" }
            ]
          },
          "timestampSpec": {
            "column": "business_date",
            "format": "iso"
          }
        }
      },
      "metricsSpec": [
        { "type": "longSum", "name": "amt", "fieldName": "amt" },
        { "type": "longSum", "name": "amt2", "fieldName": "amt2" },
        { "type": "longSum", "name": "qty", "fieldName": "qty" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "DAY",
        "rollup": true,
        "intervals": ["2017-11-22T00:00:00.000Z/2017-11-24T00:00:00.000Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "paths": "s3://mybucket/part-00000-cf38e3a8-8e18-4bc0-bb05-3d1f59604976.snappy.parquet",
        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
        "type": "static"
      },
      "metadataUpdateSpec": null,
      "segmentOutputPath": null
    },
    "tuningConfig": {
      "type": "hadoop",
      "workingPath": null,
      "version": "2019-05-10T13:31:45.290Z",
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": -1,
        "maxPartitionSize": -1,
        "assumeGrouped": false,
        "numShards": 1,
        "partitionDimensions": ["business_date"]
      },
      "shardSpecs": {},
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs"
      },
      "maxRowsInMemory": 75000,
      "leaveIntermediate": false,
      "cleanupOnFailure": true,
      "overwriteFiles": false,
      "ignoreInvalidRows": true,
      "jobProperties": {
        "mapreduce.map.output.compress": "true",
        "mapreduce.map.java.opts": "-server -Xms6g -Xmx6g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC",
        "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "mapreduce.input.fileinputformat.split.maxsize": "1073741825",
        "fs.s3n.awsAccessKeyId": "accesskey",
        "mapreduce.reduce.memory.mb": "40000",
        "mapreduce.fileoutputcommitter.algorithm.version": "2",
        "fs.s3.awsAccessKeyId": "accesskey",
        "fs.s3.awsSecretAccessKey": "secretkey",
        "mapreduce.map.memory.mb": "8000",
        "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.awsSecretAccessKey": "secretkey",
        "mapreduce.input.fileinputformat.split.minsize": "536870912",
        "mapreduce.task.timeout": "1800000",
        "mapreduce.reduce.java.opts": "-server -Xms32g -Xmx32g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC",
        "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec",
        "mapreduce.job.user.classpath.first": "true"
      },
      "combineText": false,
      "useCombiner": true,
      "buildV9Directly": true,
      "numBackgroundPersistThreads": 1,
      "forceExtendableShardSpecs": false,
      "useExplicitVersion": false,
      "allowedHadoopPrefix":
    }
  },
  "context": {},
  "groupId": "tasks_group",
  "dataSource": "tasks",
  "resource": {
    "availabilityGroup": "tasks_group",
    "requiredCapacity": 1
  }
}
```

Everything seems to be fine until the Hadoop indexing fails to finish the reduce phase with this error:

```
2019-05-16T11:14:55,419 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1557388159969_0910_r_000001_2, Status : FAILED
Error: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 Error Message. -- ResponseCode: 400, ResponseStatus: Bad Request, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidArgument</Code><Message>Requests specifying Server Side Encryption with AWS KMS managed keys require AWS Signature Version 4.</Message><ArgumentName>Authorization</ArgumentName><ArgumentValue>null</ArgumentValue><RequestId>BC75E9E920D3FB8D</RequestId><HostId>XQlPwsl6zjNKd335RPI2z6evZbJy/d4fjUwiTnYLI9GlhBFH7NbKTI3CW3x4pUfqv8/tGOG4D94=</HostId></Error>
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:464)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:124)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
        at org.apache.hadoop.fs.s3native.$Proxy77.storeFile(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close(NativeS3FileSystem.java:285)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
        at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:241)
        at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:377)
        at org.apache.druid.indexer.JobHelper.zipAndCopyDir(JobHelper.java:553)
        at org.apache.druid.indexer.JobHelper$2.push(JobHelper.java:447)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
        at com.sun.proxy.$Proxy78.push(Unknown Source)
        at org.apache.druid.indexer.JobHelper.serializeOutIndex(JobHelper.java:458)
        at org.apache.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:817)
        at org.apache.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:570)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169)
Caused by: org.jets3t.service.S3ServiceException: S3 Error Message. -- ResponseCode: 400, ResponseStatus: Bad Request, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidArgument</Code><Message>Requests specifying Server Side Encryption with AWS KMS managed keys require AWS Signature Version 4.</Message><ArgumentName>Authorization</ArgumentName><ArgumentValue>null</ArgumentValue><RequestId>BC75E9E920D3FB8D</RequestId><HostId>XQlPwsl6zjNKd335RPI2z6evZbJy/d4fjUwiTnYLI9GlhBFH7NbKTI3CW3x4pUfqv8/tGOG4D94=</HostId></Error>
        at org.jets3t.service.S3Service.putObject(S3Service.java:2267)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:122)
        ... 37 more
```

With that failure, I’m wondering two things:

1/ Into which bucket is Hadoop trying to write?

In my mind, Hadoop processes the data and then Druid picks it up from HDFS and pushes it to deep storage. Am I wrong? Does Hadoop push segments to deep storage itself?

2/ If Hadoop pushes segments itself, how can I avoid this kind of error?

I configured my EMR instances to have access to my deep storage bucket and KMS key decryption capabilities, but that doesn’t seem to be sufficient.

I’ve found some resources about this on the internet, but they all came down to rebuilding dependencies and Druid itself (and all targeted older Druid versions), which I can’t do.

I saw something about adding this property:

```
"com.amazonaws.services.s3.enableV4": "true"
```

I set it in my jobProperties, but it didn’t change anything.
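If I understand correctly, `com.amazonaws.services.s3.enableV4` is read by the AWS Java SDK as a JVM system property, not as a Hadoop configuration key, so setting it as its own jobProperties entry would have no effect; it would presumably have to go into the map/reduce JVM options instead, roughly like this (a sketch, keeping the existing opts):

```
"mapreduce.map.java.opts": "-Dcom.amazonaws.services.s3.enableV4=true -server -Xms6g -Xmx6g -Duser.timezone=UTC -Dfile.encoding=UTF-8",
"mapreduce.reduce.java.opts": "-Dcom.amazonaws.services.s3.enableV4=true -server -Xms32g -Xmx32g -Duser.timezone=UTC -Dfile.encoding=UTF-8"
```

Even then, the s3n connector in the stack trace uses JetS3t rather than the AWS SDK, so it would likely ignore this property anyway.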

Has anyone faced the same issue? Any hints?

Guillaume

Found a way out of this :slight_smile:

The `server-side-encryption-algorithm` jobProperty is the key.

Here are two examples of working configurations.

With s3n:

```
"fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
"fs.s3n.server-side-encryption-algorithm": "AES256",
"fs.s3n.awsAccessKeyId": "accesskey",
"fs.s3n.awsSecretAccessKey": "secretkey",
"fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
"fs.s3.awsAccessKeyId": "accesskey",
"fs.s3.awsSecretAccessKey": "secretkey",
```

I don’t know yet why the fs.s3 configuration is needed, but Hadoop wasn’t happy without it.

With the more recent s3a:

Set this property in your Druid config:

```
druid.storage.useS3aSchema=true
```

Then your spec can contain:

```
"fs.s3a.awsAccessKeyId": "accesskey",
"fs.s3a.awsSecretAccessKey": "secretkey",
"fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"fs.s3a.server-side-encryption-algorithm": "AES256",
"fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
```

fs.s3.impl is still needed to make it work.

Also, an even more secure way is to pass the spec without any credentials in it:

```
"fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
"fs.s3a.server-side-encryption-algorithm": "AES256",
"fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
```

This requires granting your EMR instance role policies that allow access to your S3 buckets (the source bucket AND Druid’s deep storage bucket, if applicable) and kms:Decrypt on your KMS key.
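As a rough sketch, such an instance-role policy could look like the following. The bucket names (`mybucket`, `my-druid-deep-storage`) and the KMS key ARN are placeholders for your own resources; `kms:GenerateDataKey` may also be needed if you write to an SSE-KMS-encrypted bucket:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::mybucket",
        "arn:aws:s3:::mybucket/*",
        "arn:aws:s3:::my-druid-deep-storage",
        "arn:aws:s3:::my-druid-deep-storage/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["kms:Decrypt", "kms:GenerateDataKey"],
      "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>"
    }
  ]
}
```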

If you use an instance profile role, you can also omit the `credentials.provider` property, as long as you don’t provide any other credential properties (InstanceProfileCredentialsProvider is the last authentication method checked; see https://hadoop.apache.org/docs/r2.8.3/hadoop-project-dist/hadoop-common/core-default.xml for more details).