Hi Druiders,
I'm trying to index a large amount of data in S3 buckets. About 3,000 files have been generated every day for the last year.
We do not want to set up batch ingestion using an EMR cluster, so I have defined an index task to load these files. The ioConfig uses the static-s3 firehose as below.
The task seems to complete successfully; however, based on the task logs, no dimensions are indexed.
I'm wondering whether the wildcard works:
```
"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "static-s3",
    "prefixes" : ["s3:///2018/05/07/*.gz"]
  },
  "appendToExisting" : false
},
"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 25000,
  "forceExtendableShardSpecs" : true
}
```
Secondly, I tried without the wildcard, just with the directory as below. It seemed to load all the files under the directory; however, it failed with an OOM (heap space). The middle manager is configured with a 64 MB heap. The peons run with a 2 GB heap and 1.5 GB of max direct memory buffers. The granularity spec has DAY for segments and HOUR for queries...
```
"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3:///2018/05/07/"]
},
```
Thirdly, I changed the segment granularity to HOUR and the query granularity to MINUTE, thinking that this would reduce the heap footprint. However, that failed with a heap-space OOM as well.
Unfortunately, all of our files have randomly generated, timestamped filenames in UTC. The file sizes vary as well.
Can someone suggest the best way to index a day's data through the static-s3 firehose?
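For reference, a minimal sketch of the changed granularitySpec (only the granularity fields are shown here):
```
"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "HOUR",
  "queryGranularity" : "MINUTE"
}
```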
Best Regards
Varaga
Hi Varaga,
the 'prefixes' of the static-s3 firehose are object-key prefixes, and a regex filter is not supported yet. So, if you set 'prefixes' to ["s3:///2018/05/07/"], the firehose will read all objects under 's3:///2018/05/07/'. I raised an issue for supporting a regex filter here: https://github.com/druid-io/druid/issues/5772.
Regarding the OOM, can you post the stack trace here? What kind of OOM did you see?
Probably your maxRowsInMemory is too low, unless your data has a lot of columns (its default is 75000, which is set conservatively). If maxRowsInMemory is too low, Druid tasks persist generated segments too frequently and thereby generate too many segment files. Those files must be merged before publishing, which can cause an OOM if too many files have to be merged.
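So, as far as I can tell, your first spec with "*.gz" was treated as a literal prefix and likely matched no objects, which would explain why no dimensions were indexed. The directory-style prefix is the intended usage; a minimal sketch (bucket name elided as in your post):
```
"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "static-s3",
    "prefixes" : ["s3:///2018/05/07/"]
  },
  "appendToExisting" : false
}
```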
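For example, a sketch of your tuningConfig with maxRowsInMemory raised to at least its default (the other values are the ones from your spec; 75000 is just the default, not a tuned recommendation for your data):
```
"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 75000,
  "forceExtendableShardSpecs" : true
}
```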
Jihoon
On Thu, May 10, 2018 at 8:35 AM, Varaga <chakravarthyvp@gmail.com> wrote:
```
2018-05-11T10:28:32,500 ERROR [task-runner-0-priority-0] io.druid.indexing.common.task.IndexTask - Encountered exception in DETERMINE_PARTITIONS.
com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4229) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4176) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4170) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865) ~[aws-java-sdk-bundle-1.11.199.jar:?]
at io.druid.storage.s3.S3Utils$2.fetchNextBatch(S3Utils.java:124) ~[?:?]
at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:147) ~[?:?]
at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:114) ~[?:?]
at com.google.common.collect.Iterators.addAll(Iterators.java:357) ~[guava-16.0.1.jar:?]
at com.google.common.collect.Lists.newArrayList(Lists.java:147) ~[guava-16.0.1.jar:?]
at io.druid.firehose.s3.StaticS3FirehoseFactory.initObjects(StaticS3FirehoseFactory.java:138) ~[?:?]
at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:167) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:89) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.collectIntervalsAndShardSpecs(IndexTask.java:716) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.createShardSpecsFromInput(IndexTask.java:645) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.determineShardSpecs(IndexTask.java:583) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.common.task.IndexTask.run(IndexTask.java:417) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:456) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:428) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.lang.OutOfMemoryError: Java heap space
```
```
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main-SendThread(ip-10-35-19-87.ec2.internal:2181)"
```
```
2018-05-11T10:28:32,499 DEBUG [JettyScheduler] org.eclipse.jetty.server.session - Scavenging sessions at 1526034512491
Exception in thread "HttpClient-Netty-Boss-0" java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
Hi Jihoon,
Thanks for your response. I tried modifying maxRowsInMemory with 500K, 1 million rows, etc., with targetPartitionSize at 1 million rows, and both failed with a heap-space OOM. A day's files will have roughly 2 million rows.
The files are JSON documents, and each of them has at least 30-40 fields (in Druid terms). Out of these, only 4 dimensions are indexed. I use a flattenSpec, as our JSON documents are nested object structures and the 4 dimensions are JSONPath parsed/flattened.
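For context, the flattenSpec is shaped roughly like this; the field names and JSONPath expressions below are placeholders rather than our real ones:
```
"flattenSpec" : {
  "useFieldDiscovery" : false,
  "fields" : [
    { "type" : "path", "name" : "dim1", "expr" : "$.payload.nested.dim1" },
    { "type" : "path", "name" : "dim2", "expr" : "$.payload.nested.dim2" },
    { "type" : "path", "name" : "dim3", "expr" : "$.payload.other.dim3" },
    { "type" : "path", "name" : "dim4", "expr" : "$.payload.other.dim4" }
  ]
}
```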
The stack trace is posted above. There are also other OOMs.
It should also be noted that the task does not transition to FAILED. I presume the peon died, and the overlord console shows the task as still running!
Best Regards
Varaga
Varaga,
this is interesting. It looks like the task failed while fetching the object list from S3. Was the number of input files about 3,000, as specified above?
Would you post your middleManager configuration?
Jihoon
On Fri, May 11, 2018 at 4:05 AM, Varaga <chakravarthyvp@gmail.com> wrote:
Hi Jihoon,
My bad, I was wrong about the number of files. There are actually 14,528 files for that day. Shouldn't this be batched to read a smaller set of files at a time?
Here is my configuration:
```
druid.service=druid/middlemanager
druid.host=
druid.port=

# HTTP server threads
druid.server.http.numThreads=40

# Processing threads and buffers
druid.processing.buffer.sizeBytes=36870912
druid.processing.numMergeBuffers=2
druid.processing.numThreads=2

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml
druid.indexer.task.baseDir=/mnt//task
druid.indexer.logs.directory=/mnt//task/logs
#druid.indexer.task.restoreTasksOnRestart=true
druid.indexer.runner.startPort=40000

# Peon properties
druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=136870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.numThreads=2

# Number of tasks per middleManager
druid.worker.capacity=3
druid.worker.ip=localhost
druid.worker.version=0
```
Here is the jvm.config for the middle manager:
```
-server
-Xmx64m
-Xms64m
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
```
Hi Varaga,
I'm not sure what is occupying so much memory. Can you add "-XX:+HeapDumpOnOutOfMemoryError" to 'druid.indexer.runner.javaOpts' to get a heap dump file on the OOM error? You can use YourKit or VisualVM to analyze it.
In the meantime, I think there are two possible workarounds.
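For example, based on the javaOpts you posted, the modified line would look roughly like this (the HeapDumpPath value is just an illustration; point it anywhere with enough disk space):
```
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/heapdumps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml
```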
Jihoon
On Mon, May 14, 2018 at 1:15 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
Hi Jihoon,
Is there a configuration to set the max files for an indexing task?
I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!
Best Regards
Varaga
Varaga,
Is there a configuration to set the max files for an indexing task?
No, there isn't. You should do it manually by specifying the prefixes appropriately.
I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!
Yes, the peon needs off-heap memory for query processing. If you don't use stream ingestion, you can minimize it by adjusting druid.processing.buffer.sizeBytes, druid.processing.numThreads, and druid.processing.numMergeBuffers. Please note that the amount of direct memory needed is druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1).
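With the peon values from the middleManager config you posted (136870912-byte buffers, 2 merge buffers, 2 processing threads), that works out to roughly:
```
136870912 * (2 + 2 + 1) = 684,354,560 bytes ≈ 650 MB of direct memory
```
so the 1.5 GB of max direct memory you have configured should already be more than enough for batch tasks.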
Jihoon
On Mon, May 14, 2018 at 1:30 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
Thanks Jihoon,
The prefix can't be set as you suggested, since regex isn't supported yet for the static-s3 firehose. Setting uris is not an option, since there are 14K files/day.
I'm also planning to use the Kafka Indexing Service. I'm not sure whether that falls under stream ingestion?
Varaga
Varaga,
The prefix can't be set as you suggested, since regex isn't supported yet for the static-s3 firehose. Setting uris is not an option, since there are 14K files/day.
The prefix filters input files by checking the prefix of each input file's path. Suppose you have 3 files whose URIs are s3://some/path/file_1.csv, s3://some/path/file_1_1.csv, and s3://some/path/file_2.csv, respectively. You can ingest only s3://some/path/file_1.csv and s3://some/path/file_1_1.csv by setting the prefix to s3://some/path/file_1.
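In firehose terms, that example would look roughly like this (the paths are the hypothetical ones from the example above, not your real bucket):
```
"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3://some/path/file_1"]
}
```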
I'm also planning to use the Kafka Indexing Service. I'm not sure whether that falls under stream ingestion?
Yes, the Kafka Indexing Service is a stream ingestion method.
Jihoon
On Mon, May 14, 2018 at 2:14 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
Hi Jihoon,
I increased the heap up to 16 GB, plus 1.5 GB for max direct buffers, with a 5 GB hard disk. It failed again with a heap-space OOM.
Unfortunately, I couldn't use the prefixes, as our filenames are randomly generated/timestamped.
I've kind of hit a roadblock.
The hprof was generated when the heap was at 8 GB. I didn't try to load it, as it would bog down my laptop.
Best Regards
Varaga
Hi Varaga,
would you please upload the heap dump file somewhere public, like Google Drive or S3?
Jihoon
On Tue, May 15, 2018 at 9:52 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
Varaga,
from the heap dump file, the list of S3ObjectSummary objects took up most of your memory (98%). It looks like the size of the list was more than 16,000,000. Would you double-check how many files are under the path you specified?
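As a rough back-of-the-envelope estimate (the per-object size is my assumption; each S3ObjectSummary carries the key string, ETag, owner, and timestamps):
```
16,000,000 objects * ~500 bytes ≈ 8 GB
```
which is a large fraction of even a 16 GB heap before any row data or segment buffers are counted.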
Jihoon
On Wed, May 16, 2018 at 9:19 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
Jihoon, thanks for your response on the heap dump,
Let me check and get back to you on the number of files.
Are sizing characteristics and scale factors based on data sizes, etc., documented anywhere? I think it would be great to have documentation or guidance on the resources needed vs. data sizes for indexing, segment cache sizes, etc.
Best Regards
Varaga