[druid-user] How to improve the theta sketch performance

Hi everyone,

We have a cluster running some real-time Kinesis ingestion jobs. We use a theta sketch aggregation on our user_id column for count-distinct use cases. However, the query performance is not acceptable.

Our cluster has 1 overlord-coordinator, 1 router-broker, 1 historical, and 2 MiddleManagers running 2 Kinesis jobs.

Our kinesis job:
segmentGranularity: five minutes

sketch size: 65536 (uid)

{
  "type": "thetaSketch",
  "name": "uid",
  "fieldName": "user_id",
  "size": 65536,
  "shouldFinalize": true,
  "isInputThetaSketch": false
}

4 dimensions, 2 other metrics.
taskCount: 4
replicas: 1

When I run a query like:

SELECT
  TIME_FORMAT(TIME_FLOOR("__time", 'PT5M'), 'YYYY-MM-dd HH:mm', '-08:00') as "date",
  APPROX_COUNT_DISTINCT_DS_THETA(uid, 32768) users
FROM app_MIN
WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '48' HOUR AND "__time" <= CURRENT_TIMESTAMP - INTERVAL '0' HOUR
GROUP BY 1

It takes about 15 s to run with useCache=true and populateCache=true, which is not acceptable; we are aiming for query times of around 1 s.
The number of distinct users in our dataset is around 370k per 5 minutes.
We cannot decrease the sketch size or the time interval because of our requirements.
In addition, I enabled the query cache on our historical and set druid.cache.sizeInBytes=10g, but nothing seems to have changed.

My question is:
1. Can we do anything to improve the speed of our queries, or is this a fundamental limit of Druid's sketches that we cannot address at the Druid level?

2. Does the query cache work for sketch queries? If it does, how should I configure my cluster?

Thanks

If I understand correctly, this may be something on the druid side. How many segments are there for each 5-minute period?
If you had one per query interval (5 minutes), then a 48-hour query would only need to aggregate fewer than 600 sketches.
(It looks as if the sketches are built at ingestion time, so at query time they only need to be merged.) If you have a lot of segments, compaction might help.
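
To make that concrete: a minimal auto-compaction entry submitted to the Coordinator's compaction config API (POST /druid/coordinator/v1/config/compaction) might look roughly like the sketch below. The dataSource matches the table in your query, the other values are placeholders, and the exact options depend on your Druid version, so treat it as illustrative only.

{
  "dataSource": "app_MIN",
  "skipOffsetFromLatest": "P1D",
  "maxRowsPerSegment": 5000000
}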

Or nodes could be overloaded and slow in general - it sounds like a small setup.

I’m not clear why you ingest with 65536 and query with 32768, or whether that impacts performance much.
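
If nothing else, it might be worth timing the same query with the query-time size matched to the ingestion size, e.g. something along these lines (table and column names taken from your query):

SELECT
  TIME_FLOOR("__time", 'PT5M') AS "date",
  APPROX_COUNT_DISTINCT_DS_THETA(uid, 65536) AS users
FROM app_MIN
WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '48' HOUR
GROUP BY 1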

I assume the user_id is a string? Sketches on longs are faster to compute, but in this case that should impact ingestion
more than the query times. (Since you compute the sketches during ingestion.)

(Not sure about query cache and sketches.)

Thanks for your reply.
I've posted my Kinesis job spec below:

{
  "type": "kinesis",
  "spec": {
    "dataSchema": {
      "dataSource": "app_MIN_V4",
      "timestampSpec": {
        "column": "time",
        "format": "posix",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "string",
            "name": "appid",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": true
          },
          {
            "type": "string",
            "name": "active",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          },
          {
            "type": "string",
            "name": "install",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          },
          {
            "type": "string",
            "name": "pay",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          }
        ],
        "dimensionExclusions": [
          "uid",
          "revenue",
          "payMoney",
          "payCount",
          "time"
        ]
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "payCount",
          "fieldName": "pay",
          "expression": null
        },
        {
          "type": "doubleSum",
          "name": "payMoney",
          "fieldName": "revenue",
          "expression": null
        },
        {
          "type": "thetaSketch",
          "name": "uid",
          "fieldName": "uid",
          "size": 65536,
          "shouldFinalize": true,
          "isInputThetaSketch": false,
          "errorBoundsStdDev": null
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "FIVE_MINUTE",
        "queryGranularity": "FIVE_MINUTE",
        "rollup": true,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": [
          {
            "type": "expression",
            "name": "revenue",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "active",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "pay",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "install",
            "expression": "XXX"
          }
        ]
      }
    },
    "ioConfig": {
      "stream": "ETL_STREAM",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "time",
              "expr": "$.time"
            },
            {
              "type": "path",
              "name": "v3",
              "expr": "$.content.values.v3"
            },
            {
              "type": "path",
              "name": "usd",
              "expr": "$.content.values.revenue_usd"
            },
            {
              "type": "path",
              "name": "props",
              "expr": "$.content.values.$props"
            }
          ]
        },
        "featureSpec": {}
      },
      "endpoint": "kinesis.us-west-2.amazonaws.com",
      "replicas": 1,
      "taskCount": 4,
      "taskDuration": "PT3600S",
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestSequenceNumber": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": "PT86400S",
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "recordsPerFetch": 2000,
      "fetchDelayMillis": 0,
      "awsAssumedRoleArn": null,
      "awsExternalId": null,
      "deaggregate": true,
      "type": "kinesis"
    },
    "tuningConfig": {
      "type": "kinesis",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/apache-druid-0.20.1/var/tmp/druid-realtime-persist9030703509115802670",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "skipSequenceNumberAvailabilityCheck": true,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "recordBufferSize": 1000000,
      "recordBufferOfferTimeout": 5000,
      "recordBufferFullWait": 5000,
      "fetchSequenceNumberTimeout": 20000,
      "fetchThreads": null,
      "logParseExceptions": true,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "maxRecordsPerPoll": 100,
      "intermediateHandoffPeriod": "P2147483647D",
      "repartitionTransitionDuration": "PT120S",
      "offsetFetchPeriod": "PT30S"
    }
  }
}

I set "taskDuration": "PT3600S" (1 hour), "lateMessageRejectionPeriod": "PT86400S" (1 day), and "taskCount": 4.
If I'm right, the number of segments per 5 minutes should be a multiple of 4. We did run compaction with skipOffsetFromLatest set to PT32H, so intervals older than 32 hours have fewer segments, but queries over those intervals did not perform any better.

The sketch size, however, does have a big impact on performance: changing the size from 65536 to 32768 roughly halves the query time. A smaller size means less query time but more bias.
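
If I read the DataSketches docs correctly, the relative standard error of a theta sketch is roughly 1/sqrt(size), so size 65536 gives about 1/256 ≈ 0.4% error while 32768 gives about 0.55% on our ~370k distinct users per 5 minutes; that is the accuracy we are trading against query time.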

I also set druid.query.groupBy.maxOnDiskStorage=10g on the historical; without it, the query fails with a "Resource limit exceeded" error. I don't know whether that impacts performance.

On Tuesday, April 20, 2021 at 12:50:18 AM UTC+8, ben....@imply.io wrote:

groupBy queries can be slower, and 10G sounds like it must be a big groupBy query. If you just do a very simple query with the sketch, how does that perform? (E.g., just a count and a where.) Could it be a slow query for other reasons? Or maybe the fact that reducing the sketch size helps means that the issue is something with the memory configuration of the historicals (or brokers).
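
For example, something as bare-bones as the query below (table and column names assumed to match your earlier query) should show whether merging the sketches themselves is slow, or whether the cost is in the surrounding groupBy:

SELECT APPROX_COUNT_DISTINCT_DS_THETA(uid, 65536) AS users
FROM app_MIN
WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '48' HOUR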