Custom aggregations based on the time dimension

Hi there,

I’d like to get your feedback on the best way to achieve the following.

I’ve got a datasource with the following columns:

revenue_local(double), currency_id(int), user_id(HyperLogLog)

I want to multiply the revenue_local column by a conversion rate, based on the time of the row and the currency_id.

So, for instance, you can imagine that we have a table such as the one below to do the conversion:

currency_id,date,rate
1,01/05/2015,1.2
1,02/05/2015,1.21
1,03/05/2015,1.19
2,02/05/2015,3.62
etc.

I want to do a query that will return sum(revenue_local * conversion rate), distinct(user_id) for a given granularity (all, month, day) and filters.

I see a couple of alternatives to achieve that:

1°) Do two separate queries.

  • the first query returns the distinct(user_id) count

  • the second query returns the revenue_local by day; we then apply the conversion rate by day at the application level and sum for the given granularity (a rough sketch follows the cons below)

Cons of this approach:

  • twice the filtering

  • there’s a lot of logic that needs to happen at the application level (conversion rates, sum, merge of the two queries)
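To make option 1 a bit more concrete, here is a rough sketch of what it could look like. The query shapes, the hyperUnique aggregator on user_id and the ratesByDay lookup are assumptions for illustration, not something we actually run:

// Query A: distinct users at the target granularity
// (assumes user_id was ingested as a hyperUnique metric)
var distinctUsersQuery = {
  "queryType": "timeseries",
  "dataSource": "userflow",
  "granularity": "month",
  "aggregations": [{ "type": "hyperUnique", "name": "users", "fieldName": "user_id" }],
  "intervals": ["2015-05-01/2015-06-01"]
};

// Query B: revenue_local by day and currency, to be converted and rolled up in the application
var revenueByDayQuery = {
  "queryType": "groupBy",
  "dataSource": "userflow",
  "granularity": "day",
  "dimensions": ["currency_id"],
  "aggregations": [{ "type": "doubleSum", "name": "revenue_local", "fieldName": "revenue_local" }],
  "intervals": ["2015-05-01/2015-06-01"]
};

// Application side: apply the daily rate per currency, then sum.
// ratesByDay maps "<currency_id>|<YYYY-MM-DD>" to a conversion rate.
function convertAndSum(rows, ratesByDay) {
  return rows.reduce(function (total, row) {
    var day = row.timestamp.slice(0, 10); // "YYYY-MM-DD"
    var rate = ratesByDay[row.event.currency_id + "|" + day] || 1;
    return total + row.event.revenue_local * rate;
  }, 0);
}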

2°) Use a javascript aggregation such as the one below:

function applyRate(current, time, revenue_local) {
  // Resolve the first day of the month the current entry was created,
  // truncated to midnight UTC so it matches the keys of the table below
  var date = new Date(time);
  date.setUTCDate(1);
  date.setUTCHours(0, 0, 0, 0);
  var startOfMonth = date.getTime();

  // Currency conversion table (by first day of each month within a date range)
  var rates = {
    '1420070400000': 1.18,
    '1422748800000': 1.22,
    '1425168000000': 1.15,
    '1427846400000': 1.19,
    '1430438400000': 1.18,
    '1433116800000': 1.22,
    '1435708800000': 1.15,
    '1438387200000': 1.19
  };

  var rate = rates[startOfMonth] || 1;
  return current + (revenue_local * rate);
}

note: this sample uses monthly conversion rates and doesn’t take the currency_id into account, but the idea is that we want to hardcode the conversion table in the function.
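For reference, this is roughly how such a function gets wired into a query as a javascript aggregator (the query shape below is an assumption, and it relies on __time being readable as a fieldName, which is what this option depends on). Druid calls fnAggregate with the running partial value first (that is the current argument of applyRate), followed by the listed fieldNames:

{
  "queryType": "timeseries",
  "dataSource": "userflow",
  "granularity": "month",
  "aggregations": [
    {
      "type": "javascript",
      "name": "convertedRevenue",
      "fieldNames": ["__time", "revenue_local"],
      "fnAggregate": "function(current, time, revenue_local) { /* body of applyRate above, minified */ }",
      "fnCombine": "function(partialA, partialB) { return partialA + partialB; }",
      "fnReset": "function() { return 0; }"
    },
    { "type": "hyperUnique", "name": "users", "fieldName": "user_id" }
  ],
  "intervals": ["2015-01-01/2015-09-01"]
}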

Pros of this approach:

  • all the aggregation happens in Druid

Cons:

  • this sample query is about 5x slower than a simple sum, and the conversion table will be a lot bigger in real life, so I’m expecting even worse performance

3°) Combine the previous approach with sub queries.

The idea would be to do a sub query that returns results by day, then apply another query on top of it, with the final granularity, that contains the javascript aggregation. That would limit the performance penalty of the javascript aggregation: instead of being applied to all rows, it would only be applied to one row per day per currency.

Unfortunately that doesn’t work, because sub queries only work for groupBy. I also tried to write my query as a groupBy, but I can’t find a way to access the __time field in the aggregator. Here’s the query:

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": "userflow",
      "granularity": { "type": "period", "period": "P1M" },
      "dimensions": ["client_id"],
      "aggregations": [
        {
          "type": "longSum",
          "fieldName": "revenue_local",
          "name": "revenue_local"
        }
      ],
      "filter": {
        "type": "selector",
        "dimension": "client_id",
        "value": 2255
      },
      "intervals": [
        "2014-08-01T00/2015-02-01T00"
      ]
    }
  },
  "granularity": "all",
  "aggregations": [
    {
      "type": "javascript",
      "name": "convertedRevenue",
      "fieldNames": [
        "__time", // DOESN'T SEEM TO WORK
        "revenue_local"
      ],
      "fnAggregate": "function lol(e,t,a){var n=new Date(t);n.setDate(1);var r=n.getTime(),i={1420070400000:1.18,1422748800000:1.22,1425168000000:1.15,1427846400000:1.19,1430438400000:1.18,1433116800000:1.22,1435708800000:1.15,1438387200000:1.19},l=i[r]||1;return e+a*l}",
      "fnCombine": "function(partialA, partialB) { return partialA + partialB; }",
      "fnReset": "function() { return 0; }"
    }
  ],
  "intervals": [
    "2014-08-01T00/2015-02-01T00"
  ]
}

4°) Develop our own aggregator and/or support timeseries sub queries

Does anyone have a similar scenario? How do you handle it?

Thanks in advance,

Julien.

In Druid the most straightforward way to do this is at ETL time, where you supply the event as revenue_local(double), revenue_normal(double), currency_id(int), user_id(HLL). This assumes you have some sort of base currency that you always want the revenue expressed in.
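Just to illustrate, the ETL step could be as simple as something like this, run on each event before handing it to the indexing task (the event shape and the rate lookup are placeholders, not a real API):

function normalizeEvent(event, ratesByCurrencyAndDay) {
  // Look up the rate that applied on the event's day for its currency
  var day = event.timestamp.slice(0, 10); // "YYYY-MM-DD"
  var rate = ratesByCurrencyAndDay[event.currency_id + "|" + day] || 1;
  // Keep the local amount and add a normalized one, so queries can just doubleSum(revenue_normal)
  event.revenue_normal = event.revenue_local * rate;
  return event;
}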

If you want to ask “What is my Turkish Lira revenue in terms of GBP, USD, and EUR as determined at query time” I don’t think there’s a straightforward way to calculate that without resorting to something like you have indicated. Note that doing the “sub query” approach will probably not work for topN if you have more than 1000 things in your group (or whatever you set druid.query.topN.minTopNThreshold to) unless you set your threshold really high.
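If you do go that route and the limit bites, the knob is the property I mentioned; something like this in runtime.properties (the value is purely illustrative):

druid.query.topN.minTopNThreshold=5000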

Also, FYI, indicating time in ISO format (2015-01-05) and using pipes ( | ) to separate table elements helps both our international friends (in many places 1,01 means 1.01) and those of us who look at international stuff all the time and who may or may not have had to look at your example a few times to figure out what it meant. I also haven’t had my coffee yet :slight_smile:

Good point about the formatting, I’ll make sure to improve that next time :slight_smile:

As for doing that at ETL time, I know this is the best way and we already do it extensively. However, the problem in our case is that the FX rates are known a lot later than when the data is ingested, and they can change… So we’d rather apply them at query time than reprocess the data.

Hello,

Did you find the best solution for this?
I have a similar case where there is a cost based on time (one per day), and I have no clue how to apply the new cost at query time.

Also, what is this current thing added to the function? It makes no sense to me.

Thank you

We ended up writing our own Java aggregator to optimise performance, but it was quite similar to option 2. However, I switched jobs at the end of 2015 and haven’t followed Druid’s releases closely since. There might be a better way now.