Kafka ingestion task always seems to fail with OOM

We are experimenting with Druid for real-time Kafka ingestion using the Confluent schema registry. We have a particular topic with a large number of fields. The number of events isn’t that large, but from the logs the task just seems to index until it runs out of memory. Here’s an example from the end of the task log:

2019-01-04T17:27:45,158 INFO [mpe_orders_joined_aggs_20190104-incremental-persist] org.apache.druid.java.util.common.io.smoosh.FileSmoosher - Created smoosh file [var/druid/task/index_kafka_mpe_orders_joined_aggs_20190104_18b39fe49d93c55_ikhppdcm/work/persist/mpe_orders_joined_aggs_20190104_2018-11-12T03:00:00.000Z_2018-11-12T04:00:00.000Z_2019-01-04T09:30:42.401Z/11/00000.smoosh] of size [10091] bytes.
2019-01-04T17:27:46,204 INFO [mpe_orders_joined_aggs_20190104-incremental-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Segment[mpe_orders_joined_aggs_20190104_2018-11-15T02:00:00.000Z_2018-11-15T03:00:00.000Z_2019-01-04T09:30:47.481Z], persisting Hydrant[FireHydrant{, queryable=mpe_orders_joined_aggs_20190104_2018-11-15T02:00:00.000Z_2018-11-15T03:00:00.000Z_2019-01-04T09:30:47.481Z, count=12}]
2019-01-04T17:27:46,204 INFO [mpe_orders_joined_aggs_20190104-incremental-persist] org.apache.druid.segment.IndexMergerV9 - Starting persist for interval[2018-11-15T02:00:00.000Z/2018-11-15T03:00:00.000Z], rows[32]
2019-01-04T17:27:46,205 INFO [mpe_orders_joined_aggs_20190104-incremental-persist] org.apache.druid.segment.IndexMergerV9 - Using SegmentWriteOutMediumFactory[TmpFileSegmentWriteOutMediumFactory]
java.lang.OutOfMemoryError: GC overhead limit exceeded
Dumping heap to java_pid13429.hprof ...
Heap dump file created [2477180126 bytes in 15.890 secs]
Terminating due to java.lang.OutOfMemoryError: GC overhead limit exceeded.
I can double the heap memory for the indexing tasks, and then it seems to work for two tasks, fail on the next, then succeed again for the following two:
druid.indexer.runner.javaOpts=-server -Xmx4g -XX:MaxDirectMemorySize=10240g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

But I’m thinking it may instead be related to the ingestion spec?

Any help would be appreciated!

Hey Allon,

Are you using the latest (0.13.0) version? It has a feature where, if you remove the "maxRowsInMemory" config, it will size it automatically based on the sizes of the rows being ingested. That parameter is pretty important for avoiding memory exhaustion with wide datasets, and the automatic sizing was a nice recent addition. (Earlier versions simply defaulted to 75,000 rather than doing automatic sizing.)
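
For reference, the change is just to leave "maxRowsInMemory" out of the supervisor’s tuningConfig entirely; a minimal sketch might look like the fragment below (the "intermediatePersistPeriod" is only there as an illustrative placeholder for whatever else you already have set):

"tuningConfig": {
  "type": "kafka",
  "intermediatePersistPeriod": "PT10M"
}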

If you still hit issues after that, try analyzing the heap dump to see if it is “suspiciously” full of a certain type of object. That would help point to what would need to be done next.

Gian- thank you, I’ll double check the version I’m using. I was doing some reading as well and it appears there needs to be some tuning in the spec because this topic has 15 partitions (yes, I know, going to be altered to 3) and many fields.

Under that circumstance, are there any other tuning options I can tweak to reduce the memory footprint? Thx!

Hey Allon,

Later versions of Druid are not so sensitive to the number of partitions. Since 0.12, Druid can 'mix' data together from different Kafka partitions, so there's not much need to worry about 3 vs. 15. maxRowsInMemory is really the big tunable that affects ingest-time memory use. Making sure you're on a later version of Druid also matters, since 0.12 and 0.13 both added improvements in that area.
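
If you do end up wanting to set it explicitly for a very wide schema, it's just a single field in the tuningConfig; a rough sketch (the number below is purely illustrative, not a recommendation):

"tuningConfig": {
  "type": "kafka",
  "maxRowsInMemory": 150000
}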

I’ve removed "maxRowsInMemory", "maxBytesInMemory", "maxRowsPerSegment", "maxTotalRows", and "intermediatePersistPeriod" from the running supervisors, then reviewed the spec, and it set these defaults:

"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",

Those are quite a bit higher than I had been using, but let’s see if the update to 0.13 helps with the issue. I’ll report back what I’m seeing.

Hi Gian, I am indeed running 0.13, and so far I haven’t seen the GC limit being hit, but I continue to see a host of failed tasks and I’m not sure whether they are related:

In the historical log I see tons of these:

…SegmentLoadDropHandler - Failed to load segment for dataSource: {class=org.apache.druid.server.coordination.SegmentLoadDropHandler, exceptionType=class org.apache.druid.segment.loading.SegmentLoadingException, exceptionMessage=Exception loading segment …

Caused by: java.lang.IllegalArgumentException: [/var/druid/segments/blah/2018-12-12T12:00:00.000Z_2018-12-12T13:00:00.000Z/2019-01-04T09:30:42.938Z/2/7e9d4998-cc17-4cf4-9e6d-e00846734949/index.zip] does not exist

Similarly, in the coordinator log, tons of these:

…LoadRule - Throttling replication for segment [blah-12-13T10:00:00.000Z_2018-12-13T11:00:00.000Z_2018-12-21T19:38:34.879Z] in tier [_default_tier]

…LoadRule - Loading in progress, skipping drop until loading is complete

Then the tasks eventually fail:

…CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2018-10-06T16:00:00.000Z/2018-10-06T17:00:00.000Z, version='2019-01-04T09:31:31.878Z', partitionNumber=5}]]

…CoordinatorBasedSegmentHandoffNotifier - Still waiting for Handoff for Segments : [[SegmentDescriptor{interval=2018-10-06T16:00:00.000Z/2018-10-06T17:00:00.000Z, version='2019-01-04T09:31:31.878Z', partitionNumber=5}]]

Does this make sense?

Hey Allon,

It sounds like you might not have deep storage configured. The most typical options are s3, hdfs, or local file (if it's file-based, the directory should be on a shared network mount that all servers can access).
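
For a single-box test, something along the lines of the quickstart's local deep storage settings in common.runtime.properties should be enough (the directory below is just an example path):

# Local deep storage; fine for a single machine.
# Use s3/hdfs or a shared mount for a real cluster.
druid.storage.type=local
druid.storage.storageDirectory=var/druid/segments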

Thanks Gian. I’m just using it on a single (large) instance for now, and I’m only toying around with relatively small amounts of data. I think some of my workarounds for OOM errors on earlier Druid versions may have been problematic with the latest Druid (Kafka indexing), and I suspect the Historical node was so overwhelmed trying to delete all of the old segment-cache partitions that it couldn’t keep up with the newly created ones. I’m starting it fresh now to confirm.

Hi Gian, I started it fresh and I still see errors in the MiddleManager log, and some tasks fail:

2019-01-07T20:45:44,371 INFO [forking-task-runner-1] org.apache.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: var/druid/indexing-logs/index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb.log

2019-01-07T20:45:44,373 INFO [forking-task-runner-1] org.apache.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution

java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_192]
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_192]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_192]
    at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_192]
    at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
    at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:438) [druid-indexing-service-0.13.0-incubating-iap3.jar:0.13.0-incubating-iap3]
    at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:223) [druid-indexing-service-0.13.0-incubating-iap3.jar:0.13.0-incubating-iap3]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]

2019-01-07T20:45:44,380 INFO [forking-task-runner-1] org.apache.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb

2019-01-07T20:45:44,735 ERROR [forking-task-runner-1] org.apache.druid.indexing.overlord.ForkingTaskRunner - Failed to delete task directory: {class=org.apache.druid.indexing.overlord.ForkingTaskRunner, exceptionType=class
java.io.IOException, exceptionMessage=Unable to delete directory var/druid/task/index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb/work/persist., taskDir=var/druid/task/index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb, task=index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb}

java.io.IOException: Unable to delete directory var/druid/task/index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb/work/persist.
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1581) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2372) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1679) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1575) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2372) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1679) ~[commons-io-2.5.jar:2.5]
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1575) ~[commons-io-2.5.jar:2.5]
    at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:499) [druid-indexing-service-0.13.0-incubating-iap3.jar:0.13.0-incubating-iap3]
    at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:223) [druid-indexing-service-0.13.0-incubating-iap3.jar:0.13.0-incubating-iap3]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]

2019-01-07T20:45:44,739 INFO [WorkerTaskManager-NoticeHandler] org.apache.druid.indexing.worker.WorkerTaskManager - Job's finished. Completed [index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb] with
status [FAILED]

2019-01-07T20:49:46,553 INFO [WorkerTaskManager-CompletedTasksCleaner] org.apache.druid.indexing.worker.WorkerTaskManager - Deleting completed task[index_kafka_mpe_orders_joined_aggs_e3b519887f6133e_gbahiabb]
information, overlord task status[FAILED].

All on a local store, big EBS volume, plenty of CPU and RAM. Not sure what is up.