Loading files from S3 with wildcards in index task - Daunting

Hi Druiders,

I'm trying to index a large amount of data sitting in S3 buckets. About 3,000 files have been generated every day for the last year.
We do not want to set up batch ingestion using an EMR cluster, so I have defined an index task to load these files. The ioConfig uses the static-s3 firehose, as below.

The task seems to complete successfully; however, based on the task logs, no dimensions are indexed.
I'm wondering whether the wildcard works at all.

"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "static-s3",
    "prefixes" : ["s3:///2018/05/07/*.gz"]
  },
  "appendToExisting" : false
},
"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 25000,
  "forceExtendableShardSpecs" : true
}


Secondly, I tried without the wildcard, with just the directory as below. It seemed to load all the files under the directory, but failed with an OOM (heap space). The middle manager is configured with a 64 MB heap. The peons run with a 2 GB heap and 1.5 GB of max direct memory. The granularity spec has DAY for segments and HOUR for queries.

"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3:///2018/05/07/"]
},


Thirdly, I changed the segment granularity to HOUR and the query granularity to MINUTE, thinking this would reduce the heap footprint. However, that failed with a heap-space OOM as well.
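For reference, the granularitySpec I'm describing now looks roughly like this (assuming the usual "uniform" type; the rest of the dataSchema is omitted):

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "HOUR",
  "queryGranularity" : "MINUTE"
}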

Unfortunately, all of our files have randomly generated, timestamped filenames in UTC. The file sizes vary as well.

Can someone suggest the best way to index a day's data through the static-s3 firehose?

Best Regards
Varaga

Hi Varaga,

the 'prefixes' of the static-s3 firehose are object-key prefixes, and regex filtering is not supported yet. So, if you set 'prefixes' to ["s3:///2018/05/07/"], the firehose will read all objects under 's3:///2018/05/07/'. I raised an issue for supporting a regex filter here: https://github.com/druid-io/druid/issues/5772.
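If you can enumerate the exact object keys, the static-s3 firehose also accepts an explicit 'uris' list instead of 'prefixes'. A minimal sketch (the file names below are made up):

"firehose" : {
  "type" : "static-s3",
  "uris" : ["s3:///2018/05/07/part-0001.gz", "s3:///2018/05/07/part-0002.gz"]
}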

Regarding OOM, can you post the stacktrace here? What kinds of OOM did you see?

Probably your maxRowsInMemory is too low, unless your data has a lot of columns (its default is 75000, which is set conservatively). If maxRowsInMemory is too low, Druid tasks persist generated segments too frequently, thereby producing too many intermediate segment files. Those files have to be merged before publishing, which can cause an OOM if there are too many of them.
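For example, a sketch of your tuningConfig with maxRowsInMemory raised back to the default, keeping your other settings as posted:

"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 75000,
  "forceExtendableShardSpecs" : true
}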

Jihoon

On Thu, May 10, 2018 at 8:35 AM, Varaga <chakravarthyvp@gmail.com> wrote:

2018-05-11T10:28:32,500 ERROR [task-runner-0-priority-0] io.druid.indexing.common.task.IndexTask - Encountered exception in DETERMINE_PARTITIONS.
com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4229) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4176) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4170) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at io.druid.storage.s3.S3Utils$2.fetchNextBatch(S3Utils.java:124) ~[?:?]
at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:147) ~[?:?]
at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:114) ~[?:?]
at com.google.common.collect.Iterators.addAll(Iterators.java:357) ~[guava-16.0.1.jar:?]
at com.google.common.collect.Lists.newArrayList(Lists.java:147) ~[guava-16.0.1.jar:?]
at io.druid.firehose.s3.StaticS3FirehoseFactory.initObjects(StaticS3FirehoseFactory.java:138) ~[?:?]
at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:167) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:89) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.collectIntervalsAndShardSpecs(IndexTask.java:716) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.createShardSpecsFromInput(IndexTask.java:645) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.determineShardSpecs(IndexTask.java:583) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.run(IndexTask.java:417) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:456) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:428) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.lang.OutOfMemoryError: Java heap space


Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main-SendThread(ip-10-35-19-87.ec2.internal:2181)"


2018-05-11T10:28:32,499 DEBUG [JettyScheduler] org.eclipse.jetty.server.session - Scavenging sessions at 1526034512491
Exception in thread "HttpClient-Netty-Boss-0" java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Hi Jihoon,

Thanks for your response. I tried modifying maxRowsInMemory to 500K, 1 million rows, etc., with targetPartitionSize at 1 million rows, and both failed with an OOM (heap space). A day's files hold roughly 2 million rows.

The files are JSON documents and each of them has at least 30-40 fields (in Druid terms). Out of these, only 4 dimensions are indexed. I use a flattenSpec, as our JSON documents are nested object structs, and the 4 dimensions are parsed/flattened via JSON path.
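The flattenSpec is along these lines (the field names and paths below are placeholders, not our real ones):

"flattenSpec" : {
  "useFieldDiscovery" : false,
  "fields" : [
    { "type" : "path", "name" : "dim1", "expr" : "$.payload.dim1" },
    { "type" : "path", "name" : "dim2", "expr" : "$.payload.nested.dim2" }
  ]
}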

Here is the stack trace:

There are also other OOMs.

It should also be noted that the task does not transition to FAILED. I presume that the peon died, and the overlord console shows the task as still running!

Best Regards
Varaga


Varaga,

this is interesting. It looks like the task failed while fetching the object list from S3. Was the number of input files about 3,000, as specified above?

Would you post your middleManager configuration?

Jihoon

On Fri, May 11, 2018 at 4:05 AM, Varaga <chakravarthyvp@gmail.com> wrote:

Hi Jihoon,

My bad, I was wrong about the number of files. There are actually 14,528 files for that day. Shouldn't this be batched so that a smaller set of files is read at a time?

Here is my configuration:

druid.service=druid/middlemanager
druid.host=
druid.port=

# HTTP server threads
druid.server.http.numThreads=40

# Processing threads and buffers
druid.processing.buffer.sizeBytes=36870912
druid.processing.numMergeBuffers=2
druid.processing.numThreads=2

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml
druid.indexer.task.baseDir=/mnt//task
druid.indexer.logs.directory=/mnt//task/logs
#druid.indexer.task.restoreTasksOnRestart=true
druid.indexer.runner.startPort=40000

# Peon properties
druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=136870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.numThreads=2

# Number of tasks per middleManager
druid.worker.capacity=3
druid.worker.ip=localhost
druid.worker.version=0


Here is the jvm.config for the middle manager:

-server
-Xmx64m
-Xms64m
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager


Hi Jihoon,

Any updates here?

Best Regards

Varaga

Hi Varaga,

I'm not sure what is occupying so much memory. Can you add "-XX:+HeapDumpOnOutOfMemoryError" to 'druid.indexer.runner.javaOpts' to get a heap dump file on the OOM error? You can use YourKit or VisualVM to analyze it.
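For example, based on the javaOpts you posted (just a sketch; /mnt/heapdumps is a placeholder path, adjust as needed):

druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/heapdumps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml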

In the meantime, I think there are two possible workarounds.

  • Increasing the heap size of the peon to something larger than 2 GB.

  • Limiting the max number of files ingested per task to something small, like 5,000.

Jihoon

On Mon, May 14, 2018 at 1:15 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

Hi Jihoon,

Is there a configuration to set the max files for an indexing task?

I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!

Best Regards

Varaga

Varaga,

Is there a configuration to set the max files for an indexing task?

No, there isn't. You should do it manually by specifying the prefixes appropriately.

I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!

Yes, the peon needs off-heap memory for query processing. If you don't use stream ingestion, you can minimize it by adjusting druid.processing.buffer.sizeBytes, druid.processing.numThreads, and druid.processing.numMergeBuffers. Please note that the amount of off-heap memory needed is 'druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)'.
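As a rough calculation with the peon fork properties you posted earlier:

136870912 bytes * (2 + 2 + 1) = 684,354,560 bytes ≈ 0.64 GB

so your 1.5 GB of max direct memory already covers the processing buffers; the OOMs in your logs are on the heap.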

Jihoon

On Mon, May 14, 2018 at 1:30 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

Thanks Jihoon,

The prefix can't be set the way I need, as you said, since regex isn't supported yet for the static-s3 firehose. Setting 'uris' is not an option since there are 14K files/day.

I’m also planning to use Kafka Indexing Service. I’m not sure if that falls under stream ingestion?

Varaga

Varaga,

The prefix can't be set the way I need, as you said, since regex isn't supported yet for the static-s3 firehose. Setting 'uris' is not an option since there are 14K files/day.

The prefix filters input files by checking the prefix of each file's path. Suppose you have 3 files whose URIs are s3://some/path/file_1.csv, s3://some/path/file_1_1.csv, and s3://some/path/file_2.csv. You can ingest only s3://some/path/file_1.csv and s3://some/path/file_1_1.csv by setting the prefix to s3://some/path/file_1.
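In ioConfig terms, that example would look like this sketch:

"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3://some/path/file_1"]
}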

I’m also planning to use Kafka Indexing Service. I’m not sure if that falls under stream ingestion?

Yes, the Kafka indexing service is a stream ingestion method.

Jihoon

On Mon, May 14, 2018 at 2:14 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

Hi Jihoon,

I increased the heap up to 16 GB, plus 1.5 GB for max direct buffers, with a 5 GB hard disk. It failed again with a heap-space OOM.

Unfortunately, I couldn't use the prefixes, as our files are generated with - formats.

I've kind of hit a roadblock.

The hprof was generated when the heap was at 8 GB. I didn't try to load it, as that would bog down my laptop.

Best Regards

Varaga

Hi Varaga,

would you please upload the heap dump file somewhere public, like Google Drive or S3?

Jihoon

On Tue, May 15, 2018 at 9:52 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

Hi Jihoon,

Here is the link to the hprof file in drive:
https://drive.google.com/open?id=1vgi0ftPI95iLEWMXpOMjjG7OzwYQ8t_c

Varaga,

from the heap dump file, the list of S3ObjectSummary objects took most of your memory (98%). It looks like the size of the list was more than 16,000,000. Would you double-check how many files are in the path you specified?

Jihoon

On Wed, May 16, 2018 at 9:19 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

Jihoon, thanks for your response on the heap dump,

Let me check and get back to you regarding the number of files.

Are there any documented sizing characteristics and scale factors based on data size, etc.? I think it'd be great if there were documentation or guidance on the resources needed vs. data size for indexing, segment cache sizes, and so on.

Best Regards

Varaga