Querying with a limit and grouping by a numeric field returns inconsistent results

I have the following query:

SELECT TIME_FORMAT(__time,'YYYY/MM/dd') AS "data_date", campaign AS "Campaign", (SUM(demand_revenue * (1.0 + agency_fee))) AS "Spend" 
 FROM dtv_impression_request 
 WHERE __time >= '2021-06-01 00:00:00' AND __time <= '2021-08-15 23:59:59' 
 AND (platform_client_id = 514) 
 GROUP BY campaign, TIME_FORMAT(__time,'YYYY/MM/dd')  
 ORDER BY TIME_FORMAT(__time,'YYYY/MM/dd') asc
 LIMIT 25

I have run this exact query back to back, within seconds of each other, and it yields different results for the Spend field each time. What is interesting is that if I remove the LIMIT clause, the query returns identical Spend values on back-to-back runs, which is what I expect. Another interesting observation: if I keep the LIMIT but cast campaign (which is in the GROUP BY and is ingested as a LONG) to CHAR, I also get identical, expected results back to back. In short, I am seeing inconsistencies in the Spend calculation whenever the query has both a LIMIT and a numeric GROUP BY field.

We have tried the same type of query on a different dataset and are seeing the same issue in both the production and canary environments.

The Druid cluster this query is being run on is version 0.21.1.

That is interesting… Can you enter the query in the Druid console and get the "Explain SQL Query" output for each of the three? It sounds like your query is using an (approximate) TopN, and the other two may not be for some reason. (Especially the CAST is interesting.) You can force exact results in the query context (useApproximateTopN: false), and that might be more consistent (but slower). If you're submitting in the console, there are toggles next to Run for turning this off, too.
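If it helps, here's a minimal sketch of setting that flag in the query context when POSTing to Druid's SQL endpoint (/druid/v2/sql); the query text is abbreviated here:

{
  "query": "SELECT ... FROM dtv_impression_request ... LIMIT 25",
  "context": {
    "useApproximateTopN": false
  }
}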

@Ben_Krug,

Thanks for the reply. Here are the three queries with their EXPLAIN output (the original with a limit, one with no limit, and one with a limit but with campaign cast to CHAR):

Limit Query:
SELECT TIME_FORMAT(__time,'YYYY/MM/dd') AS "data_date", campaign AS "Campaign", (SUM(demand_revenue * (1.0 + agency_fee))) AS "Spend"
FROM dtv_impression_request
WHERE __time >= '2021-06-01 00:00:00' AND __time <= '2021-08-15 23:59:59'
AND (platform_client_id = 514)
GROUP BY campaign, TIME_FORMAT(__time,'YYYY/MM/dd')
ORDER BY TIME_FORMAT(__time,'YYYY/MM/dd') ASC
LIMIT 25

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "table",
    "name": "dtv_impression_request"
  },
  "intervals": {
    "type": "intervals",
    "intervals": [
      "2021-06-01T00:00:00.000Z/2021-08-15T23:59:59.001Z"
    ]
  },
  "virtualColumns": [
    {
      "type": "expression",
      "name": "v0",
      "expression": "timestamp_format(\"__time\",'YYYY/MM/dd','UTC')",
      "outputType": "STRING"
    }
  ],
  "filter": {
    "type": "selector",
    "dimension": "platform_client_id",
    "value": "514",
    "extractionFn": null
  },
  "granularity": {
    "type": "all"
  },
  "dimensions": [
    {
      "type": "default",
      "dimension": "campaign",
      "outputName": "d0",
      "outputType": "LONG"
    },
    {
      "type": "default",
      "dimension": "v0",
      "outputName": "d1",
      "outputType": "STRING"
    }
  ],
  "aggregations": [
    {
      "type": "doubleSum",
      "name": "a0",
      "fieldName": null,
      "expression": "(\"demand_revenue\" * (1.0 + \"agency_fee\"))"
    }
  ],
  "postAggregations": [],
  "having": null,
  "limitSpec": {
    "type": "default",
    "columns": [
      {
        "dimension": "d1",
        "direction": "ascending",
        "dimensionOrder": {
          "type": "lexicographic"
        }
      }
    ],
    "limit": 25
  },
  "context": {
    "groupByStrategy": "v2",
    "populateCache": false,
    "sqlQueryId": "e33194bc-583d-4fef-8230-79d0cc36ebb4",
    "useApproximateCountDistinct": false,
    "useApproximateTopN": false,
    "useCache": false
  },
  "descending": false
}


No Limit Query:
SELECT TIME_FORMAT(__time,'YYYY/MM/dd') AS "data_date", campaign AS "Campaign", (SUM(demand_revenue * (1.0 + agency_fee))) AS "Spend"
FROM dtv_impression_request
WHERE __time >= '2021-06-01 00:00:00' AND __time <= '2021-08-15 23:59:59'
AND (platform_client_id = 514)
GROUP BY campaign, TIME_FORMAT(__time,'YYYY/MM/dd')
ORDER BY TIME_FORMAT(__time,'YYYY/MM/dd') ASC

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "table",
    "name": "dtv_impression_request"
  },
  "intervals": {
    "type": "intervals",
    "intervals": [
      "2021-06-01T00:00:00.000Z/2021-08-15T23:59:59.001Z"
    ]
  },
  "virtualColumns": [
    {
      "type": "expression",
      "name": "v0",
      "expression": "timestamp_format(\"__time\",'YYYY/MM/dd','UTC')",
      "outputType": "STRING"
    }
  ],
  "filter": {
    "type": "selector",
    "dimension": "platform_client_id",
    "value": "514",
    "extractionFn": null
  },
  "granularity": {
    "type": "all"
  },
  "dimensions": [
    {
      "type": "default",
      "dimension": "campaign",
      "outputName": "d0",
      "outputType": "LONG"
    },
    {
      "type": "default",
      "dimension": "v0",
      "outputName": "d1",
      "outputType": "STRING"
    }
  ],
  "aggregations": [
    {
      "type": "doubleSum",
      "name": "a0",
      "fieldName": null,
      "expression": "(\"demand_revenue\" * (1.0 + \"agency_fee\"))"
    }
  ],
  "postAggregations": [],
  "having": null,
  "limitSpec": {
    "type": "default",
    "columns": [
      {
        "dimension": "d1",
        "direction": "ascending",
        "dimensionOrder": {
          "type": "lexicographic"
        }
      }
    ],
    "limit": 2147483647
  },
  "context": {
    "groupByStrategy": "v2",
    "populateCache": false,
    "sqlQueryId": "c9122e7d-982e-46c4-8407-1fa371cb4da7",
    "useApproximateCountDistinct": false,
    "useApproximateTopN": false,
    "useCache": false
  },
  "descending": false
}


Limit Query with Cast:
SELECT TIME_FORMAT(__time,'YYYY/MM/dd') AS "data_date", CAST(campaign AS CHAR) AS "Campaign", (SUM(demand_revenue * (1.0 + agency_fee))) AS "Spend"
FROM dtv_impression_request
WHERE __time >= '2021-06-01 00:00:00' AND __time <= '2021-08-15 23:59:59'
AND (platform_client_id = 514)
GROUP BY CAST(campaign AS CHAR), TIME_FORMAT(__time,'YYYY/MM/dd')
ORDER BY TIME_FORMAT(__time,'YYYY/MM/dd') ASC
LIMIT 25

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "table",
    "name": "dtv_impression_request"
  },
  "intervals": {
    "type": "intervals",
    "intervals": [
      "2021-06-01T00:00:00.000Z/2021-08-15T23:59:59.001Z"
    ]
  },
  "virtualColumns": [
    {
      "type": "expression",
      "name": "v0",
      "expression": "timestamp_format(\"__time\",'YYYY/MM/dd','UTC')",
      "outputType": "STRING"
    }
  ],
  "filter": {
    "type": "selector",
    "dimension": "platform_client_id",
    "value": "514",
    "extractionFn": null
  },
  "granularity": {
    "type": "all"
  },
  "dimensions": [
    {
      "type": "default",
      "dimension": "campaign",
      "outputName": "d0",
      "outputType": "STRING"
    },
    {
      "type": "default",
      "dimension": "v0",
      "outputName": "d1",
      "outputType": "STRING"
    }
  ],
  "aggregations": [
    {
      "type": "doubleSum",
      "name": "a0",
      "fieldName": null,
      "expression": "(\"demand_revenue\" * (1.0 + \"agency_fee\"))"
    }
  ],
  "postAggregations": [],
  "having": null,
  "limitSpec": {
    "type": "default",
    "columns": [
      {
        "dimension": "d1",
        "direction": "ascending",
        "dimensionOrder": {
          "type": "lexicographic"
        }
      }
    ],
    "limit": 25
  },
  "context": {
    "groupByStrategy": "v2",
    "populateCache": false,
    "sqlQueryId": "4ddcdc66-da06-485e-a84e-38f22de6a6e9",
    "useApproximateCountDistinct": false,
    "useApproximateTopN": false,
    "useCache": false
  },
  "descending": false
}

Let me know if you see anything.

Did you always have useApproximateTopN: false, or was that just set? Each plan uses a groupBy, which I thought should be precise. It's strange if the first query, as planned, isn't… If that's the case, I'll dig a little more; I have a vague memory that I might have seen something like this before.

@Ben_Krug useApproximateTopN is always set to false on our side for all our queries; I also double-checked that the first query had it set to false as well, and it does.

We have done some testing in EC2, trying out different versions of Druid, and we believe this issue started when we upgraded from Druid 0.20.1 to 0.21.1. We noticed that if we have one Historical node running 0.20 and another running 0.21, the exact issue above happens. If all Historical nodes are on 0.21.1, or all on 0.20, the issue does not occur.

Unfortunately, we double-checked the versions of all our services in production and they all match at 0.21.1, but since we were able to reproduce the same issue with mismatched versions in EC2, we strongly suspect something went wrong during the upgrade process.

@Ben_Krug do you have any ideas for additional checks we could run to see whether something stayed stale after the upgrade, or to verify that the upgrade was done properly? Or is there some service that needs to be reset/wiped?

This is where my head went first: I wondered if the LIMIT is being pushed down to the Historicals inconsistently between versions. I mean… I don't know… :smiley:

Re: checks, I would use the status APIs.
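For example, every Druid process exposes a status endpoint that reports its version (a sketch; host and port are placeholders for each service, and the exact response fields may vary by version):

# Compare the reported version across all services
curl http://<service-host>:<service-port>/status
# the JSON response includes a "version" field to check against 0.21.1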

@petermarshallio,

Yeah, I am not sure. We did check via the status APIs; that is how we confirmed that all the services are on the same version, 0.21.1.

Just wondering if there is anything else we can check; we even tried wiping ZooKeeper. If we do a clean install of Druid 0.21.1 or 0.20.2 on a test cluster, we do not run into this issue, which is why we suspect an upgrade problem.

@petermarshallio,

Another interesting find… we don't have forceLimitPushDown enabled (we are using the default, which is false, though I have also run tests where I explicitly set forceLimitPushDown to false in the query context), but if I make sure every field in the GROUP BY also appears in the ORDER BY, the issue goes away (see the sketch below).
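For reference, here is a sketch of that workaround applied to the original query; the only change is appending campaign to the ORDER BY:

SELECT TIME_FORMAT(__time,'YYYY/MM/dd') AS "data_date", campaign AS "Campaign", (SUM(demand_revenue * (1.0 + agency_fee))) AS "Spend"
FROM dtv_impression_request
WHERE __time >= '2021-06-01 00:00:00' AND __time <= '2021-08-15 23:59:59'
AND (platform_client_id = 514)
GROUP BY campaign, TIME_FORMAT(__time,'YYYY/MM/dd')
-- every GROUP BY field now also appears in the ORDER BY
ORDER BY TIME_FORMAT(__time,'YYYY/MM/dd') ASC, campaign ASC
LIMIT 25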

Interesting… I think this might be a good GitHub issue to get the full answer from the devs. I believe ORDER BY may be applied on the data nodes after the GROUP BY, so when a grouping field isn't in the ORDER BY, the fan-in is unordered; different result orderings from different nodes (perhaps even different segments), combined with the LIMIT, could then give slightly different results… if that makes sense?

@petermarshallio ,

Thanks for the suggestion. I have raised it on GitHub: Inconsistent results if running a limit query with a group by on a numeric dimensional field and not adding that field to order by statement post upgrade of Druid from 20.1 to .21.1 · Issue #11758 · apache/druid · GitHub
