Realtime count values not exact before and after handoff

hello,

so our payment model depends on (exact) unique counts per day. We have segmentGranularity=1day, and all historical data is good. However, segments from realtime return wrong figures (higher) because of how Druid internally stores the data. For example, with intermediatePersistPeriod < segmentGranularity, this is what happens:

1. a new intermediate segment (spill, I'm not sure about the right word) is created

2. a new event is received; it is inserted with count=1

3. the same event is received; it increments the counter, count=2

4. intermediatePersistPeriod passes; a new intermediate segment (spill) is created

5. the same event is received; it is inserted with count=1

so now a count aggregator returns 2, and a longSum returns 3. I saw other people encounter this issue [1], but for them the longSum is generally the solution (because they are interested in the total count, not the unique count); unfortunately that doesn't work for us. As I said, once handoff happens the intermediate segments (spills) do get aggregated, so after that the count returns the right figure again.
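to make the mechanics concrete, here is a tiny Python toy model of it (this is not Druid code, just an illustration of the spill behavior, with made-up dimension values):

from collections import Counter

spills = []          # persisted intermediate segments (spills)
current = Counter()  # in-memory rows, rolled up per dimension combination

def persist():
    # simulates intermediatePersistPeriod elapsing: freeze the in-memory rows
    global current
    if current:
        spills.append(dict(current))
        current = Counter()

same_event = ("journal-1", "article-9", "user-42")  # same dimensions, different timestamps

current[same_event] += 1   # step 2: inserted with count=1
current[same_event] += 1   # step 3: rolled up into the same row, count=2
persist()                  # step 4: intermediate persist, a new spill starts
current[same_event] += 1   # step 5: inserted again with count=1
persist()                  # persisted here only so both spills are easy to inspect

count_rows = sum(len(spill) for spill in spills)         # query-time "count"  -> 2
long_sum = sum(sum(spill.values()) for spill in spills)  # query-time longSum  -> 3
print(count_rows, long_sum)  # the number of unique combinations is actually 1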

We have tried nested groupBy queries, but the performance was bad (expected, I guess). Currently our solution is to set intermediatePersistPeriod=24H (which in turn requires windowPeriod=24H so we don't lose data when it fails), which feels like an abuse. We were wondering if Appenderators, or the epic changeset coming to Realtime, solves this, or whether there are any plans to fix this (assuming it's not just a 'known way of working', in which case I'm not sure what other option we would have)?
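for reference, the kind of nested groupBy we tried looked roughly like this (the datasource, dimension names, interval, and broker address are placeholders, not our real schema): the inner query groups by every dimension so each unique combination becomes one row, and the outer query just counts those rows.

import json
import requests  # any HTTP client works, requests is just an example

inner = {
    "queryType": "groupBy",
    "dataSource": "events",
    "granularity": "day",
    "dimensions": ["dim_a", "dim_b", "dim_c"],
    "aggregations": [{"type": "count", "name": "rows"}],
    "intervals": ["2016-02-19/2016-02-20"],
}

outer = {
    "queryType": "groupBy",
    "dataSource": {"type": "query", "query": inner},
    "granularity": "day",
    "dimensions": ["dim_a"],
    "aggregations": [{"type": "count", "name": "unique_combinations"}],
    "intervals": ["2016-02-19/2016-02-20"],
}

resp = requests.post(
    "http://broker:8082/druid/v2",  # placeholder broker address
    data=json.dumps(outer),
    headers={"Content-Type": "application/json"},
)
print(resp.json())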

[1] https://groups.google.com/d/msg/druid-development/T8KP4W8PX3s/-rBmpszHo1AJ

thank you

sorry, I just wanted to also mention: when I say 'the same event' I mean the same with respect to cardinality (i.e. all the dimensions are the same except the timestamp). So it's not a duplicate event from the message bus; they are real, distinct events, just not unique per day. Again, the inexactness of the count figure is only an artifact of Druid's internal data format.

Hey Igy,

Druid is not really intended to guarantee perfect rollup at all times, especially for realtime, where intermediate spills are not merged until the end of the segmentGranularity period. It’s also possible that two messages that should roll up actually go to different realtime indexers and end up in different segments. So in realtime the rollup is more of a performance optimization and less of a perfect guarantee. None of the planned changes coming in Druid would change this behavior.

Some possible things you could do instead:

  1. Hadoop based batch indexing does guarantee perfect rollup. You could use Hadoop to “reindex” your realtime segments a few hours or days after initial ingestion, which would compact them such that rows are rolled up how you’d expect (even if they originally went to different realtime indexers).

  2. You could use the “cardinality” aggregator at query time in “byRow: true” mode to get an approximate rolled-up row count. This uses HLL so it is not exact. (See the sketch after this list.)

  3. If you have a single column you want to count the values of, you could use lexicographic topNs and page through them, basically retrieving all unique values. But this would be pretty slow if you need to retrieve millions of values.
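For option 2, the query could look roughly like this (datasource, dimension names, and interval are made up, and depending on your Druid version the field may be called "fields" rather than "fieldNames"):

cardinality_query = {
    "queryType": "timeseries",
    "dataSource": "events",
    "granularity": "day",
    "aggregations": [
        {
            "type": "cardinality",
            "name": "distinct_rows",
            "fieldNames": ["dim_a", "dim_b", "dim_c"],
            "byRow": True,  # count distinct dimension combinations, not distinct values
        }
    ],
    "intervals": ["2016-02-19/2016-02-20"],
}

Keep in mind the result is HLL-based and therefore approximate.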

hey gian,

see inline

Hey Igy,

Druid is not really intended to guarantee perfect rollup at all times, especially for realtime, where intermediate spills are not merged until the end of the segmentGranularity period. It’s also possible that two messages that should roll up actually go to different realtime indexers and end up in different segments. So in realtime the rollup is more of a performance optimization and less of a perfect guarantee. None of the planned changes coming in Druid would change this behavior.

Some possible things you could do instead:

  1. Hadoop based batch indexing does guarantee perfect rollup. You could use Hadoop to “reindex” your realtime segments a few hours or days after initial ingestion, which would compact them such that rows are rolled up how you’d expect (even if they originally went to different realtime indexers).

how would this solve our problem? after handoff the data is already good. I guess I can't reindex partial realtime data before the segment interval ends. again, historical works fine; our problem is only that we need to query both historical and realtime, and realtime simply returns bad figures (until the segment is handed off to historical, at which point the query suddenly returns good values)

  2. You could use the “cardinality” aggregator at query time in “byRow: true” mode to get an approximate rolled-up row count. This uses HLL so it is not exact.

as I said, we require this number to be exact; our monetization model is based on this figure.

  3. If you have a single column you want to count the values of, you could use lexicographic topNs and page through them, basically retrieving all unique values. But this would be pretty slow if you need to retrieve millions of values.

I think having intermediatePersistPeriod=windowPeriod=24H is a much easier solution than this.

again, I feel this is a bug in Druid: it simply returns different figures for the exact same query over the exact same data depending on whether the data is stored in realtime or historical.

regards, PoTa

so to repeat: the count aggregator for realtime data returns effectively random figures (random in the sense that the result is a function of the distribution of your data and the persist period), which does not sound right. Once data is handed off to historical, count returns the expected number. I believe this is a (design) bug, where the details of the chosen realtime persist format leak out.

Hey PoTa,

IMO the behavior you’re looking for is really outside the scope of what Druid is intended to do. There’s no concept of a primary key, and no guarantee that there will only be a single row with a particular set of dimensions. Usually that is the case but it is not guaranteed. For all aggregators other than “count”, it doesn’t affect the result of the query. We usually recommend against using “count” at query time for this reason, unless you’re just trying to count the number of Druid rows (which will be greater than or equal to the number of unique dimension combinations).

That lack of guarantee is actually useful for performance and flexibility, since it means we don’t need to shuffle data around when scaling realtime ingestion up or down, or when adding new data to a previously existing time range. So I think it’s not likely that we will start making this guarantee in the future.

That being said you should get what you want if:

  • you always send messages with the same dimension set to the same realtime partition (generally this also means avoiding scaling up & down)

  • you “disable” intermediate persists by setting maxRowsInMemory and intermediatePersistPeriod really high. Just watch your heap to make sure you don’t run out of heap memory.
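For the second point, the tuningConfig would look roughly like this (values are illustrative only; whatever you pick, a full segmentGranularity period of rows has to fit in heap, and per the discussion above you would likely raise windowPeriod as well):

tuning_config = {
    "type": "realtime",
    "maxRowsInMemory": 10000000,           # high enough that the row limit never triggers a persist
    "intermediatePersistPeriod": "PT24H",  # effectively "never persist within the segment interval"
    "windowPeriod": "PT24H",               # so events for the whole day are still accepted after a restart
}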

Hey Gian,

IMO the behavior you’re looking for is really outside the scope of what Druid is intended to do. There’s no concept of a primary key, and no guarantee that there will only be a single row with a particular set of dimensions. Usually that is the case but it is not guaranteed. For all aggregators other than “count”, it doesn’t affect the result of the query. We usually recommend against using “count” at query time for this reason, unless you’re just trying to count the number of Druid rows (which will be greater than or equal to the number of unique dimension combinations).

That lack of guarantee is actually useful for performance and flexibility, since it means we don’t need to shuffle data around when scaling realtime ingestion up or down, or when adding new data to a previously existing time range. So I think it’s not likely that we will start making this guarantee in the future.

I understand the problems; however, I still feel these are technical limitations of the currently chosen design, not an expected way of working. When I explained this situation to FJ and Nishant they both reacted the same way (both telling me they believe this is a bug). Please don't get me wrong, I'm not insisting you must 'fix' this; I'm just trying to present a valid use case (what we expect from Druid) and better understand our alternatives.

obviously it's very hard for me to suggest anything, because I'm not familiar enough with the full design or the codebase. Also, I believe it's technically possible to do what I'm asking for, but that of course does not mean it's viable, nor that it's in the best interest of Druid. However, at this point I'm not sure what else I can provide or do to move this issue forward.

That being said you should get what you want if:

  • you always send messages with the same dimension set to the same realtime partition (generally this also means avoiding scaling up & down)

based on a discussion with Nishant, Tranquility already does/supports this, but I'm not sure about your comment about scaling up/down.

  • you “disable” intermediate persists by setting maxRowsInMemory and intermediatePersistPeriod really high. Just watch your heap to make sure you don’t run out of heap memory.

yes, as I said we are already doing this, however it feels like an abuse. First, we need to set windowPeriod equally high (otherwise, when a task fails and we need to restart it, it would ignore events). Second - and this is more of a question - we are seeing subpar performance from our realtime tasks (e.g. a query over 1 month runs in 0.05 sec on historicals, while the same query over 1 day takes 1.5 sec on realtime). We tried raising the middle manager (peon) numThreads but it didn't help (the Tranquility/realtime task params don't seem to have anything for this). Also, is there any query log for these realtime nodes that the middle manager tasks turn into? The task logs themselves show nothing related to querying.

thanks, regards, PoTa

Hey PoTa,

I think the best way forward is a concrete proposal that would describe how we could change this behavior (or a firm decision that we’re not going to change the behavior). Perhaps FJ or Nishant could help out with that since they think this is a bug and I do not :)

About your other questions,

  • My comment about scaling up/down refers to what happens when you tell Tranquility you want a different number of Druid partitions than you currently have. It uses consistent hashing to send messages with the same dimension set to the same partition, but when you change the number of partitions, it’s possible that two messages with the same dimension set will end up on different partitions (if one was sent before scaling and one was sent after). In this case they will never be combined together, even after handoff, without doing a background reindex/compaction. (See the sketch after this list.)

  • About your query slowness, it might be related to the fact that you aren’t letting your tasks persist. Queries on the in-memory data are slower than queries on the on-disk data. Also queries on the on-disk data can be parallelized (since there are multiple spills and each can be queried independently) but queries on the in-memory data cannot. You could confirm by setting intermediatePersistPeriod lower and seeing if that helps performance. You could work around this by using a shorter segmentGranularity.

  • About query logs, try enabling the logging emitter and set your emitter logLevel to INFO.
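To illustrate the first point, here's a toy version of partition assignment (simple hashing rather than Tranquility's actual consistent hashing, but the effect is the same): a dimension set maps to a stable partition while the partition count is fixed, and may map elsewhere once you change it.

import hashlib

def partition_for(dims, num_partitions):
    # hash the dimension values and map them onto one of the partitions
    key = "|".join(dims).encode("utf-8")
    return int(hashlib.md5(key).hexdigest(), 16) % num_partitions

dims = ("journal-1", "article-9", "user-42")
print(partition_for(dims, 3))  # partition while running 3 tasks
print(partition_for(dims, 4))  # possibly a different partition after scaling to 4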

After understanding the issue more, I don’t think it is a bug, but rather an unfortunate artifact of Druid until the exactly-once-from-Kafka stuff is merged.

FJ, this behavior would remain even after that.

hey Gian,

so, as I said, I can only suggest things that I don't know are viable at all, but here it goes:

so to recap the problem: Druid provides perfect rollup for historical segments but not for realtime segments. This causes a different number of (Druid) rows (as witnessed by the query-time count aggregator) for the same segment (data) before and after handoff. Unfortunately, the (query-time) count aggregator is the only way of efficiently querying precise cardinality in Druid, and thus it does not work for realtime. It adds to the problem that the 'error' rate (count in realtime vs count in historical) is a function of the distribution of the data (among other things), which cannot be reasoned about; more importantly, the only way of forcing it to be upper-bounded has a serious negative impact on (query) performance (raising the intermediate persist period lowers the error rate but raises query response time).

‘perfect’ solution: not doing separate intermediate persists, but (periodically) ‘growing’ a single persist, would give perfect rollup. This has the problem of not accounting for (aggregating the data from) the most recent ‘slice’ that is still in memory (and thus would still produce ‘duplicates’, I guess).

‘perfect’ ‘on-the-fly’ solution: I was wondering whether introducing an abstraction on top of the realtime intermediate segments (one that would fix the count) is viable, meaning it would first need to aggregate the data somehow. It would only need to care about count, as that is the only aggregator that is ‘broken’. It could be part of realtime itself or sit between the broker and realtime.

‘tolerating’ solution: something like periodically rolling up the intermediate persists (as opposed to rolling them up only once before handoff). This way the error rate would be bounded by the ratio of the intermediate rollup period to the perfect rollup period, so a 10-minute intermediate rollup vs. a 1-day perfect rollup would keep the ‘overcounting’ small.
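to make the 'tolerating' idea a bit more concrete, here is a toy sketch (reusing the spill model from my first message, so plain Python, not Druid internals): 'rolling up' a set of spills just means summing the per-row counts for identical dimension combinations, which is what already happens once at handoff; the idea is to do the same merge every few minutes.

from collections import Counter

def merge_spills(spills):
    merged = Counter()
    for spill in spills:
        merged.update(spill)   # identical dimension combinations collapse into one row
    return [dict(merged)]      # one rolled-up spill replaces the old ones

spills = [
    {("journal-1", "article-9", "user-42"): 2},
    {("journal-1", "article-9", "user-42"): 1},
]
spills = merge_spills(spills)
print(spills)  # [{('journal-1', 'article-9', 'user-42'): 3}] -> count=1, longSum=3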

regards, PoTa

When I talked with PoTa, his message event was (from_journal_id, from_article_id, user_id) and his query was select count(distinct user_id) group by from_journal_id, from_article_id.
So his requirement is to support exact distinct counts over realtime data.

On Saturday, February 20, 2016 at 5:10:45 PM UTC+8, Igy Warrijor wrote:

hey,

that is not exactly right. Our schema is (from_journal_id, from_article_id, to_journal_id, to_article_id, user_id) and the query is select count(distinct(to_journal_id, to_article_id, user_id)) group by from_journal_id, from_article_id, so our distinct is always over ALL dimensions and per day. That is why count works with qG=sG=1D (queryGranularity = segmentGranularity = 1 day) on historical (perfect rollup) and does not on realtime (no perfect rollup).
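concretely (using the dimension names above; the datasource name and interval are placeholders), the query we need to be exact is just a plain count grouped by the 'from' dimensions, because with perfect rollup and qG=sG=1D every stored row already is one unique combination per day:

daily_distinct_query = {
    "queryType": "groupBy",
    "dataSource": "events",
    "granularity": "day",
    "dimensions": ["from_journal_id", "from_article_id"],
    "aggregations": [{"type": "count", "name": "distinct_targets"}],
    "intervals": ["2016-02-19/2016-02-20"],
}
# exact on historical segments (perfect rollup), overcounts on realtime spills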

regards, PoTa