Please advise on cluster setup tuning to speed up long-period queries

Good day!

Our current setup (Druid v0.11) has the following specs and components running:

  • server 1 (4 CPU, 32 GB RAM, 2 TB SSD): overlord, coordinator
  • server 2 (8 CPU, 32 GB RAM, 2 TB SSD): broker
  • server 3 (4 CPU, 40 GB RAM, 2 TB SSD): historical-hot (P1M), middlemanager
  • server 4 (4 CPU, 40 GB RAM, 2 TB SSD): historical-cold (forever), middlemanager


Our hot tier has data for the last 30 days; the cold tier has everything.
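
For reference, this layout corresponds to coordinator load rules roughly like the sketch below (rules are matched first to last; the tier name "hot" comes from our runtime config, while "cold" and the replicant counts are assumptions):

```
[
  { "type": "loadByPeriod", "period": "P1M", "tieredReplicants": { "hot": 1, "cold": 1 } },
  { "type": "loadForever", "tieredReplicants": { "cold": 1 } }
]
```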

Our data schema has 7 dimensions and 2 metrics.

We push about 10 million rows to Druid every 5 minutes, which ends up as roughly 0.8 GB of data per hour, 19 GB per day, or 500 GB per month.

We currently have almost 3 months of data available for querying. Segments are about 500 MB each, with roughly 480 segments per day. Deep storage is configured to be HDFS.

For each request we issue a single groupBy query that fetches the metrics for a specified array of dimension combinations and groups by those dimensions. (We used to issue several parallel timeseries queries, each passing only one dimension combination, but we found that a single groupBy query with an OR of nested AND filters over the array of dimension combinations works faster.)

Here’s an example:

```
{
  "queryType": "groupBy",
  "dataSource": "prod",
  "granularity": "DAY",
  "dimensions": ["xxx", "yyy", "zzz"],
  "filter": {
    "type": "or",
    "fields": [
      {
        "type": "and",
        "fields": [
          { "type": "selector", "dimension": "xxx", "value": "some-xxx-1" },
          { "type": "selector", "dimension": "yyy", "value": "some-yyy-1" },
          { "type": "selector", "dimension": "zzz", "value": "some-zzz-1" }
        ]
      },
      {
        "type": "and",
        "fields": [
          { "type": "selector", "dimension": "xxx", "value": "some-xxx-2" },
          { "type": "selector", "dimension": "yyy", "value": "some-yyy-2" },
          { "type": "selector", "dimension": "zzz", "value": "some-zzz-2" }
        ]
      },
      {
        "type": "and",
        "fields": [
          { "type": "selector", "dimension": "xxx", "value": "some-xxx-3" },
          { "type": "selector", "dimension": "yyy", "value": "some-yyy-3" },
          { "type": "selector", "dimension": "zzz", "value": "some-zzz-3" }
        ]
      }
    ]
  },
  "aggregations": [
    { "type": "doubleSum", "name": "sum", "fieldName": "val" }
  ],
  "intervals": ["2017-12-01T00:00:00.195Z/2017-12-30T00:00:00.195Z"],
  "context": {
    "chunkPeriod": "P3D"
  }
}
```

Our problem right now is that there are a lot of disk-read operations when data for, let's say, 10 days is requested, since every segment contains some rows matching the filter. For example, a 10-day query may need to scan through 19 * 10 = 190 GB of segment data and completes only after about 10 minutes. Unfortunately, we cannot roll up old data to a coarser granularity, since users may require precise historical data at the same minimal query granularity as the most recent data.

To overcome this, we see two ways:

  1. [easy] We add more servers to the hot tier, say 10 more. In that case, if a user requests data for 10 days, Druid will (or at least should) split the work into 10 sub-tasks, sending “get data for 1 day” to each historical node.

  2. [smart] We forget about hot and cold tiers and introduce a new set of tiers. Since roughly 95% of requests are for the last day of data, 80% for the last week, 30% for the last month, and only a small fraction for longer periods, we would need tiers like, for example, P1W, P1M, P6M and THEREST. For at least the P1W tier we could ask for much more powerful servers, with enough RAM to hold an entire week of data in memory (to avoid disk reads altogether). For P1M we could do the opposite: have, say, 10 nodes with much smaller specs, so they do have to read from disk, but the work is parallelized by Druid. We would probably do something similar for the remaining tiers, with fewer servers. (A sketch of how we imagine expressing these load rules follows after this list.)

2.5 [addition to the smart solution] Also add realtime node(s) that hold, let's say, the latest 6 hours of data.
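
For illustration, here is a minimal sketch of the load rules we have in mind for the [smart] option; the tier names (week_tier, month_tier, half_year_tier, rest_tier) and replicant counts are placeholders, not an existing configuration:

```
[
  { "type": "loadByPeriod", "period": "P1W", "tieredReplicants": { "week_tier": 1 } },
  { "type": "loadByPeriod", "period": "P1M", "tieredReplicants": { "month_tier": 1 } },
  { "type": "loadByPeriod", "period": "P6M", "tieredReplicants": { "half_year_tier": 1 } },
  { "type": "loadForever", "tieredReplicants": { "rest_tier": 1 } }
]
```

Since the coordinator applies the first matching rule, a segment newer than one week would land only on the week tier, a segment between one week and one month old only on the month tier, and so on.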

I would kindly ask the group for advice on how to proceed, and also for suggestions on any cluster/configuration changes that should preferably be made in order to speed things up. Or please propose any cluster setup you think would work best.

Here are our configurations:

HISTORICAL-HOT/COLD NODE CONFIGURATION (JVM)

```
command=/usr/bin/java
-server
-Xms12g
-Xmx12g
-XX:MaxDirectMemorySize=12G
-Djava.security.krb5.conf=/app/krb5/conf/krb5.conf
-Djava.security.auth.login.config=/app/druid/conf/druid/_common/jaas.conf
-Duser.timezone=UTC -Dfile.encoding=UTF-8
-Djava.io.tmpdir=/app/druid/tmp/
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-cp /app/druid/conf/druid/_common/:/app/druid/conf/druid/historical-hot/:/app/druid/lib/*
io.druid.cli.Main server historical
directory=/app/druid
stderr_logfile = /app/druid/logs/service/druid-historical-hot-stderr.log
stdout_logfile = /app/druid/logs/service/druid-historical-hot-stdout.log
priority=4
startsecs=1
startretries=3
autostart=false
autorestart=true
user=some-user

```

HISTORICAL-HOT/COLD NODE CONFIGURATION (RUNTIME)

```
druid.service=druid/historical-hot
druid.server.tier=hot
druid.server.maxSize=1500000000000
druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=3
druid.server.http.numThreads=50
druid.server.http.maxIdleTime=PT10m
druid.server.http.defaultQueryTimeout=600000
druid.segmentCache.locations=[{"path": "/app/druid/tmp/indexCache", "maxSize": 1500000000000}]
druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]
```

BROKER NODE CONFIGURATION (JVM)

```
command=/usr/bin/java
-server
-Xms12g
-Xmx12g
-XX:MaxDirectMemorySize=20g
-Djava.security.krb5.conf=/app/krb5/conf/krb5.conf
-Djava.security.auth.login.config=/app/druid/conf/druid/_common/jaas.conf
-Duser.timezone=UTC -Dfile.encoding=UTF-8
-Djava.io.tmpdir=/app/druid/tmp/
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-cp /app/druid/conf/druid/_common/:/app/druid/conf/druid/broker/:/app/druid/lib/*
io.druid.cli.Main server broker
directory=/app/druid
stderr_logfile = /app/druid/logs/service/druid-broker-stderr.log
stdout_logfile = /app/druid/logs/service/druid-broker-stdout.log
priority=4
startsecs=1
startretries=3
autostart=false
autorestart=true
user=some-user

```

BROKER NODE CONFIGURATION (RUNTIME)

```
druid.service=druid/broker
druid.port=8082
druid.broker.http.numConnections=20
druid.server.http.numThreads=50
druid.server.http.maxIdleTime=PT5m
druid.broker.http.readTimeout=PT15M
druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=7
druid.broker.balancer.type=connectionCount
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.cache.type=local
druid.cache.sizeInBytes=2000000000

```

INGESTION SPEC (KAFKA SUPERVISOR)

```
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "prod",
    "parser": {
      "type": "string",
      "parseSpec": {
        "dimensionsSpec": {
          "dimensionExclusions": ["timestamp", "value"],
          "dimensions": ["xxx", "yyy", "zzz", "aaa", "bbb", "ccc", "ddd"]
        },
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        }
      }
    },
    "metricsSpec": [{
      "type": "count",
      "name": "count"
    }, {
      "type": "longSum",
      "name": "val",
      "fieldName": "value",
      "expression": null
    }],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "THIRTY_MINUTE",
      "queryGranularity": "FIVE_MINUTE",
      "rollup": true,
      "intervals": null
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 250000,
    "maxRowsPerSegment": 8000000,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/app/druid/tmp/1515054089532-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "workerThreads": null,
    "chatThreads": 2,
    "chatRetries": 8,
    "httpTimeout": "PT20S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S"
  },
  "ioConfig": {
    "topic": "parser-out",
    "replicas": 1,
    "taskCount": 4,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "some-server-1:9092, some-server-2:9092, some-server-3:9092, some-server-4:9092"
    },
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT5400S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "skipOffsetGaps": false
  },
  "context": null
}
```

Thank you!