Best approach to sharding/partitioning data by a dimension?

Hey all,

I'd like to hear your opinions on the best way to shard or partition data by a dimension. Our concern is that, while most queries will be over a few days of data, some queries we need to answer quickly will span years of data. Those queries will, however, be constrained to a single value of one dimension (“bundle_id”). We are looking for a way to split the data by bundle_id so that these queries run over a much smaller data set. The two reasonable approaches seem to be:

  • Defining the dataSource as “dataSource_<bundle_id>”, or perhaps “dataSource_<hash(bundle_id) % buckets>”. This would of course create many dataSources and feels like a bit of a hack, which raises the concern that Druid is not meant to be used this way. Is that the case, or is this actually a reasonable approach? (A rough sketch of what this routing could look like is below the list.)
  • Implementing our own shardSpec that defines the shards by a hash of bundle_id. I see there is already a HashBasedNumberedShardSpec that does something similar; the difference is that it hashes the entire row, whereas we would like to hash only bundle_id. Same question as above - is this a reasonable approach?
What are your thoughts on these, and are we overlooking something more straightforward?
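
To make the first option concrete, the routing on our side would be something like the sketch below (the bucket count and the “dataSource_” prefix are just placeholders we would tune):

    // Rough sketch of approach 1: pick a target datasource per event from a
    // hash bucket of its bundle_id. NUM_BUCKETS and the "dataSource_" prefix
    // are placeholders, not settled values.
    public class DataSourceRouter {
        private static final int NUM_BUCKETS = 8;
        private static final String PREFIX = "dataSource_";

        public static String dataSourceFor(String bundleId) {
            // floorMod keeps the bucket non-negative even for negative hashCodes.
            int bucket = Math.floorMod(bundleId.hashCode(), NUM_BUCKETS);
            return PREFIX + bucket;
        }
    }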

Thanks in advance, and of course thank you for all the work you all have put into Druid.

Ryan Coonan

Hey Ryan,

Are you using batch indexing or realtime indexing?

With batch indexing, you can use single-dimension partitioning to partition your data by bundle_id (see “Single-dimension partitioning” on http://druid.io/docs/latest/ingestion/batch-ingestion.html). Druid always partitions by time first, but the secondary partition within each time bucket will be bundle_id.
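
Roughly, the relevant piece of the Hadoop batch tuningConfig would look something like this (the targetPartitionSize is just an example value; double-check the exact field names against the docs page above for your version):

    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "dimension",
        "partitionDimension" : "bundle_id",
        "targetPartitionSize" : 5000000
      }
    }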

With realtime indexing, you have a couple of options.

  1. Partition on bundle_id up front - you'd do this by tweaking the stream you send to Druid. If you're using Kafka, you can have your Kafka producer partition your topic by a hash of bundle_id (see the sketch after this list). If you're using Tranquility, you can define a custom Partitioner.

  2. Reindex your older data periodically (see “dataSource” input spec on http://druid.io/docs/latest/ingestion/batch-ingestion.html). You can use this in concert with single-dimension partitioning to repartition your data after initial indexing.
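
For example, with the Kafka Java producer, simply keying each record by bundle_id is enough, since the default partitioner hashes the key. This is only a sketch - the topic name, serializers, and event payload below are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: key each record by bundle_id so the default partitioner hashes
    // the key, keeping every event for a given bundle on one topic partition.
    public class BundleKeyedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                String bundleId = "com.example.app";                        // placeholder bundle_id
                String eventJson = "{\"timestamp\":\"2015-06-01T00:00:00Z\","
                    + "\"bundle_id\":\"com.example.app\"}";                 // placeholder event
                producer.send(new ProducerRecord<>("events", bundleId, eventJson));
            }
        }
    }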

Thanks for the reply, Gian. We are primarily concerned with realtime ingestion, since partitioning for batch ingestion seems more straightforward.

  1. Partition on bundle_id up front - you'd do this by tweaking the stream you send to Druid. If you're using Kafka, you can have your Kafka producer partition your topic by a hash of bundle_id. If you're using Tranquility, you can define a custom Partitioner.

I don’t think I’m understanding you correctly here. What would this mean for what the data looks like once it gets to Druid? Is it equivalent to the first approach I mentioned in my original post - that is, we would have some number of hash buckets by bundle_id, and in Druid we would end up putting data into different datasources like actions_0, actions_1, actions_2, etc. by bundle_id hash?

The periodic reindexing approach does sound like a good option, too.

Thanks again,

Ryan Coonan

Hey Ryan,

I mean that if you have (let’s say) 3 partitions of your Druid datasource, then you can set things up so that rows for the same bundle always go to the same Druid partition. With that you’ll get better rollup and better locality. They would all still be in the same datasource, though.
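
If your record keys aren't just the bundle_id, a custom Kafka partitioner can hash only the bundle_id portion and you get the same routing. This is a sketch against Kafka's Java Partitioner interface (not Tranquility's), and it assumes a hypothetical composite key like “bundle_id|event_id”:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    // Sketch of a custom Kafka partitioner that routes on bundle_id only.
    // It assumes a hypothetical composite record key like "bundle_id|event_id";
    // only the bundle_id portion feeds the hash, so all events for a bundle
    // land on the same partition no matter what the rest of the key is.
    public class BundleIdPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            String bundleId = ((String) key).split("\\|", 2)[0];
            return Math.floorMod(bundleId.hashCode(), partitions.size());
        }

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void close() {}
    }

You'd register it on the producer via the partitioner.class setting.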