Hybrid batch/streaming setup question

I have a bit of an open-ended question here, apologies for not being more focused.

We currently have a production Druid up and running that gets its data entirely via index-task batch ingestion. Our primary use case is reporting, and the batches to be ingested are generated by a data warehousing team. It collects output from a variety of asynchronous back-end processes and creates batches following an “eventually correct” model for the data. So it can take up to an hour (in the worst case) for the first appearance of an event in one of the batches, and then additional data can decorate the event over time. We just keep re-ingesting the same batch repeatedly as it becomes more and more correct. The hour of latency before the first appearance of an event is okay for the reporting use case, where we’re looking at week- or month-long ranges and aggregating.

We’re now looking at using Druid for a secondary use case where we wouldn’t be working with aggregations, but instead displaying the events in list form for a drill-down sort of view of the data. The Druid Select query seems great for powering this sort of view - it’s much, much faster than the Postgres datastore that currently powers it, which can’t produce results for our bigger clients in anything close to a responsive manner. However, the hour of latency before the first appearance of an event is not acceptable for this view. We’re looking at ways of speeding things up on the data warehouse/ingestion pipeline side, but ideally we’d like the event to show up within a minute or two, and it would be very hard to get the pipeline up to that speed. It’s okay if some of the additional data flowing in through the batch process takes longer.

It seems like some kind of Lambda architecture / hybrid batch/streaming model might work here. But I’m having a hard time figuring it out. The fundamental issue is that if we stream in an event but then later ingest a batch for that segment which doesn’t contain the streamed event, the event will be overwritten and lost. (Though eventually it will show up in the batch and get back into Druid later on.)

I’ve thought about restricting the batch ingestion to avoid overwriting recent events, but it’s problematic. Our ingestion granularity is a day. Also, the events we’re tracking are phone calls, which can last up to 4 hours, and the timestamp is the local start time of the call. So if we get a realtime event seconds after the end of a 4-hour phone call, it’s going to be placed in the segment 4 hours behind the event. So would we have to avoid ingesting batches for that entire segment - basically realtime only and no batch ingestion until we are at least 4 hours past the segment boundary? Or can we restrict the range that the batch ingestion operates on using the interval in the task spec? E.g., it’s noon and we’re notified there’s a fresh batch; can we do the ingestion so that it only covers midnight to 8 AM in today’s segment, and (streamed) events already in the segment aren’t touched?
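Here’s roughly what I have in mind, as a sketch of how we’d submit such a batch task. The overlord URL, datasource name, columns, and file paths are all made up for illustration; the part I care about is the restricted interval in the granularitySpec. Whether Druid would actually leave the rest of the day’s data alone is exactly what I’m unsure about.

```python
# Sketch: a batch index task whose interval only covers midnight to 8 AM
# of the day, leaving the later (streamed) hours alone. Hostnames, the
# datasource, columns, and paths are hypothetical.
import requests

task = {
    "type": "index",
    "spec": {
        "dataSchema": {
            "dataSource": "calls",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {"column": "start_time", "format": "iso"},
                    "dimensionsSpec": {"dimensions": ["caller", "callee", "agent"]},
                },
            },
            "metricsSpec": [{"type": "count", "name": "count"}],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "DAY",
                "queryGranularity": "NONE",
                # Only cover midnight through 8 AM of the day in question.
                "intervals": ["2016-05-01T00:00:00Z/2016-05-01T08:00:00Z"],
            },
        },
        "ioConfig": {
            "type": "index",
            "firehose": {"type": "local", "baseDir": "/data/batches", "filter": "calls.json"},
        },
    },
}

# Submit the task to the overlord.
resp = requests.post("http://overlord:8090/druid/indexer/v1/task", json=task)
resp.raise_for_status()
print(resp.json())  # e.g. {"task": "<task id>"}
```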

Even if so, it’s not that easy, because as I mentioned, the timestamp is the local start time, but we format it as UTC for ingestion. So for a call that started at 1AM local time in UTC-12, we will use a timestamp of 1AM UTC, but we won’t actually get a realtime event until 1PM UTC at the earliest (actually later than that depending on how long the call is - 5PM UTC if it’s a four-hour call). So under that scenario we’d have to restrict the batch to 16 hours behind the present to avoid erasing any calls. In addition to getting really complicated and potentially error-prone, that’s probably too long for us to wait to get any of the batch updates that we need for our main reporting use case.
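To make that arithmetic concrete, here’s a small sketch of the worst case I’m describing (the 12-hour offset and 4-hour call length are our real bounds; everything else is just illustration):

```python
# Worst case: a call is stamped with its local start time (relabeled as UTC),
# but the realtime event for it can arrive as late as
# max timezone offset + max call length after that timestamp.
from datetime import datetime, timedelta, timezone

max_tz_offset = timedelta(hours=12)    # farthest local zone behind UTC (UTC-12)
max_call_length = timedelta(hours=4)   # longest call we track

safe_lag = max_tz_offset + max_call_length           # 16 hours
batch_cutoff = datetime.now(timezone.utc) - safe_lag  # newest data batch may touch

print(f"Batch ingestion must stay behind {batch_cutoff.isoformat()}")
```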

So, any suggestions? Or anywhere my thinking is wrong? Again, apologies for the open-endedness of this question. I’ll be happy to fill in any more details that may be relevant.

Thanks,

Ryan

Hi Ryan,

For more info about lambda architectures you can check out: http://static.druid.io/docs/radstack.pdf

This is one approach.

Hey Ryan,

In general with a hybrid batch/streaming setup you want to think in terms of “most recent X time is owned by realtime, anything earlier than that is owned by batch”. Time ranges owned by batch might not have been batch-loaded yet, but the idea is they could be, and realtime shouldn’t be writing there anymore.

In Druid, the segmentGranularity is the granularity you are allowed to use for “X”. It’s not possible for a segment-granularity interval to be written to by both batch and realtime simultaneously. So it sounds like in your case you’d be happier using segmentGranularity = “hour” rather than “day”.
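As a rough sketch, the change is just in the granularitySpec of your ingestion specs; the other fields here are illustrative:

```python
# Sketch: drop segmentGranularity from DAY to HOUR so realtime only "owns"
# the most recent hour(s), and batch can safely rewrite anything older.
granularity_spec = {
    "type": "uniform",
    "segmentGranularity": "HOUR",   # was "DAY"
    "queryGranularity": "NONE",
    # batch tasks still supply "intervals"; realtime specs omit them
}
```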

Thanks for the tips. I’ll read that paper when I have a chance too.

It seems like the lag between an event’s “start” timestamp and when we actually receive the event, combined with the local time conversion, effectively creates a really long delay before we first hear about a “realtime” event - just too wide of a window to keep open without doing any batch updates. It may not be something we can finesse, but I’ll keep exploring it. Our program management might find it acceptable to keep a smaller window open for most realtime events at the cost of missing some outliers (the four-hour calls taking place in a timezone with a really large offset from UTC). Maybe using a finer granularity for the trailing day or week could help too; I’ll look at that for sure.

It sounds like life might be easier if you converted between time zones at ingestion - is there a reason you aren’t doing that? So instead of representing a call that started at 1AM UTC-12 as 1AM UTC, you’d represent it as 1PM UTC.

The reason the decision was made to ingest it as local time is that all of our query/consumption use cases are in local time. Query date ranges are in local dates, we group by hour of day and day of week in local time, etc.

However, this is turning out to be such a pain both for ingestion and for our desire to move to a hybrid streaming/batch setup that we are likely going to change gears and just use actual UTC time as you suggest. (We don’t need to convert to get it - we already have it; it’s more that we’ll want to not convert to local time!) We’ll have to do a bit more work on the query side to support a local date-based API but that’s looking to be well worth it. “Always store in UTC” is a rule of thumb for a reason. :)
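For anyone following along, the change amounts to something like this (the zone name is just one example of a UTC-12 zone; the field names are made up):

```python
# Minimal sketch of the two representations discussed above.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# A call that started at 1 AM local time in a UTC-12 zone.
local_start = datetime(2016, 5, 1, 1, 0, tzinfo=ZoneInfo("Etc/GMT+12"))

# What we ingest today: the local wall-clock time relabeled as UTC.
fake_utc = local_start.replace(tzinfo=timezone.utc)   # 2016-05-01T01:00:00+00:00

# What we'll ingest instead: the actual instant, converted to UTC.
real_utc = local_start.astimezone(timezone.utc)       # 2016-05-01T13:00:00+00:00

print(fake_utc.isoformat(), real_utc.isoformat())
```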