Druid pre-aggregation time improvement

Hi all,

We recently had an increase in the volume of events coming into our Druid ingestion stream, which led to an increase in our batch indexing time, even though our segment size remained the same. Looking at the logs, it seems the majority of this time is spent in the pre-aggregation phase. The number of reducers for the IndexGeneratorJob is always 1, which, from reading the code, appears to be set based on the segment granularity (if our segment granularity is 1 day and we are running a daily batch indexing job, there will only be 1 reducer). Please let me know if this assumption is incorrect.

I’d really like to reduce the amount of time needed for this batch indexing. One solution I can employ on my side is to pre-process the Druid-row JSON data, essentially doing the pre-aggregation in a separate MapReduce job before handing off to the Druid Hadoop index task (it seems to me there is no limitation forcing this pre-aggregation onto a single reducer). But this seems odd, not being able to make use of Druid’s built-in pre-aggregation. Is there a reason it isn’t written to run as a separate MR job that could take advantage of more parallelism? Has anyone else encountered this issue, and if so, what solution was employed to improve batch indexing time?

We are running Druid 0.8.0.

Thanks,

James

Hi James, Druid 0.8.3 and 0.9.0 introduced significant optimizations to batch ingestion time. Can you try that first to see if the performance is sufficient?

Hey James,

There will be one reducer per generated Druid segment, of which there could be many per day. The idea is that if you set segmentGranularity = DAY and targetPartitionSize = 3M, and your job is going to generate 9M rows per day, that’s 3 segments and so you’ll get 3 reducers for that day. For most folks this is enough to keep the amount of data per reducer reasonable. But I could see it not being good enough if your pre-aggregation is actually really effective (like hundreds or thousands of input rows going into each Druid row). For this case it could be better to add another job that does pre-aggregation only and then the indexing job just reads what that wrote (right now we do pre-aggregation and indexing in the same job).
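To make the reducer math above concrete, here is a minimal sketch (not actual Druid code; the function name and parameters are illustrative) of how the number of reducers follows from the row count and `targetPartitionSize`:

```python
import math

def num_reducers(rows_per_day, target_partition_size, days):
    # One reducer per generated segment; segments per day is the row count
    # divided by the target partition size, rounded up.
    return sum(math.ceil(rows_per_day / target_partition_size)
               for _ in range(days))

# Example from the thread: segmentGranularity = DAY, targetPartitionSize = 3M,
# 9M rows per day -> 3 segments, hence 3 reducers for that day.
print(num_reducers(9_000_000, 3_000_000, 1))  # 3
```

The point is that parallelism scales with the number of *post-rollup* segments, which is why a very high rollup ratio can starve the job of reducers.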

If you’re interested in helping out with a contribution to Druid then we could hash out the details of what that would look like. I think it’d be something like,

  • add an optional job before IndexGeneratorJob that pre-aggregates your data and writes it to a tmp location, using as many reducers as necessary

  • teach the IndexGeneratorJob how to operate on that pre-aggregated data (should be skipping a bunch of things it would have done, and reading from that tmp location instead of your actual data location)

  • add a config to enable the extra pre-aggregation job

  • possibly also enable pre-aggregation automatically if the determine-partitions job determines that the rollup ratio is above a certain threshold (to avoid swamping the index generator reducers)
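The reduce side of the proposed standalone pre-aggregation job would essentially be a rollup: group rows by (truncated timestamp, dimension values) and sum the metrics. A minimal sketch, assuming simple dict-shaped rows and longSum-style metrics (names here are illustrative, not Druid APIs):

```python
from collections import defaultdict

def pre_aggregate(rows, dimensions, metrics, truncate_timestamp):
    # Group by (truncated timestamp, dimension tuple) and sum each metric,
    # the way a separate pre-aggregation MR job could before IndexGeneratorJob.
    buckets = defaultdict(lambda: defaultdict(int))
    for row in rows:
        key = (truncate_timestamp(row["timestamp"]),
               tuple(row[d] for d in dimensions))
        for m in metrics:
            buckets[key][m] += row[m]
    for (ts, dims), sums in buckets.items():
        yield {"timestamp": ts, **dict(zip(dimensions, dims)), **sums}

# Two events in the same hour with the same dimension values
# collapse into a single pre-aggregated row.
events = [
    {"timestamp": 3601, "page": "home", "clicks": 1},
    {"timestamp": 3650, "page": "home", "clicks": 2},
]
rolled = list(pre_aggregate(events, ["page"], ["clicks"],
                            lambda t: t // 3600 * 3600))
print(rolled)  # [{'timestamp': 3600, 'page': 'home', 'clicks': 3}]
```

In a real Hadoop job the grouping key would be the map output key, so this step can fan out across as many reducers as needed, independent of the final segment count.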

And also, yeah, like FJ said upgrading to 0.8.3 is an easy way of cutting down indexing times.

Thanks for the quick feedback, Fangjin and Gian. I’ll try an updated version of Druid first to see how much time we cut down, but I think this is definitely a case where pre-aggregation is very effective (logs show a reduction from ~80 million lines to 80,000 rows multiple times per job). I’ll try to set up an external job as previously proposed to do the pre-aggregation and, as you said, try to have it become an optional step in the Druid workflow in the future.

Thanks!

Hey James,

FYI, I filed an issue for this: https://github.com/druid-io/druid/issues/2594

If you do end up wanting to implement this in Druid, please post on the issue so we can coordinate. Thanks!

Hi,

Can you try the “useCombiner” option in the tuningConfig? Please see http://druid.io/docs/0.8.3/ingestion/batch-ingestion.html for its documentation.
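For reference, a sketch of where that flag sits in the ingestion spec (trimmed to the relevant part; check the linked docs for the full set of fields in your version):

```json
{
  "tuningConfig": {
    "type": "hadoop",
    "useCombiner": true
  }
}
```

This enables a Hadoop combiner so some of the rollup happens on the map side, which can cut down the data each reducer has to pre-aggregate.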

– Himanshu