Realtime node taking too long to persistAndMerge

I am ingesting data from Kafka using the Kafka firehose API. There are around 5 to 10 million events/logs per hour. I have 24 partitions in Kafka, which are evenly distributed across 4 realtime nodes.
Most of the dimensions have low cardinality, but some of them (around 4-5) have very high cardinality, i.e. around 70-80% of the total rows. The realtime node is taking too long to merge and persist the segments - sometimes it takes around 30 minutes to process one high-cardinality dimension, and there are 15-20 intermediate partitions.

My nodes have 8 cores, 16 GB RAM, and SSDs; other configs can be found below.

-----Data_realtime_spec.json -

"metricsSpec": [
  {
    "type": "count",
    "name": "count"
  },
  {
    "type": "hyperUnique",
    "name": "DistinctVisitors",
    "fieldName": "visitor_id"
  }
],
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "HOUR"
},
"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 200000,
  "intermediatePersistPeriod": "PT15m",
  "windowPeriod": "PT20m",
  "basePersistDirectory": "/tmp/realtime/basePersist",
  "rejectionPolicy": {
    "type": "serverTime"
  },
  "shardSpec": {
    "type": "linear",
    "partitionNum": 4
  }
}
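The 15-20 intermediate partitions line up roughly with this tuningConfig. A back-of-the-envelope sketch, assuming ~2.5M rows/hour per node (10M events spread over 4 nodes, with little rollup):

```python
# Assumption (not from the spec): ~2.5M raw rows/hour land on each node.
rows_per_hour = 2_500_000
max_rows_in_memory = 200_000   # tuningConfig maxRowsInMemory
persist_period_minutes = 15    # intermediatePersistPeriod = PT15m

# An intermediate persist is triggered by whichever limit is hit first;
# at this ingest rate the row limit fires well before the 15-minute timer.
persists_from_rows = rows_per_hour // max_rows_in_memory   # 12 per hour
persists_from_timer = 60 // persist_period_minutes         # 4 per hour
intermediate_segments = max(persists_from_rows, persists_from_timer)
print(intermediate_segments)  # 12 -- in the ballpark of the 15-20 observed
```

Raising maxRowsInMemory (heap permitting) reduces the number of intermediate segments that must be merged at handoff, at the cost of more heap per in-memory index.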

---- Realtime/runtime.properties -

druid.processing.buffer.sizeBytes=500000000

druid.processing.numThreads=5
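As a sizing rule of thumb from the Druid docs, direct memory must hold numThreads + 1 processing buffers; with the settings above that puts a floor on -XX:MaxDirectMemorySize:

```python
# Rule of thumb: direct memory >= buffer.sizeBytes * (numThreads + 1).
buffer_size_bytes = 500_000_000   # druid.processing.buffer.sizeBytes
num_threads = 5                   # druid.processing.numThreads

direct_memory_needed = buffer_size_bytes * (num_threads + 1)
print(direct_memory_needed / 1e9)  # 3.0 -- i.e. -XX:MaxDirectMemorySize of at least ~3g
```

On a 16 GB node, whatever is not taken by heap and direct memory is left for the page cache that the memory-mapped segments rely on, so heap + direct memory should be kept well under total RAM.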

  1. What are the best possible configurations given the current system limitations - JVM heap size, max direct memory size, druid.processing.buffer.sizeBytes, druid.processing.numThreads, maxRowsInMemory, intermediatePersistPeriod, windowPeriod, and other configs?

  2. Do I need to upgrade the nodes (more RAM, etc.), and if so, by how much?

This https://github.com/druid-io/druid/pull/1725 is probably partially at play here.

In general, your lowest-hanging fruit for reducing the merge and push time is to reduce the cardinality each realtime segment handles, either by changing how you distribute the data or by increasing the number of realtime indexing tasks (assuming the cardinality each one handles is also reduced).

Indexing of high-cardinality dimensions is a bit of a pain point right now, and something that hasn't been fully optimized yet. I suspect this is because the high-cardinality dimension values are kept as memory-mapped bytes rather than cached as strings, which makes some of the comparisons that occur during indexing very expensive to do over and over (the values must be converted from bytes to strings every time). GC pressure is already pretty high during that final step, and adding string caching (on the development side) has to be done very carefully to avoid blowing the heap limitations.
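As an illustrative sketch (this is not Druid's actual code), the difference between decoding bytes on every comparison and comparing cached strings looks roughly like this:

```python
# Illustrative only: simulate comparing dimension values that live as bytes
# (decoded on every comparison) vs. values cached as strings up front.
import timeit

values = [("visitor_%07d" % i).encode("utf-8") for i in range(10_000)]
target = b"visitor_0005000"

def compare_decoding_each_time():
    # Decode every candidate on every comparison, as when values are read
    # back from memory-mapped bytes.
    t = target.decode("utf-8")
    return sum(1 for v in values if v.decode("utf-8") == t)

# Decode once and keep the strings on-heap (this is the caching that adds
# GC/heap pressure).
cached = [v.decode("utf-8") for v in values]

def compare_cached():
    t = target.decode("utf-8")
    return sum(1 for s in cached if s == t)

decode_time = timeit.timeit(compare_decoding_each_time, number=100)
cached_time = timeit.timeit(compare_cached, number=100)
print(decode_time > cached_time)  # repeated decoding is the slower path
```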

If you have the ability to limit the cardinality of the dimensions a particular segment handles (for example, by sending events to a realtime indexing instance based on a hash of your highest-cardinality dimension), it might help a bit.
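A hypothetical sketch of that routing idea (the dimension name and node count are assumptions based on the spec above; in practice this would likely be a custom Kafka partitioner keyed on the dimension value):

```python
# Hypothetical routing: hash the highest-cardinality dimension so each
# realtime instance sees a disjoint slice of its values.
import hashlib

NUM_REALTIME_NODES = 4  # matches the setup described above

def route(event):
    # Stable hash so the same visitor_id always lands on the same node,
    # keeping per-node dimension cardinality at roughly 1/4 of the total.
    h = hashlib.md5(event["visitor_id"].encode("utf-8")).hexdigest()
    return int(h, 16) % NUM_REALTIME_NODES

print(route({"visitor_id": "abc123"}))
```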

What you are experiencing with indexing high-cardinality dimensions is a known pain point right now, and long persist and merge times are something we experience internally as well.

Hi Saksham, are those 10 million events after rollup? As in, is each RT node handling around 2.5M events per hour?

Do you happen to have the logs around the long merge times? As the merges and persists occur, there should be periodic printouts of how long each step takes. Those numbers would be interesting.