[druid-user] Ingestion to different data source

Hi,

I want to ingest data into different datasources using Kafka ingestion, based on a dimension.

E.g., data from each city should go to a different datasource (i.e., a city-specific datasource):

{ "timestamp": "1646935998", "city": "Boston", "temperature": 48.3 }
{ "timestamp": "1646935998", "city": "Austin", "temperature": 68.3 }

Is there a way to ingest data, based on the dimension ('city'), into city-specific datasources 'temperature_boston' and 'temperature_austin' respectively?

Thanks in advance!

Regards,
William

Hey William!

There is a one-to-one relationship between a Kafka ingestion supervisor and a Druid table datasource.

To ingest to multiple tables, you would need multiple supervisors. Then you can apply a filter condition to each supervisor (it goes inside the "transformSpec") according to which rows should end up in each table.
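
For illustration, here's a minimal sketch of such a filter in one supervisor's spec. The datasource name, topic, broker address, and schema are just placeholders based on your sample rows, so adjust to taste:

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "temperature_boston",
      "timestampSpec": { "column": "timestamp", "format": "posix" },
      "dimensionsSpec": { "dimensions": ["city"] },
      "metricsSpec": [
        { "type": "doubleMax", "name": "temperature", "fieldName": "temperature" }
      ],
      "granularitySpec": { "segmentGranularity": "hour", "queryGranularity": "none" },
      "transformSpec": {
        "filter": { "type": "selector", "dimension": "city", "value": "Boston" }
      }
    },
    "ioConfig": {
      "topic": "temperature",
      "inputFormat": { "type": "json" },
      "consumerProperties": { "bootstrap.servers": "kafka-broker:9092" }
    }
  }
}

The second supervisor would be identical apart from the dataSource name and the filter value ("Austin").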

An alternative would be to bifurcate upstream in Kafka, with two topics. Then each supervisor would listen only to the topic in question. That would be helpful if you and your team would rather maintain the filtering logic outside of the ingestion specifications in Druid.

You also get to scale the ingestion for each one of your topics independently with the second approach: say it’s a 60/40 split – you could have a 60/40 split in your resources on Druid, too. In the other approach, using filtering, the ingestion on each supervisor would have to be scaled to be able to handle 100% of rows from the stream.
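
As a rough illustration of that last point, each supervisor's ioConfig could be sized independently; the topic name, taskCount, and broker address below are made up:

{
  "ioConfig": {
    "topic": "temperature_boston",
    "inputFormat": { "type": "json" },
    "consumerProperties": { "bootstrap.servers": "kafka-broker:9092" },
    "taskCount": 3,
    "taskDuration": "PT1H"
  }
}

The Austin supervisor would point at "temperature_austin" with, say, "taskCount": 2 for a roughly 60/40 split.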

Hope that’s helpful!

Hi,

Thanks, Peter, for sharing the different approaches to handle it.

With both approaches, we need multiple supervisors, and their number would grow as data starts arriving from new cities. This might need more resources on the MiddleManagers as new supervisors are added, depending on the taskCount in each supervisor.

I also want to open up the discussion a bit on batch ingestion (Hadoop-based). We may write a file to HDFS that contains data for one city and run a one-time ingestion task for every file. Would the ingestion speed and the resources required for such a task be similar to a Kafka ingestion task?
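
To make it concrete, I'm imagining something like the following Hadoop ingestion task per file (the datasource name, HDFS path, and interval are just examples):

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "temperature_boston",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "timestampSpec": { "column": "timestamp", "format": "posix" },
          "dimensionsSpec": { "dimensions": ["city"] }
        }
      },
      "metricsSpec": [
        { "type": "doubleMax", "name": "temperature", "fieldName": "temperature" }
      ],
      "granularitySpec": {
        "segmentGranularity": "hour",
        "queryGranularity": "none",
        "intervals": ["2022-03-10/2022-03-11"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "hdfs://namenode:8020/data/boston/2022-03-10.json"
      }
    }
  }
}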

Thanks in advance!

Regards,
William

Hi William,

Just to clarify:

  1. You’re thinking about writing out a file for each city to HDFS;
  2. Running a one-time/batch ingestion from HDFS for each city;
  3. And are wondering how the ingestion speed and required resources for multiple batch ingestions would compare to a Kafka ingestion with multiple supervisors or multiple topics?

I came across this discussion about improving ingestion performance, which might be of some limited help. Within that discussion, there's a link to Basic cluster tuning. Finally, I'm including this video about Apache Druid Real-Time Ingestion Challenges And Best Practices.

Best,

Mark

Hey William,
Coming at this from another angle: why are you trying to create a distinct datasource for each city?
Perhaps you can achieve the same goal by using secondary partitioning on city.
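For reference, a minimal sketch of what that could look like; with streaming ingestion this kind of single-dimension (or range) partitioning is applied by batch tasks (e.g., reindexing/compaction of the streamed data), and the target row count here is only illustrative:

{
  "tuningConfig": {
    "type": "index_parallel",
    "partitionsSpec": {
      "type": "single_dim",
      "partitionDimension": "city",
      "targetRowsPerSegment": 5000000
    }
  }
}
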
If you haven’t already read this, I think you’ll find it useful:

Let us know how it goes,
Sergio

Hi,

Thanks, Mark and Sergio, for sharing the different approaches and materials.

I will clarify the use case:

  1. There could be multiple dimensions (between 1 and 25), and they could differ for each city
  2. There could be multiple metrics, and the metric names could differ for each city
  3. The volume of data (rows) per hour could be ~2 billion, consisting of data arriving from multiple cities, and it needs to be stored for a minimum of a week
  4. Queries can span a time range of 2 to 7 days to fetch data based on a few dimensions
  5. Retention of data older than X days based on city (retention rules per city)

Would it be possible to achieve the above use cases (mainly 3, 4 & 5) with a single datasource? Wouldn't queries be slower?

I'm thinking of having a distinct datasource (with less data) for each city for faster queries, and in a way this helps to address use case 5 as well.
Hence, I'm checking which ingestion (stream or batch) would be better to address the use cases.
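
If I go the datasource-per-city route, my understanding is that use case 5 could then be covered by per-datasource retention (load) rules on the Coordinator, along these lines (the period and replicant count are just for illustration):

[
  { "type": "loadByPeriod", "period": "P7D", "tieredReplicants": { "_default_tier": 2 } },
  { "type": "dropForever" }
]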

Hope this clarifies, Thanks in advance!

Regards,
William

The different dimensions/metrics may be your limiting factor. If you could get the dimensions and metrics to be the same across cities, even if some values are NULL, you should be able to do this with a single datasource.
By using secondary partitioning on city and filtering the queries by city, performance should be similar to separate datasources, since Druid will use pruning to read only the segment partitions for the corresponding city.
At a minimum, it is worth a test.
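
For example, a city-filtered query might look like the sketch below (the datasource name, interval, and aggregation are placeholders); with partitioning on city, such a filter should let the broker prune down to just the Boston partitions:

{
  "queryType": "timeseries",
  "dataSource": "temperature",
  "intervals": ["2022-03-08/2022-03-10"],
  "granularity": "hour",
  "filter": { "type": "selector", "dimension": "city", "value": "Boston" },
  "aggregations": [
    { "type": "doubleMax", "name": "max_temperature", "fieldName": "temperature" }
  ]
}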

Let us know how it goes,

Sergio