[druid-user] Druid error while ingestion

Hi,

Has anyone faced this issue during ingestion?
Tasks fail during ingestion with the following exception:

java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) ~[?:1.8.0_275]
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) ~[?:1.8.0_275]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.possiblyPause(SeekableStreamIndexTaskRunner.java:1276) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:566) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443) [druid-indexing-service-0.22.1.jar:0.22.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]

Regards,
Chaitanya

Hi Chaitanya,

Can you tell us more about your ingestion? Is this a Kafka stream? I’m wondering if you have an unhealthy supervisor.

Best,

Mark

Hi Mark,

Thanks for your reply. Yes, the supervisor was going into an unhealthy state, and I got this log in the failed tasks. This has been resolved for now, after I changed the spec for my testing.
So I will look at that issue later if it is seen again.

However, now I see an OutOfMemoryError from Druid for native threads.
2022-04-19T05:04:28,421 ERROR [main] org.apache.druid.cli.CliPeon - Error when starting up. Failing.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method) ~[?:1.8.0_275]
at java.lang.Thread.start(Thread.java:717) ~[?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957) ~[?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378) ~[?:1.8.0_275]
at org.jboss.netty.util.internal.DeadLockProofWorker.start(DeadLockProofWorker.java:38) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:368) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.(AbstractNioSelector.java:100) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.(AbstractNioWorker.java:52) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioWorker.(NioWorker.java:45) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:44) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:28) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:80) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioWorkerPool.(NioWorkerPool.java:39) ~[netty-3.10.6.Final.jar:?]
at org.apache.druid.java.util.http.client.HttpClientInit.createBootstrap(HttpClientInit.java:166) ~[druid-core-0.22.1.jar:0.22.1]
at org.apache.druid.java.util.http.client.HttpClientInit.createClient(HttpClientInit.java:90) ~[druid-core-0.22.1.jar:0.22.1]
at org.apache.druid.guice.http.HttpClientModule$HttpClientProvider.get(HttpClientModule.java:121) ~[druid-server-0.22.1.jar:0.22.1]
at org.apache.druid.guice.http.HttpClientModule$HttpClientProvider.get(HttpClientModule.java:83) ~[druid-server-0.22.1.jar:0.22.1]
at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToInitializableAdapter.provision(InternalFactoryToInitializableAdapter.java:53) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderInternalFactory.circularGet(ProviderInternalFactory.java:61) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToInitializableAdapter.get(InternalFactoryToInitializableAdapter.java:45) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter$1.call(ProviderToInternalFactoryAdapter.java:46) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:194) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:41) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:110) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:90) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:268) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl$2$1.call(InjectorImpl.java:1019) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl$2.get(InjectorImpl.java:1015) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1050) ~[guice-4.1.0.jar:?]
at org.apache.druid.guice.PolyBind$ConfiggedProvider.get(PolyBind.java:190) ~[druid-core-0.22.1.jar:0.22.1]
at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToInitializableAdapter.provision(InternalFactoryToInitializableAdapter.java:53) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderInternalFactory.circularGet(ProviderInternalFactory.java:61) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToInitializableAdapter.get(InternalFactoryToInitializableAdapter.java:45) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:110) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:90) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:268) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter$1.call(ProviderToInternalFactoryAdapter.java:46) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:194) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:41) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:110) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:90) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:268) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter$1.call(ProviderToInternalFactoryAdapter.java:46) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40) ~[guice-4.1.0.jar:?]
at org.apache.druid.guice.LifecycleScope$1.get(LifecycleScope.java:68) ~[druid-core-0.22.1.jar:0.22.1]
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:41) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:56) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:110) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:90) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:268) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter$1.call(ProviderToInternalFactoryAdapter.java:46) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40) ~[guice-4.1.0.jar:?]
at org.apache.druid.guice.LifecycleScope$1.get(LifecycleScope.java:68) ~[druid-core-0.22.1.jar:0.22.1]
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:41) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl$2$1.call(InjectorImpl.java:1019) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1085) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl$2.get(InjectorImpl.java:1015) ~[guice-4.1.0.jar:?]
at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1050) ~[guice-4.1.0.jar:?]
at org.apache.druid.guice.LifecycleModule$2.start(LifecycleModule.java:141) ~[druid-core-0.22.1.jar:0.22.1]
at org.apache.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:115) [druid-services-0.22.1.jar:0.22.1]
at org.apache.druid.cli.CliPeon.run(CliPeon.java:304) [druid-services-0.22.1.jar:0.22.1]
at org.apache.druid.cli.Main.main(Main.java:113) [druid-services-0.22.1.jar:0.22.1]

Below is my config, with each MiddleManager having 7 workers (task slots), and each worker having 2 GB of memory assigned to it.

(Attachment: druidconfig.PNG - MiddleManager configuration)

It seems the OS is unable to allocate and create new native threads, so the application has to spawn fewer threads.
Are there any settings at the Druid level to handle this?
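
For reference, these are the OS-level limits that usually govern native thread creation; a general sketch of the checks I plan to run on a worker node (Linux; the cgroup path may differ depending on cgroup version):

# System-wide thread limit
cat /proc/sys/kernel/threads-max
# Kernel PID limit (each thread consumes a PID)
cat /proc/sys/kernel/pid_max
# Per-user process/thread limit for the user running the peons
ulimit -u
# If a container pids limit applies (cgroup v1 path shown)
cat /sys/fs/cgroup/pids/pids.max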

Regards,
Chaitanya

Hi Chaitanya,

Thank you for sharing all of that. A couple of things come to mind: do you have druid.indexer.runner.javaOpts in your middle manager runtime.properties? If so, you might bump up the Xms and Xmx. I’m also wondering about druid.processing.numThreads. It’s one of the Peon Processing properties, quoted in the next paragraph:

The number of processing threads to have available for parallel processing of segments. Our rule of thumb is num_cores - 1, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value 1.
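
For reference, a minimal sketch of where those settings would live in the MiddleManager runtime.properties; the values here are purely illustrative, not recommendations for your cluster:

druid.worker.capacity=7
druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8
# Settings forwarded to each peon JVM
druid.indexer.fork.property.druid.processing.numThreads=2
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100000000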

Best,

Mark

Hi Mark,

Currently druid.indexer.runner.javaOpts isn't set. However, druid.indexer.runner.javaOptsArray is now set with -Xms3g/-Xmx3g.
I still get the same error saying it is unable to create a native thread.

2022-04-20T10:51:22,429 INFO [main] org.apache.druid.cli.CliPeon - * druid.indexer.runner.javaOpts: -server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

2022-04-20T10:51:22,429 INFO [main] org.apache.druid.cli.CliPeon - * druid.indexer.runner.javaOptsArray: ["-server", "-Xms3g", "-Xss256k", "-Xmx3g", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=100", "-XX:+HeapDumpOnOutOfMemoryError", "-XX:HeapDumpPath=/tmp/mm-heap.hprof", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-XX:+ExitOnOutOfMemoryError", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager", "-XX:+PrintGCDetails"]

Here, I am increasing the Xmx and Xms values in druid.indexer.runner.javaOptsArray itself.

Regarding your next question, I have the configuration below set. I also changed druid_indexer_fork_property_druid_processing_buffer_sizeBytes from 50M to 100M, and numThreads is set to 2 (see the back-of-envelope direct-memory check after the config). With this configuration, I am still getting the same OOM error.

env:
  - name: DRUID_XMS
    value: 64m
  - name: DRUID_XMX
    value: 512m
  - name: druid_indexer_fork_property_druid_processing_buffer_sizeBytes
    value: 100M
  - name: druid_indexer_fork_property_druid_processing_numMergeBuffers
    value: "2"
  - name: druid_indexer_fork_property_druid_processing_numThreads
    value: "2"
  - name: druid_indexer_runner_javaOptsArray
    value: '["-server", "-Xms3g", "-Xss256k", "-Xmx3g", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=100", "-XX:+HeapDumpOnOutOfMemoryError", "-XX:HeapDumpPath=/tmp/mm-heap.hprof", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-XX:+ExitOnOutOfMemoryError", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager", "-XX:+PrintGCDetails"]'
  - name: druid_server_http_numThreads
    value: "50"
  - name: druid_worker_capacity
    value: "7"
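
As a back-of-envelope check (this applies the standard Druid sizing rule to the values above), each peon's direct memory requirement works out to roughly:

(druid.processing.numThreads + druid.processing.numMergeBuffers + 1) * druid.processing.buffer.sizeBytes
= (2 + 2 + 1) * 100 MB
= 500 MB per peon

With druid_worker_capacity at 7 and -Xmx3g per peon, each MiddleManager pod therefore needs room for roughly 7 x (3 GB heap + ~0.5 GB direct memory + thread stacks and metaspace).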

I also enabled GC details printing (-XX:+PrintGCDetails) and got this set of details in the same log, after the OutOfMemoryError was seen.
Heap
garbage-first heap total 3145728K, used 60937K [0x0000000700000000, 0x0000000700106000, 0x00000007c0000000)
region size 1024K, 51 young (52224K), 24 survivors (24576K)
Metaspace used 45174K, capacity 46262K, committed 46588K, reserved 1089536K
class space used 5561K, capacity 5864K, committed 5884K, reserved 10485

So it's not an issue with heap space, as there was plenty available.

Now we need to understand why the "unable to create new native thread" error is seen.
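
To narrow it down, I plan to check how many threads each process on the node is actually holding when the error appears, roughly along these lines (the PID below is a placeholder for an actual peon process):

# Thread count per process (NLWP column), largest consumers first
ps -eo pid,nlwp,comm --sort=-nlwp | head -20
# Thread count for one specific peon, e.g. PID 12345
ls /proc/12345/task | wc -l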

Regards,
Chaitanya

Hi Chaitanya,
The CPU limit for the MM is set to 16, while druid_worker_capacity is 7.
These seem a bit off; worker capacity is typically (CPU cores - 1).

But that would not explain the issue you are seeing.

So a few questions:
What are the characteristics of the nodes that the pods are being deployed to? Are there enough resources there?
How many, and which, pods are running on each node? Are the resource limits assigned to each pod actually available in the k8s cluster, or are the pods competing with each other for resources?

Sergio

Hi Chaitanya,

Regarding the “unable to create new native thread” error, can you tell us about your max_map_count? I’m wondering if it might need to be increased.
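
For reference, it can be checked (and raised, if needed) on the worker nodes with something like the following; the value shown is only an example:

# Current value
sysctl vm.max_map_count
# Raise it at runtime (not persistent across reboots)
sudo sysctl -w vm.max_map_count=262144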

Best,

Mark

Hi Sergio,

There are 18 worker nodes, and each node has 16 cores and 64 GB of memory allocated.
There are no other pods running; the full pod list is below.

The node config is attached above. Apart from the Druid ingestion, not many operations are running in this environment. With regard to memory utilization, resources are assigned to each pod as per the earlier screenshot I sent, and I see a lot of available CPU and memory on the worker nodes as well.

druid-exporter-prometheus-druid-exporter-798bd6d7bf-6k86p 1/1 Running 1 2d17h 10.130.8.22 large1-r2dnw-worker-sdf29
large-druid-broker-646955dfc7-tbkqt 1/1 Running 0 23h 10.130.2.53 large1-r2dnw-worker-4nbff
large-druid-coordinator-8fb4755f5-cckkq 1/1 Running 1 22h 10.130.4.141 large1-r2dnw-worker-fjsk9
large-druid-historical-0 1/1 Running 0 22h 10.130.2.72 large1-r2dnw-worker-4nbff
large-druid-historical-1 1/1 Running 0 22h 10.130.4.127 large1-r2dnw-worker-fjsk9
large-druid-middle-manager-0 1/1 Running 0 19h 10.130.2.107 large1-r2dnw-worker-4nbff
large-druid-middle-manager-1 1/1 Running 0 19h 10.130.5.12 large1-r2dnw-worker-fjsk9
large-druid-middle-manager-2 1/1 Running 0 19h 10.131.2.56 large1-r2dnw-worker-9wh9n
large-druid-middle-manager-3 1/1 Running 0 19h 10.128.4.47 large1-r2dnw-worker-r42l9
large-druid-middle-manager-4 1/1 Running 0 19h 10.131.8.45 large1-r2dnw-worker-s8rdh
large-druid-middle-manager-5 1/1 Running 0 19h 10.129.6.30 large1-r2dnw-worker-sg99r
large-druid-middle-manager-6 1/1 Running 0 19h 10.128.2.50 large1-r2dnw-worker-t7fq9
large-druid-middle-manager-7 1/1 Running 0 19h 10.128.10.58 large1-r2dnw-worker-h7mwf
large-druid-middle-manager-8 1/1 Running 0 19h 10.131.0.50 large1-r2dnw-worker-v8v9g
large-druid-middle-manager-9 1/1 Running 0 19h 10.128.6.36 large1-r2dnw-worker-ht2tv

Regards,
Chaitanya

Hi Mark,

max_map_count is set to 262144 on the worker nodes.

Regards,
Chaitanya

A couple more thoughts:

  • Could this be a noisy neighbor situation? I see you have many pods on these nodes; could other applications be exhausting, or helping exhaust, one of these limits?
  • Could you share the ingestion spec for the job that was/is failing this way? I'm wondering if the intermediate settings might be causing frequent spill-to-disk (persist) activity. I'm thinking of the following supervisor parameters:
      • maxRowsInMemory
      • maxBytesInMemory
      • intermediatePersistPeriod
      • maxPendingPersists

Let us know how it goes,

Sergio

Hi Sergio,

Thanks for the reply. I will check the number of threads used by those other processes.

By the way, here is my ingestion spec:
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "pmdata",
      "timestampSpec": {
        "column": "ts",
        "format": "iso",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [],
        "dimensionExclusions": [
          "__time",
          "ts"
        ]
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": false,
        "intervals": []
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      }
    },
    "ioConfig": {
      "topic": "pm.tsd",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": []
        },
        "featureSpec": {}
      },
      "replicas": 1,
      "taskCount": 40,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "kf-ckaf-kafka-headless.csf-os-name.svc.cluster.local:9092"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "pm.tsd",
      "useEarliestSequenceNumber": false,
      "autoscalerConfig": null,
      "type": "kafka"
    },
    "tuningConfig": {
      "type": "kafka",
      "appendableIndexSpec": {
        "type": "onheap"
      },
      "maxRowsInMemory": 50000,
      "maxBytesInMemory": 2147483647,
      "skipBytesInMemoryOverheadCheck": false,
      "maxRowsPerSegment": 100000,
      "maxTotalRows": 500000,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/druid/var/tmp/druid-realtime-persist8813738935372672681",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "uncompressed",
        "metricCompression": "none",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "reportParseExceptions": false,
      "handoffConditionTimeout": 100,
      "resetOffsetAutomatically": true,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": 10,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "PT780S",
      "logParseExceptions": true,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  }
}

We were trying to change the below params:
maxRowsInMemory
maxRowsPerSegment
maxTotalRows

Yesterday we also tried tuning intermediatePersistPeriod.
We will try more tunings with the params you have mentioned.

Now most of the tasks are succeeding, but a few tasks still fail with the "unable to create new native thread" OOM error. One important change that helped increase the overall task success rate was setting "dimensionCompression": "uncompressed".

Right now I am focusing on tunings that would reduce the overall thread utilization per worker node.

Also, we increased the slots on the MMs to 12 (from 10), so that more free slots are available per MM and these tasks have more worker nodes (and hence more threads) to run on.

Regards,
Chaitanya

Hey Chaitanya,
I think we need to start with how many messages you are trying to ingest: what are the average and max arrival rates for messages to the topic?

You have very small values for "maxRowsInMemory": 50000, "maxRowsPerSegment": 100000, and "maxTotalRows": 500000.
With maxRowsInMemory at 50000, each task will persist to disk roughly every 50000 rows.
maxRowsPerSegment at 100000 means each task will output a segment to deep storage around every 100000 rows; depending on the message throughput, this could be very often. The recommended size is about 50x that number, i.e. 5 million rows per segment.
maxTotalRows at 500000 across all 40 tasks means that, on average, a segment will be output every 500000 / 40 = 12500 rows. If you are using maxRowsPerSegment, then this value is probably unnecessary.

How many Kafka partitions do you have in the topic? This determines the maximum number of ingestion tasks that can be used, but you can also use fewer tasks than the number of Kafka partitions, in which case each task is assigned a roughly equal share of partitions to ingest from.

Try increasing the parameters above and maybe reducing the number of tasks; this all depends on the throughput of incoming messages. A rough example of what that could look like is sketched at the end of this message.

Take a look at the Segment Optimization docs, which describe how to create ideally sized segments.
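
For illustration only, a retuned tuningConfig fragment along the lines above might look like the following. The numbers are assumptions meant to show the direction of change, not values tuned to your actual throughput; maxTotalRows is left out on the assumption that maxRowsPerSegment already bounds segment size.

{
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxRowsPerSegment": 5000000,
    "intermediatePersistPeriod": "PT10M",
    "maxPendingPersists": 0
  }
}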

Hi Sergio,

Yes, I had already tried increasing these values, but I saw problems with the memory usage per task.

By the way, the "unable to create new native thread" OOM issue turned out to be specific to one node, and I resolved it later.
I then had heap space issues, as each task was taking more than the allocated 4 GB. But I reduced maxBytesInMemory to 1 GB (from 2 GB), and each task now takes around 3.6 GB.

All the tasks are passing after this. All the ingestion issues are now resolved with my current ingestion spec.

Thanks for all the support and info provided so far, Mark and Sergio.

Regards,
Chaitanya

Hi Chaitanya,

Glad to hear your ingestion issues are resolved. Thank you for all of the details and clarity, in particular for your solution to the "unable to create new native thread" issue.

Best,

Mark