Whatever I do, I am unable to get more than 5 to 10 TPS

I am using seven data servers, each with 64 cores and 250 GB of RAM. Whatever configuration options I try, I am unable to get more than 5 to 10 TPS, and Druid hangs if it goes above 10 TPS.

Here are my configs:

Broker

druid.service=druid/broker
druid.plaintextPort=8082

HTTP server settings

druid.server.http.numThreads=120

HTTP client settings

druid.broker.http.numConnections=500
druid.broker.http.maxQueuedBytes=100000000
druid.server.http.defaultQueryTimeout=3600000

Processing threads and buffers

druid.processing.buffer.sizeBytes=500000000
druid.processing.numMergeBuffers=40
druid.processing.numThreads=15
druid.processing.tmpDir=var/druid/processing

Query cache disabled – push down caching and merging instead

druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false
druid.broker.cache.useResultLevelCache=false
druid.broker.cache.populateResultLevelCache=false
druid.sql.planner.metadataSegmentCacheEnable=false
druid.query.search.maxSearchLimit=1000000000
druid.query.groupBy.maxMergingDictionarySize=100000000
druid.query.groupBy.maxOnDiskStorage=1000000000
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true

MiddleManager

druid.service=druid/middleManager
druid.plaintextPort=8091

Number of tasks per middleManager

druid.worker.capacity=20

Task launch parameters

druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=13g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

HTTP server threads

druid.server.http.numThreads=800

Processing threads and buffers on Peons

druid.indexer.fork.property.druid.processing.numMergeBuffers=8
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=1000000000
druid.indexer.fork.property.druid.processing.numThreads=4

Hadoop indexing

druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

druid.realtime.cache.useCache=true
druid.realtime.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=2000000000
druid.query.groupBy.maxMergingDictionarySize=100000000
#druid.query.groupBy.maxOnDiskStorage=5000000000
druid.query.groupBy.defaultStrategy=v2

Historicals

druid.service=druid/historical
druid.plaintextPort=8083

HTTP server threads

druid.server.http.numThreads=800

Processing threads and buffers

druid.processing.buffer.sizeBytes=500000000
druid.processing.numMergeBuffers=40
druid.processing.numThreads=63
druid.processing.tmpDir=var/druid/processing

Segment storage

druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":"300g"}]

Query cache

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=2000000000

druid.query.groupBy.maxMergingDictionarySize=100000000
#druid.query.groupBy.maxOnDiskStorage=1000000000
druid.query.groupBy.defaultStrategy=v2

Can someone help? It's blocking my deployment.

I am using a 1 TB SSD for data on all nodes. The query is very simple and generally takes 200 to 500 ms. The cluster is built on AWS EC2 instances.

Could you elaborate on what you mean by ‘TPS’ please?

Assuming you are referring to ingestion, nothing jumps out as being out of the ordinary in your MM config, but you could increase the worker capacity into the 24-63 range and reduce druid.server.http.numThreads to 100.
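For example, in the MiddleManager runtime.properties (32 is just an illustrative value within that range; the right capacity depends on how much memory each task needs):

druid.worker.capacity=32
druid.server.http.numThreads=100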

Thanks Vijeth. Even without Kafka ingestion, the Historicals are not going above 50 TPS. Can you please share Broker and Router configs for a 16-core server? Surprisingly, adding more Historical nodes is not helping. Also, what taskCount is recommended in the Kafka ingestion spec?

Hi rao222, I am still not sure what you mean by TPS. Here are typical settings for a Broker on a 16-core server:

"druid.processing.numMergeBuffers": 60,
"druid.processing.numThreads": 1,
"druid.broker.http.numConnections": 25,
"druid.processing.buffer.sizeBytes": 500000000,
"druid.server.http.numThreads": 60

To give you more guidance, could you provide your Kafka specs/requirements so that we can size the supervisor? For example: expected throughput, number of partitions, query/ingestion SLAs, etc.

Hi Vijeth, thanks for the information. By TPS I mean the number of concurrent queries (throughput). It's a single GROUP BY query that is called with different company IDs and returns 30 rows ordered by __time descending. The query takes around 200 ms, but if I go above 20 concurrent queries, the same query takes around 20 seconds or more and also times out. Using more tasks slows it down even further. Also, can you please let me know what you mean by sizing the supervisor?

Ah, I see that you are looking at improving query concurrency. Assuming the Historicals are 64-CPU machines, you really should be seeing better performance, especially when you scale the cluster horizontally.

I would start by cleaning up the config files, as I see some conflicting parameters in them, such as druid.broker.cache.useResultLevelCache being set to both true and false.
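For example, you would keep only one consistent set of Broker cache settings; assuming you do want result-level caching on the Broker, that would be:

druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true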

Your connection pooling parameters deviate a bit from best practices. I would reduce druid.broker.http.numConnections on the Broker to a more reasonable value of, say, 50 (assuming you have one Broker), and then calculate druid.server.http.numThreads on the Historicals based on how many Brokers are in your cluster.

You can use this link for guidance:

If your cluster has one Broker, then I would say go with druid.server.http.numThreads=60 in our example.
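As a rough sketch of that sizing (the numbers are illustrative and assume a single Broker):

# Broker
druid.broker.http.numConnections=50

# Each Historical: roughly numConnections x number of Brokers, plus some headroom
druid.server.http.numThreads=60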

Also, please test with reducing the number of merge buffers to 20.

Please test with these and let me know how it goes; we can then look at more options to troubleshoot.

@Vijeth_Sagar I followed all the best practices available in the Druid documentation and also upgraded the servers to compute-intensive Graviton3 EC2 instances, but I am still struggling to get good TPS. Is there any way we can have a Zoom call and go over it?

@Vijeth_Sagar One thing I noticed is that when I add multiple Historicals, some nodes show over 90% utilization and my load test crashes. I also tried making the replication factor equal to the number of nodes, so that all nodes have all the data, and I stopped ingestion while doing it.

@rao222

Right off the bat, I would definitely not recommend a replication factor equal to the number of Historicals. Please set it to 2.
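That is set through the retention (load) rules, either in the console or via the Coordinator rules API. A minimal sketch of a rule that keeps two replicas of everything would be:

[
  {
    "type": "loadForever",
    "tieredReplicants": { "_default_tier": 2 }
  }
]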

Are you collecting metrics for your cluster? If yes, please check the average query/segment/time.

The fact that your cluster is choking and not responding to queries tells me you may have a bad query locking it up. You would typically see a query/segment/time value equal to the duration your cluster is frozen if you sort that metric in descending order.
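If metrics are not being emitted yet, a minimal runtime.properties sketch that writes query metrics to the service logs looks like this (the logging emitter is just one option):

druid.monitoring.monitors=["org.apache.druid.server.metrics.QueryCountStatsMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info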

When you mention that you are not getting more than 10 concurrent queries, how are you testing this?


Did you get further information about what was happening, @rao222 ?

And definitely +1 for metrics…!!

Adding historicals increases parallelism by providing more cores to queries – but the segments themselves need to be balanced well across the historicals to get good parallelism. Do you have good balance?

Since a query scans one segment per thread, if only three segments are involved in your query, you will not get faster performance just by adding more Historicals. How many segments are being scanned in the query?

Also, you cannot get faster than the slowest segment scan, as that is the lowest level of the parallelism. How fast are your segments being scanned?
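For example, a couple of quick checks against the sys schema (runnable from the console) show how many segments each datasource has and how they are spread across servers:

-- segment count and average size per datasource
SELECT "datasource", COUNT(*) AS num_segments, AVG("size") AS avg_size_bytes
FROM sys.segments
WHERE is_published = 1
GROUP BY 1

-- segments loaded per server process
SELECT "server", COUNT(*) AS segments_loaded
FROM sys.server_segments
GROUP BY 1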

I also just noted this:

Just confirming that you have 300GB disk space in each historical?

Our data is real-time data. If we use too many MiddleManager tasks and commit data more frequently (shorter taskDuration), it adds too many small segments, which the Historicals do not like, even if I use "maxRowsPerSegment": 10000000. Compaction reports 100% compacted but it is not merging segments. If I use a bigger taskDuration, such as 1 hour, the MiddleManagers spike to 100% and queries are slow.

Do you know your current segment profile? You could configure autocompaction to ensure that segments are compacted as soon as possible to the recommended number of rows.

There are some example SYS queries here:

You can edit the configuration of autocompaction in the console: here's an explanation of all the options. Maybe you need to increase inputSegmentSizeBytes.
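As an illustrative sketch only (the values are placeholders to adapt, not recommendations), an auto-compaction config for your datasource could look something like:

{
  "dataSource": "clickstream_33",
  "skipOffsetFromLatest": "PT1H",
  "inputSegmentSizeBytes": 419430400,
  "tuningConfig": {
    "partitionsSpec": {
      "type": "dynamic",
      "maxRowsPerSegment": 5000000
    }
  }
}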

I’d say a one-hour task duration is about right – could you reduce the number of tasks to produce fewer segments?

Re: spiking 100%, that could mean that you need to allocate more resources to your peons.

Out of interest, have you tried querying different periods of time to see if performance differs? In particular, querying data you know is coming from the MM tasks (e.g. the last hour) versus data you know is coming from Historicals (e.g. yesterday), to see if there is a difference in speed?

I allocated 63 out of 64 cores to the peon (task) and am running only one task per server. When the data grows past a million rows, I see a spike in latencies. Does Druid work for high concurrency? We need 500 TPS. Also, are there any tweaks we need to make on the master/Coordinator/Overlord? Currently I use Derby with one master server. I also switched from MiddleManager to Indexer and it is the same.
Below are the Indexer runtime properties:

druid.service=druid/indexer
druid.plaintextPort=8091

Number of tasks per indexer

druid.worker.capacity=8

Task launch parameters

druid.indexer.task.baseTaskDir=var/druid/task

HTTP server threads

druid.server.http.numThreads=800

Processing threads and buffers on Indexer

druid.processing.numMergeBuffers=40
druid.processing.buffer.sizeBytes=512MiB
druid.processing.numThreads=63

Hadoop indexing

druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

druid.realtime.cache.useCache=true
druid.realtime.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=2048MiB
druid.query.groupBy.maxMergingDictionarySize=100000000
druid.query.groupBy.maxOnDiskStorage=1000000000
druid.query.groupBy.defaultStrategy=v2

Indexer JVM
-server
-Xms24g
-Xmx24g
-XX:MaxDirectMemorySize=54g
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Can you share the ingestion spec?
How many partitions do you have in the kafka topic?

Parallelism of the real-time ingestion can only be as high as the number of partitions in the topic.

You will also need to define how many tasks are used for the ingestion: ioConfig.taskCount defines the number of tasks the ingestion will use concurrently. If enough worker slots are available across MiddleManagers, this is the number of tasks that will be running. Each task will be assigned (Kafka partition count / taskCount) partitions to consume from.
Docs for this are here.
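For example, with a hypothetical 8-partition topic and taskCount=4, each task reads from 2 partitions; with a 1-partition topic, a taskCount of 4 still leaves only one task actually consuming.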

This is also useful to understand how worker capacity relates to ingestion.

Hope it helps,
Sergio

Here is the ingestion spec. I am using 4 servers; I also tried increasing that, but it does not help in getting more TPS. The problem I am facing: at 5 to 10 TPS it works fine, but if I go above 50, I get timeouts and Druid freezes. The MiddleManager is unable to handle more TPS (over 50) even when there are only a few million rows of streamed data. There are also no issues with syncing data from Kafka; the only issues appear when querying at higher TPS. Our query is very simple:

SELECT company_id,server_timestamp ,screen_id, action, object
FROM WHERE company_id = $companyId AND screen_id IS NOT NULL
GROUP BY company_id, server_timestamp,screen_id, action, object
ORDER BY server_timestamp DESC LIMIT 30
{
“type”: “kafka”,
“spec”: {
“ioConfig”: {
“topic”: “",
“consumerProperties”: {
“security.protocol”: “SSL”,
“ssl.enabled.protocols”: “TLSv1.2”,
“ssl.truststore.location”: “/data/druid/keystore.jks”,
“ssl.truststore.password”: "
",
“bootstrap.servers”: "
****”,
“auto.offset.reset”: “latest”
},
“taskCount”: 4,
“replicas”: 1,
“taskDuration”: “PT30M”,
“useEarliestOffset”: false,
“skipOffsetGaps”: false
},
“tuningConfig”: {
“type”: “kafka”,
“maxRowsPerSegment”: 10000000,
“maxRowsInMemory”: 50000,
“indexSpec”: {
“bitmap”: {
“type”: “roaring”
},
“longEncoding”: “longs”
},
“resetOffsetAutomatically”: true,
“httpTimeout”: “PT30S”,
“reportParseExceptions”: “false”,
“logParseExceptions”: “false”,
“handoffConditionTimeout”: 0,
“shutdownTimeout”: “PT80S”,
“offsetFetchPeriod”: “PT30S”,
“intermediatePersistPeriod”: “PT30S”,
“buildV9Directly”: true,
“dimensionCompression”: “LZ4”,
“metricCompression”: “LZ4”
},
“dataSchema”: {
“dataSource”: “clickstream_33”,
“parser”: {
“type”: “eventbusParser”,
“parseSpec”: {
“format”: “json”,
“flattenSpec”: {
“useFieldDiscovery”: true,
“fields”: [
{
“type”: “path”,
“name”: “value_sentAt”,
“expr”: “sentAt”
},
{
“type”: “path”,
“name”: “value_timestamp”,
“expr”: “timestamp”
},
{
“type”: “path”,
“name”: “value_properties_current_timestamp”,
“expr”: “properties.current_timestamp”
},
{
“type”: “path”,
“name”: “value_properties_company_id”,
“expr”: “properties.company_id”
},
{
“type”: “path”,
“name”: “value_context_traits_company_id”,
“expr”: “context.traits.company_id”
},
{
“type”: “path”,
“name”: “value_traits_company_id”,
“expr”: “traits.company_id”
},
{
“type”: “path”,
“name”: “value_properties_firm_id”,
“expr”: “properties.firm_id”
},
{
“type”: “path”,
“name”: “value_properties_realmId”,
“expr”: “properties.realmId”
},
{
“type”: “path”,
“name”: “value_properties_realm_id”,
“expr”: “properties.realm_id”
},
{
“type”: “path”,
“name”: “value_traits_realm_id”,
“expr”: “traits.realm_id”
},
{
“type”: “path”,
“name”: “value_context_groupId”,
“expr”: “context.groupId”
},
{
“type”: “path”,
“name”: “value_context_groupid”,
“expr”: “context.groupid”
},
{
“type”: “path”,
“name”: “value_groupId”,
“expr”: “groupId”
},
{
“type”: “path”,
“name”: “value_context_auth_id”,
“expr”: “context.auth_id”
},
{
“type”: “path”,
“name”: “value_context_traits_authID”,
“expr”: “context.traits.authID”
},
{
“type”: “path”,
“name”: “value_context_traits_auth_id”,
“expr”: “context.traits.auth_id”
},
{
“type”: “path”,
“name”: “value_properties_authID”,
“expr”: “properties.authID”
},
{
“type”: “path”,
“name”: “value_properties_auth_id”,
“expr”: “properties.auth_id”
},
{
“type”: “path”,
“name”: “value_traits_authID”,
“expr”: “traits.authID”
},
{
“type”: “path”,
“name”: “value_traits_auth_id”,
“expr”: “traits.auth_id”
},
{
“type”: “path”,
“name”: “value_properties_url_host_name”,
“expr”: “properties.url_host_name”
},
{
“type”: “path”,
“name”: “value_context_url_host_name”,
“expr”: “context.url_host_name”
},
{
“type”: “path”,
“name”: “value_context_url_hostname”,
“expr”: “context.url_hostname”
},
{
“type”: “path”,
“name”: “value_properties_hosting_app”,
“expr”: “properties.hosting_app”
},
{
“type”: “path”,
“name”: “value_traits_url_host_name”,
“expr”: “traits.url_host_name”
},
{
“type”: “path”,
“name”: “value_context_page_referrer”,
“expr”: “context.page.referrer”
},
{
“type”: “path”,
“name”: “value_properties_referrer”,
“expr”: “properties.referrer”
},
{
“type”: “path”,
“name”: “value_properties_partner”,
“expr”: “properties.partner”
},
{
“type”: “path”,
“name”: “value_context_traits_partner”,
“expr”: “context.traits.partner”
},
{
“type”: “path”,
“name”: “value_traits_partner”,
“expr”: “traits.partner”
},
{
“type”: “path”,
“name”: “value_properties_qbo_user_type”,
“expr”: “properties.qbo_user_type”
},
{
“type”: “path”,
“name”: “value_traits_qbo_user_type”,
“expr”: “traits.qbo_user_type”
},
{
“type”: “path”,
“name”: “value_properties_event_params_userType”,
“expr”: “properties.event_params.userType”
},
{
“type”: “path”,
“name”: “value_context_traits_type”,
“expr”: “context.traits.type”
},
{
“type”: “path”,
“name”: “current_sku”,
“expr”: “properties.current_sku”
},
{
“type”: “path”,
“name”: “value_properties_sku”,
“expr”: “properties.sku”
},
{
“type”: “path”,
“name”: “value_traits_sku”,
“expr”: “traits.sku”
},
{
“type”: “path”,
“name”: “value_properties_product_sku”,
“expr”: “properties.product_sku”
},
{
“type”: “path”,
“name”: “value_properties_app_name”,
“expr”: “properties.app_name”
},
{
“type”: “path”,
“name”: “value_event_header_app_name”,
“expr”: “event_header.app_name”
},
{
“type”: “path”,
“name”: “value_properties_selected_app_name”,
“expr”: “properties.selected_app_name”
},
{
“type”: “path”,
“name”: “value_properties_screen”,
“expr”: “properties.screen”
},
{
“type”: “path”,
“name”: “value_properties_refer_screen”,
“expr”: “properties.refer_screen”
},
{
“type”: “path”,
“name”: “screen_path”,
“expr”: “properties.screen_path”
},
{
“type”: “path”,
“name”: “value_context_page_title”,
“expr”: “context.page.title”
},
{
“type”: “path”,
“name”: “value_properties_page_title”,
“expr”: “properties.page_title”
},
{
“type”: “path”,
“name”: “value_properties_title”,
“expr”: “properties.title”
},
{
“type”: “path”,
“name”: “action”,
“expr”: “properties.action”
},
{
“type”: “path”,
“name”: “object”,
“expr”: “properties.object”
},
{
“type”: “path”,
“name”: “sub_action”,
“expr”: “properties.ui_action”
},
{
“type”: “path”,
“name”: “sub_object”,
“expr”: “properties.ui_object”
},
{
“type”: “path”,
“name”: “server_timestamp”,
“expr”: “event_header.server_timestamp”
}
]
},
“dimensionsSpec”: {
“dimensions”: [
{“type”: “string”,“name”:“company_id”,“createBitmapIndex”:“true”},
{“type”: “long”, “name”: “server_timestamp”},
{“type”: “string”,“name”:“screen_id”,“createBitmapIndex”:“true”},
“action”,
“object”,
“auth_id”,
“url_host_name”,
“referrer_url”,
“partner_name”,
“user_type”,
“current_sku”,
“sku”,
“app_name”,
“screen_path”,
“page_title”,
“sub_action”,
“sub_object”,
“time_stamp”
]
},
“timestampSpec”: {
“column”: “server_timestamp”,
“format”: “auto”,
“missingValue”: “2000-01-01T00:00:00.000Z”
}
}
},
“transformSpec”: {
“transforms”: [
{
“type”: “expression”,
“name”: “time_stamp”,
“expression”: “nvl(value_sentAt,nvl(value_timestamp,nvl(value_properties_current_timestamp,null)))”
},
{
“type”: “expression”,
“name”: “company_id”,
“expression”: “nvl(value_properties_company_id,nvl(value_context_traits_company_id,nvl(value_traits_company_id,nvl(value_properties_firm_id,nvl(value_properties_realmId,nvl(value_properties_realm_id,nvl(value_traits_realm_id,nvl(value_context_groupId,nvl(value_context_groupid,nvl(value_groupId,null))))))))))”
},
{
“type”: “expression”,
“name”: “auth_id”,
“expression”: “nvl(value_context_auth_id,nvl(value_context_traits_authID,nvl(value_context_traits_auth_id,nvl(value_properties_authID,nvl(value_properties_auth_id,nvl(value_traits_authID,nvl(value_traits_auth_id,null)))))))”
},
{
“type”: “expression”,
“name”: “url_host_name”,
“expression”: “nvl(value_properties_url_host_name,nvl(value_context_url_host_name,nvl(value_context_url_hostname,nvl(value_properties_hosting_app,nvl(value_traits_url_host_name,null)))))”
},
{
“type”: “expression”,
“name”: “referrer_url”,
“expression”: “nvl(value_context_page_referrer,nvl(value_properties_referrer,null))”
},
{
“type”: “expression”,
“name”: “partner_name”,
“expression”: “nvl(value_properties_partner,nvl(value_context_traits_partner,nvl(value_traits_partner,null)))”
},
{
“type”: “expression”,
“name”: “user_type”,
“expression”: “nvl(value_properties_qbo_user_type,nvl(value_traits_qbo_user_type,nvl(value_properties_event_params_userType,nvl(value_context_traits_type,null))))”
},
{
“type”: “expression”,
“name”: “sku”,
“expression”: “nvl(value_properties_sku,nvl(value_traits_sku,nvl(value_properties_product_sku,null)))”
},
{
“type”: “expression”,
“name”: “app_name”,
“expression”: “nvl(value_properties_app_name,nvl(value_event_header_app_name,nvl(value_properties_selected_app_name,null)))”
},
{
“type”: “expression”,
“name”: “screen_id”,
“expression”: “nvl(value_properties_screen,nvl(value_properties_refer_screen,null))”
},
{
“type”: “expression”,
“name”: “page_title”,
“expression”: “nvl(value_context_page_title,nvl(value_properties_page_title,nvl(value_properties_title,null)))”
}
],
"filter": {
"type": "and",
"fields": [
{
"type": "not",
"field": { "type": "selector", "dimension": "server_timestamp", "value": 0 }
},
{
"type": "not",
"field": { "type": "selector", "dimension": "server_timestamp", "value": null }
},
{
"type": "not",
"field": { "type": "selector", "dimension": "company_id", "value": null }
},
{
"type": "not",
"field": { "type": "selector", "dimension": "__time", "value": "2000-01-01T00:00:00.000Z" }
}
]
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "DAY",
"rollup": false
}
}
}
}

A taskCount of 4 means that 4 tasks will be created to ingest from the Kafka topic, which means a maximum parallelism of 4. It also means that only 4 of the available worker slots will be used for this.

How many partitions do you have in the Kafka topic? Even if you request 4 tasks, if you only have one partition you will effectively have only one reading task. You'll need at least as many partitions as the taskCount you want in order to consume in parallel.

Another item I noticed is that your queryGranularity is set to "DAY", which means that all timestamps will be truncated to dates. Is that what you are looking for? If so, you'll likely want to enable rollup and aggregate metrics during ingestion.

This doc has the info you need regarding queryGranularity and rollup
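Conversely, if you do need the individual events (which your ORDER BY ... LIMIT 30 query suggests), a sketch of the granularitySpec that keeps full-precision timestamps would be:

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "none",
  "rollup": false
}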

BTW, I think you may need a filter on the __time column so that you limit the timeframe your query looks at. Otherwise you potentially have to read all segments, sort them, and then take the top 30.
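For example, something along these lines; the datasource name here is taken from your ingestion spec and is only an assumption about which table you are querying, and the one-day interval is a placeholder:

SELECT company_id, server_timestamp, screen_id, action, object
FROM "clickstream_33"
WHERE company_id = $companyId
  AND screen_id IS NOT NULL
  AND __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY company_id, server_timestamp, screen_id, action, object
ORDER BY server_timestamp DESC
LIMIT 30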

Thanks for the quick response. There are no issues with ingestion: I see all the data in the Druid table on time, and the peons are not taking much resource when there are no read queries. The issue is read throughput. I need 500 TPS but currently get under 50 TPS, and adding more servers is not helping.

Currently I am keeping only one day's worth of data (dropbeforePeriod = 1 day), so no __time filter is required. I added server_timestamp as the second column after company_id; it is used to find the last 30 events (ORDER BY server_timestamp DESC). All queries are filtered by company_id, which is the first dimension. Also, I cannot use rollup because we need the last 30 individual events.
“dimensions”: [
{“type”: “string”,“name”:“company_id”,“createBitmapIndex”:“true”},
{“type”: “long”, “name”: “server_timestamp”},
{“type”: “string”,“name”:“screen_id”,“createBitmapIndex”:“true”},
“action”,
“object”,
“auth_id”,
“url_host_name”,
“referrer_url”,
“partner_name”,
“user_type”,
“current_sku”,
“sku”,
“app_name”,
“screen_path”,
“page_title”,
“sub_action”,
“sub_object”,
“time_stamp”
]
},

It is a little bit lengthy, but could you confirm that you have been through all of the tuning guide in the docs? For example, there is the question of whether you have enough Jetty threads on the Broker for the number of MiddleManagers.

Also, have you been through the segment size optimisation tasks?