Creating DAY granularity segments while batch-indexing hourly

Hi,

is it possible to create DAY granularity segments by submitting a Hadoop indexing task each hour?

I tried it out; this is the hourly task http://pastebin.com/raw/GMH5MMEZ but the DAY segment in deep storage ends up with 24 versions.

All the index.zip files in the DAY segment have the same size but different md5sums.

The strange thing is that the overall record count is way higher than in the original datasource I’m trying to reindex, so I suspect it is not that easy.

Would you please give me some pointers?

Hi Jakub,

Could you explain in more detail what you’re trying to do with hourly ingests at DAY granularity? Is this for historical data (older than a day) or recent data? In your config, you have:

      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "DAY",
        "intervals" : [ "2016-08-03T01:00:00.000Z/2016-08-03T02:00:00.000Z" ]
      }

In general, the interval should cover a time period >= your segmentGranularity. If you submit another task to ingest the next interval (2016-08-03T02:00/2016-08-03T03:00), it will generate a segment spanning the whole day (2016-08-03/2016-08-04) but containing only the data for 02:00-03:00. That segment will overshadow the previous one, and you won’t get the merged results you may have been expecting.
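For example, if you want each task to (re)write the whole day, the interval would need to span the full DAY, something like this (the timestamps are just illustrative):

      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "DAY",
        "intervals" : [ "2016-08-03T00:00:00.000Z/2016-08-04T00:00:00.000Z" ]
      }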

Hey David,

we have a lambda-architecture-like pipeline (many ETLs) that is partitioned by HOUR, including Druid, but Druid has now stopped scaling because we need to query data going back to 2015.

So I’m forced to reindex the past 2 years of Druid data into DAY granularity segments for it to scale.

But the problem is that Druid would then be the only component with DAY partitioning, while all the other components in the pipeline have HOUR partitioning.

So I have 3 options:

  1. work around it by queuing 24 incoming hours of data and submitting a Hadoop index task to Druid with DAY segment granularity

  2. figure out how to index data into Druid hourly while still getting the performance boost of DAY segments

  3. optimize Druid for queries that hit 17520 HOUR segments.

I couldn’t figure out 2) because, as you say, the HOUR segments overshadow each other within the DAY segment.

So I’m forced to do the 1) hack, which is a problem because the pipeline uses partition introspection for dependency management, i.e. it uses the coordinator’s metadata API to know which hours are present in Druid …

So I guess I will have to do 3): keep the HOUR segments and try to optimize the historical nodes so they can handle queries that hit 17520 segments.

Thank you for responding, David. Jakub

Hey Jakub,

Some thoughts that might help (and a lot of this you may already know):

  • Ideally, your goal should be to generate Druid segments that are sized somewhere between 500MB-1GB. If your 17520 hourly granularity segments are significantly smaller than this, then you should be looking at moving to larger time buckets like DAY.

  • Your Druid segments don’t all need to have the same granularity - i.e. you can have some that are HOUR, some DAY, some YEAR, etc. Druid will take all the available segments and put together a timeline of the most recently generated segments (latest version number) that contain data for a given time, regardless of the segment’s granularity, and will use this to know how to answer queries.

  • If you really want to do this hourly DAY granularity ingestion, it should be possible to submit, every hour, a batch indexing job with DAY segment granularity, but instead of specifying an hour-long interval like 2016-08-03T02:00/2016-08-03T03:00, specify the interval for the full day, like 2016-08-03/2016-08-04. You would then trigger this job with the same day-long interval each hour, and it would generate successively larger segments containing the data for the past hour plus all the earlier hours of the day; the 24th run of the day would produce a segment containing the full day’s data. You just have to be sure that you retain the data for all the previous hours. Also, you’d want to have some segment kill tasks set up, otherwise you’ll be using an excessive amount of deep storage.

  • Having said that, I’m not totally clear about your setup, but probably one of the following is what you actually want to do:

    • have a realtime ingestion pipeline that generates HOUR segments combined with a batch ingestion job that takes those segments and merges them together into DAY granularity every 24 hours or so. Your batch ingestion job can generate the DAY segment either from the raw data that was fed into the realtime indexers or by reading the segments generated by the realtime indexers and re-indexing them with a different schema (i.e. DAY segment granularity).

    • or, if you’re getting bursts of data from your pipeline every hour instead of a continuous stream, setting up a realtime pipeline may be overkill. In that case, it’d probably make sense to submit hourly ingestion tasks with HOUR segment granularity for the past hour of data, and then once per day run another ingestion task to generate a segment with DAY granularity that again can either source the input from the original data fed into Druid or from the completed segments generated by the hourly ingestion tasks. This aggregated DAY segment will overshadow the previous HOUR segments and will be used when responding to queries. Again, Druid has no problem running queries with segments of different granularities.
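On the segment kill tasks mentioned a couple of bullets up: once the coordinator has marked the old, overshadowed versions of a segment as unused, a kill task submitted to the overlord can remove them from the metadata store and from deep storage. A minimal sketch (the datasource name and interval are just placeholders):

      {
        "type" : "kill",
        "dataSource" : "your_datasource",
        "interval" : "2016-08-03T00:00:00.000Z/2016-08-04T00:00:00.000Z"
      }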

Hope this helps.

Hi David,

  • If you really want to do this hourly DAY granularity ingestion, it should be possible to submit, every hour, a batch indexing job with DAY segment granularity, but instead of specifying an hour-long interval like 2016-08-03T02:00/2016-08-03T03:00, specify the interval for the full day, like 2016-08-03/2016-08-04. You would then trigger this job with the same day-long interval each hour, and it would generate successively larger segments containing the data for the past hour plus all the earlier hours of the day; the 24th run of the day would produce a segment containing the full day’s data. You just have to be sure that you retain the data for all the previous hours. Also, you’d want to have some segment kill tasks set up, otherwise you’ll be using an excessive amount of deep storage.

Are you talking about Hadoop delta ingestion with the “multi” inputSpec, so that the hours get appended to each other instead of overshadowing each other within the DAY segment? I think that this inputSpec only supports type “static”, which is kind of inconvenient.

  • Having said that, I’m not totally clear about your setup, but probably one of the following is what you actually want to do:
    • have a realtime ingestion pipeline that generates HOUR segments combined with a batch ingestion job that takes those segments and merges them together into DAY granularity every 24 hours or so. Your batch ingestion job can generate the DAY segment either from the raw data that was fed into the realtime indexers or by reading the segments generated by the realtime indexers and re-indexing them with a different schema (i.e. DAY segment granularity).

Reading the raw data twice would be very inefficient. Do you mean the Append task for doing the latter? Is it possible to have 2 datasources, HOUR & DAY, and merge segments from the HOUR datasource into the DAY datasource? That would be nifty.

  • or, if you’re getting bursts of data from your pipeline every hour instead of a continuous stream, setting up a realtime pipeline may be overkill. In that case, it’d probably make sense to submit hourly ingestion tasks with HOUR segment granularity for the past hour of data, and then once per day run another ingestion task to generate a segment with DAY granularity that again can either source the input from the original data fed into Druid or from the completed segments generated by the hourly ingestion tasks. This aggregated DAY segment will overshadow the previous HOUR segments and will be used when responding to queries. Again, Druid has no problem running queries with segments of different granularities.

Yeah, bursts of data: it’s a micro-batching pipeline with ETL components that “tick” every hour. Realtime indexing is overkill.

This would actually be perfect if I could access both kinds of segments, DAY and HOUR, but I guess this cannot be done across 2 datasources, right?

One thing I’m not sure about: how do you create a DAY segment from existing HOUR segments without touching the raw input data again? I know of the Append and Merge tasks, but I guess that’s not it.

Thanks a million for helping, I’m almost there :slight_smile:

Probably the best source of documentation on this is this page: http://druid.io/docs/latest/ingestion/update-existing-data.html
As far as I know, the multi inputSpec should be able to handle all inputSpec types (granularity, static, dataSource, etc.) - the only limitation I know of is it can only contain a single dataSource inputSpec. This would allow you to merge new raw data into an existing set of segments.
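As a rough sketch (the datasource name, interval, and path below are made up), the ioConfig for that kind of delta ingestion would look something like this:

      "ioConfig" : {
        "type" : "hadoop",
        "inputSpec" : {
          "type" : "multi",
          "children" : [
            {
              "type" : "dataSource",
              "ingestionSpec" : {
                "dataSource" : "your_datasource",
                "intervals" : [ "2016-08-03/2016-08-04" ]
              }
            },
            {
              "type" : "static",
              "paths" : "s3n://your-bucket/events/2016-08-03T23/"
            }
          ]
        }
      }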

Regarding creating DAY segments from HOUR segments, you can run another batch ingestion job and use the dataSource inputSpec directly. You can choose to have this generate segments in the same dataSource (which will create a new version that will overshadow the HOUR segments) or if you’d like, have it generate segments in a new dataSource. You would choose this by setting the dataSource field in dataSchema accordingly, just like with any other ingestion spec.
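For example, a daily HOUR-to-DAY re-index into a separate datasource could look roughly like this, trimmed to the relevant fields (the “events_hour” / “events_day” names and the interval are just placeholders, and the rest of the dataSchema, i.e. parser, dimensions and metrics, is omitted):

      "dataSchema" : {
        "dataSource" : "events_day",
        "granularitySpec" : {
          "type" : "uniform",
          "segmentGranularity" : "DAY",
          "queryGranularity" : "DAY",
          "intervals" : [ "2016-08-03/2016-08-04" ]
        }
      },
      "ioConfig" : {
        "type" : "hadoop",
        "inputSpec" : {
          "type" : "dataSource",
          "ingestionSpec" : {
            "dataSource" : "events_hour",
            "intervals" : [ "2016-08-03/2016-08-04" ]
          }
        }
      }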

The append and merge tasks have some limitations in the types of shardSpecs they can work with (they can only merge single partition segments together into another single partition segment) and they don’t run in a distributed fashion the way the Hadoop indexing tasks do, so I would look at the multi and dataSource inputSpecs first.

Perfect, David. I didn’t know about the “dataSource” inputSpec, so I will set up 2 datasources, hour & day, leave the “hour” datasource as part of my pipeline, and schedule a daily job that mirrors it into the “day” datasource. Only the “day” datasource will be queried by client apps.

You saved my day, I would have been hopeless without your help!

Sounds good! Happy to help.

Hi David,

thanks once again, it worked out!!!

Just for future reference, if anybody is trying this with S3 deep storage: the “dataSource” inputSpec on S3 storage eventually fails on missing AWS credentials, which are not stored in the descriptor.json files.

So one needs to supply them through “jobProperties”:

"tuningConfig" : {
  "type" : "hadoop",
  "jobProperties" : {

    "fs.s3.awsAccessKeyId" : "foo",
    "fs.s3n.awsAccessKeyId" : "foo",
    "fs.s3.awsSecretAccessKey" : "bar",
    "fs.s3n.awsSecretAccessKey" : "bar"
  }

``

I was supplying the credentials directly in the inputSpec path, so it took me a while to figure this out.