Thanks for your reply.
My Kinesis ingestion job is below:
{
  "type": "kinesis",
  "spec": {
    "dataSchema": {
      "dataSource": "app_MIN_V4",
      "timestampSpec": {
        "column": "time",
        "format": "posix",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "string",
            "name": "appid",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": true
          },
          {
            "type": "string",
            "name": "active",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          },
          {
            "type": "string",
            "name": "install",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          },
          {
            "type": "string",
            "name": "pay",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": false
          }
        ],
        "dimensionExclusions": [
          "uid",
          "revenue",
          "payMoney",
          "payCount",
          "time"
        ]
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "payCount",
          "fieldName": "pay",
          "expression": null
        },
        {
          "type": "doubleSum",
          "name": "payMoney",
          "fieldName": "revenue",
          "expression": null
        },
        {
          "type": "thetaSketch",
          "name": "uid",
          "fieldName": "uid",
          "size": 65536,
          "shouldFinalize": true,
          "isInputThetaSketch": false,
          "errorBoundsStdDev": null
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "FIVE_MINUTE",
        "queryGranularity": "FIVE_MINUTE",
        "rollup": true,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": [
          {
            "type": "expression",
            "name": "revenue",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "active",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "pay",
            "expression": "XXX"
          },
          {
            "type": "expression",
            "name": "install",
            "expression": "XXX"
          }
        ]
      }
    },
    "ioConfig": {
      "stream": "ETL_STREAM",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "time",
              "expr": "$.time"
            },
            {
              "type": "path",
              "name": "v3",
              "expr": "$.content.values.v3"
            },
            {
              "type": "path",
              "name": "usd",
              "expr": "$.content.values.revenue_usd"
            },
            {
              "type": "path",
              "name": "props",
              "expr": "$.content.values.$props"
            }
          ]
        },
        "featureSpec": {}
      },
      "endpoint": "kinesis.us-west-2.amazonaws.com",
      "replicas": 1,
      "taskCount": 4,
      "taskDuration": "PT3600S",
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestSequenceNumber": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": "PT86400S",
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "recordsPerFetch": 2000,
      "fetchDelayMillis": 0,
      "awsAssumedRoleArn": null,
      "awsExternalId": null,
      "deaggregate": true,
      "type": "kinesis"
    },
    "tuningConfig": {
      "type": "kinesis",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/apache-druid-0.20.1/var/tmp/druid-realtime-persist9030703509115802670",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "skipSequenceNumberAvailabilityCheck": true,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "recordBufferSize": 1000000,
      "recordBufferOfferTimeout": 5000,
      "recordBufferFullWait": 5000,
      "fetchSequenceNumberTimeout": 20000,
      "fetchThreads": null,
      "logParseExceptions": true,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "maxRecordsPerPoll": 100,
      "intermediateHandoffPeriod": "P2147483647D",
      "repartitionTransitionDuration": "PT120S",
      "offsetFetchPeriod": "PT30S"
    }
  }
}
I set "taskDuration": "PT3600S" (1 hour), "lateMessageRejectionPeriod": "PT86400S" (1 day), and "taskCount": 4.
If I understand correctly, the number of segments per 5-minute interval should be a multiple of 4, since each of the 4 tasks writes its own segments for every interval. We do run compaction with skipOffsetFromLatest set to PT32H, so intervals older than 32 hours have fewer segments, but queries over those intervals did not perform any better.
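For reference, a minimal auto-compaction config of the kind described above, posted to the Coordinator at /druid/coordinator/v1/config/compaction, might look like the sketch below. The segmentGranularity target is an illustrative assumption (merging the 5-minute segments into hourly ones), and granularitySpec support in auto-compaction depends on the Druid version:

{
  "dataSource": "app_MIN_V4",
  "skipOffsetFromLatest": "PT32H",
  "maxRowsPerSegment": 5000000,
  "granularitySpec": {
    "segmentGranularity": "HOUR"
  }
}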
Sketch size, however, does have a large impact on performance: changing the size from 65536 to 32768 roughly halves the query time. A smaller size means faster queries but larger estimation error.
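If I read the DataSketches docs correctly, the relative standard error of a theta sketch scales as roughly 1/sqrt(size), so the accuracy cost of halving the size is modest:

RSE ≈ 1 / sqrt(size)
size = 65536 → RSE ≈ 1/256 ≈ 0.39%
size = 32768 → RSE ≈ 1/181 ≈ 0.55%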
I also set druid.query.groupBy.maxOnDiskStorage=10g on the Historicals; without it, the queries fail with a "Resource limit exceeded" error. I don't know whether that affects performance.
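For context, that property goes in the Historical's runtime.properties. A sketch (the comment reflects my reading of the docs, and whether the 10g shorthand is accepted may depend on the Druid version):

# historical runtime.properties
# groupBy v2 spills partial results to disk once the merge buffers fill up;
# the default of 0 disables spilling, which is what causes "Resource limit exceeded"
druid.query.groupBy.maxOnDiskStorage=10g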
On Tuesday, April 20, 2021 at 12:50:18 AM UTC+8, ben....@imply.io wrote: