What does Druid use the druid_datasource metadata table for?

We use Druid to ingest Kafka data.
I have a few basic questions:

  1. I found a druid_datasource metadata table (hosted in PostgreSQL in our case) with a bunch of Kafka offsets stored in it.
    My question is: what does Druid use this table, and the offsets stored in it, for?
  2. Where does Druid get its Kafka offsets from? Does it get them from Kafka, or does it maintain its own offsets?
  3. What happens if I truncate the druid_datasource table?
  4. When I reset the supervisor (useEarliestOffset=false), how does Druid know where to resume reading Kafka from? (kind of the same question as #2 I guess…)

Problem we’re trying to solve
We had our Kafka+Druid hosted in AWS and we're migrating it to Google. The problem is that there is no way for us to reliably migrate the Kafka offsets from AWS to Google as-is; we have to do some translation magic. Because of that, Druid is breaking: it's looking for the old offsets. We're trying to figure out how to get Druid to pick up the translated offsets, but we can't tell whether Druid gets offsets from Kafka (consumer groups) or uses the ones it stores in the druid_datasource metadata table.

Relates to Apache Druid 0.18.8

Thanks!

Hey Prayas: Druid stores offsets following the usual consumer pattern, updating them once it knows it has safely ingested the data from Kafka and started serving it. As for resets, there are a number of methods, which are documented; resetOffsetAutomatically is one example.
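
For reference, a reset can also be triggered through the Overlord's supervisor API. This is a minimal sketch, assuming the `POST /druid/indexer/v1/supervisor/<supervisorId>/reset` endpoint; the Overlord address `http://overlord:8090`, the supervisor id `fabric-test`, and the helper name are placeholders. A reset clears the offsets Druid has stored for the supervisor and recreates its active tasks, which then start from the earliest or latest Kafka offset depending on `useEarliestOffset`.

```python
import urllib.parse
import urllib.request


def build_supervisor_reset_request(overlord_url: str, supervisor_id: str) -> urllib.request.Request:
    """Build, but do not send, a POST to the supervisor reset endpoint.

    ASSUMPTION: the Overlord exposes /druid/indexer/v1/supervisor/<supervisorId>/reset.
    """
    # Quote the id so characters such as '/' cannot alter the path.
    encoded_id = urllib.parse.quote(supervisor_id, safe="")
    url = f"{overlord_url.rstrip('/')}/druid/indexer/v1/supervisor/{encoded_id}/reset"
    # An empty body plus method="POST" makes urllib issue a POST request.
    return urllib.request.Request(url, data=b"", method="POST")


# Hypothetical host and supervisor id; replace with your own.
req = build_supervisor_reset_request("http://overlord:8090", "fabric-test")
# urllib.request.urlopen(req)  # uncomment to actually reset (discards stored offsets)
```

Sending the request is the destructive step, so the sketch only builds it; uncomment the `urlopen` call when you really intend to discard the stored offsets.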

Not sure if you intended to respond to me, because the user you’re addressing is different.

> Druid stores offsets following the usual consumer pattern

My question is: where does it store the offsets, in the druid_datasource table or in Kafka?
Thanks

Hey! Where the heck did I find the name “Prayas”??!?! Sorry! 🙂

No, it’s not stored in Kafka.
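
When the table does have a row for a Kafka datasource, the offsets live in its `commit_metadata_payload` column as serialized JSON. Below is a minimal sketch for decoding that column once you have fetched it with any PostgreSQL client. The payload shape in the docstring is an assumption based on Druid's Kafka datasource metadata and may differ between versions; the function name and the sample topic are hypothetical.

```python
import json
from typing import Dict, Union


def parse_kafka_commit_metadata(payload: Union[bytes, bytearray, memoryview, str]) -> Dict[int, int]:
    """Return {partition: offset} from a commit_metadata_payload value.

    ASSUMPTION: the payload is JSON shaped roughly like
      {"type": "kafka",
       "partitions": {"type": "end", "stream": "<topic>",
                      "partitionSequenceNumberMap": {"0": 20}}}
    Older payloads may use "partitionOffsetMap" for the same map.
    """
    # PostgreSQL drivers typically return bytea columns as bytes or memoryview.
    if isinstance(payload, (bytes, bytearray, memoryview)):
        payload = bytes(payload).decode("utf-8")
    doc = json.loads(payload)
    partitions = doc.get("partitions", {})
    offsets = (
        partitions.get("partitionSequenceNumberMap")
        or partitions.get("partitionOffsetMap")
        or {}
    )
    # JSON object keys are strings; convert partition ids and offsets to ints.
    return {int(p): int(o) for p, o in offsets.items()}
```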


No worries, and thanks for your answers!

Okay, I just checked my druid_datasource table and it’s empty, yet my Druid is ingesting from Kafka. Where is it getting offset info from?
For example, I tried the following steps and Druid is somehow able to remember the latest offset:

  1. Publish 20 items to Kafka.
  2. Wait for Druid to ingest.
  3. Check the Druid supervisor status:
```json
"activeTasks": [
    {
        "id": "index_kafka_fabric-test_e45c178eef5cef6_lcmjjeji",
        "startingOffsets": {
            "0": 20
        },
        "startTime": "2021-12-07T18:25:45.051Z",
        "remainingSeconds": 3415,
        "type": "ACTIVE",
        "currentOffsets": {
            "0": 20
        },
        "lag": {
            "0": 0
        }
    }
],
```
  4. Reset supervisor
  5. Check status of supervisor:
```json
"activeTasks": [
    {
        "id": "index_kafka_fabric-test_e45c178eef5cef6_lcmjjeji",
        "startingOffsets": {
            "0": 20
        },
        "startTime": "2021-12-07T18:25:45.051Z",
        "remainingSeconds": 3415,
        "type": "ACTIVE",
        "currentOffsets": {
            "0": 20
        },
        "lag": {
            "0": 0
        }
    }
],
```
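
Both snapshots above show the same task id and startTime. The Kafka ingestion docs say a reset kills and recreates the supervisor's active tasks, so comparing task ids across two status snapshots is one quick way to tell whether a reset has taken effect. Below is a minimal sketch over the `activeTasks` array as it appears in the supervisor status (assumed to come from `GET /druid/indexer/v1/supervisor/<supervisorId>/status`). The helper names are hypothetical and the data is copied from the post.

```python
from typing import Any, Dict, List, Set

Task = Dict[str, Any]


def active_task_ids(active_tasks: List[Task]) -> Set[str]:
    """Ids of the tasks listed under "activeTasks" in a supervisor status."""
    return {task["id"] for task in active_tasks}


def tasks_were_replaced(before: List[Task], after: List[Task]) -> bool:
    """True if no task id from `before` is still active in `after`."""
    return active_task_ids(before).isdisjoint(active_task_ids(after))


def starting_offsets(active_tasks: List[Task]) -> Dict[str, Dict[int, int]]:
    """Map each task id to its startingOffsets, with partition ids as ints."""
    return {
        task["id"]: {int(p): int(o) for p, o in task.get("startingOffsets", {}).items()}
        for task in active_tasks
    }


# The two snapshots from the post, trimmed to the fields used here.
before = [{"id": "index_kafka_fabric-test_e45c178eef5cef6_lcmjjeji",
           "startingOffsets": {"0": 20}, "currentOffsets": {"0": 20}, "lag": {"0": 0}}]
after = [{"id": "index_kafka_fabric-test_e45c178eef5cef6_lcmjjeji",
          "startingOffsets": {"0": 20}, "currentOffsets": {"0": 20}, "lag": {"0": 0}}]
```

Here `tasks_were_replaced(before, after)` is False, which suggests the second snapshot still shows the pre-reset task. A replacement task would probably report the same starting offset anyway: lag 0 at offset 20 means 20 is currently the latest offset in partition 0, which is where a task starts when nothing is stored and useEarliestOffset is false.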

I know that Druid decides to use the latest offset based on the useEarliestOffset flag. But how does it know WHAT the latest offset is if druid_datasource is empty?

```
select * from druid_datasource;
(0 rows)
```

Just found this post in search; maybe it will help a little?

There is also https://groups.google.com/g/druid-user/c/WEfJLxrYjI4/m/wodukk2SBAAJ

Also, do note that the stored offsets are only updated when a task publishes its segments, in the same metadata-store transaction as the segments themselves; this is what provides the exactly-once ingestion guarantee.
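
Below is a simplified, hypothetical model of that guarantee. It is not Druid's actual code, and the class and method names are made up for illustration. A task's end offsets are committed together with its segments, and only if the task started where the previous commit ended. Replaying an already-committed range is therefore rejected instead of being ingested twice.

```python
from typing import Dict, Optional


class ToyMetadataStore:
    """Hypothetical model of how offsets are committed in Druid's metadata store."""

    def __init__(self) -> None:
        # datasource -> {partition: next offset to read (exclusive end)}
        self._offsets: Dict[str, Dict[int, int]] = {}

    def stored_offsets(self, datasource: str) -> Optional[Dict[int, int]]:
        offsets = self._offsets.get(datasource)
        return dict(offsets) if offsets is not None else None

    def publish(self, datasource: str, start: Dict[int, int], end: Dict[int, int]) -> bool:
        """Commit `end` only if the task's `start` matches what is stored.

        In Druid the segments and the offsets go into one transaction;
        in this toy model the "transaction" is just this method call.
        """
        current = self._offsets.get(datasource)
        if current is not None:
            for partition, offset in start.items():
                if partition in current and current[partition] != offset:
                    return False  # stale or duplicate range: nothing is written
        merged = dict(current or {})
        merged.update(end)
        self._offsets[datasource] = merged
        return True


store = ToyMetadataStore()
store.publish("fabric-test", {0: 0}, {0: 20})  # first commit: offsets 0..19
```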

This may be interesting to you as well:

Thanks Peter, this helps and clarifies much of the confusion.

I have one (hopefully) last question - If druid_datasource is empty and useEarliestOffset is false, how does Druid decide what the “latest offset” is?
Thanks

Edit: also, the Kafka ingestion docs are useful, but they don’t say anything about the druid_datasource table or how Druid manages offsets 🤷

My understanding (could be wrong) is that useEarliestOffset=true means start from the earliest record found on the Kafka side, and setting it to false means start from the most recent record it can find; i.e., start from the beginning (or end) of what you can find in Kafka now. If you mean where exactly it gets that from Kafka (records or some kind of Kafka metadata store), I’m not sure. Maybe that’s what you’re asking, though…
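
The decision described in the last two posts can be modelled roughly as follows. This is a simplified sketch, not Druid's code, and the function name and inputs are hypothetical. For each partition, a committed offset from the metadata store wins. Only when there is none does `useEarliestOffset` choose between the earliest and latest offsets that the supervisor asks Kafka for. This matches the Kafka ingestion docs, which say the flag is normally used only on a supervisor's first run.

```python
from typing import Dict, Optional


def choose_starting_offsets(
    stored: Optional[Dict[int, int]],
    earliest: Dict[int, int],
    latest: Dict[int, int],
    use_earliest_offset: bool,
) -> Dict[int, int]:
    """Simplified model of where a new Kafka indexing task starts, per partition.

    stored:   offsets committed in the metadata store (None if the table is empty)
    earliest: earliest offsets Kafka still retains, per partition
    latest:   current end offsets in Kafka, per partition
    """
    result: Dict[int, int] = {}
    for partition in latest:
        if stored is not None and partition in stored:
            result[partition] = stored[partition]  # resume from the committed offset
        elif use_earliest_offset:
            result[partition] = earliest[partition]  # start at the oldest retained record
        else:
            result[partition] = latest[partition]  # skip to the end of the partition
    return result
```

With an empty druid_datasource table, 20 records in partition 0, and useEarliestOffset=false, this model starts at offset 20, which matches the status output above.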

Figured this one out.

  • Druid doesn’t use Kafka for finding the offsets. It uses the metadata tables for keeping track of Kafka offsets.
  • When there are no stored offsets in the metadata tables, it checks the useEarliestOffset flag: false means getting the latest offset from Kafka, true means getting the earliest offset still available in Kafka (0 only if nothing has been removed by retention yet).
  • For our problem of duplicate ingestion, we used a combination of rollup and useEarliestOffset=true.
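
Rollup groups rows that share the same truncated timestamp and dimension values into a single row, which is why exact duplicates of an event collapse together. The sketch below is simplified and is not Druid's code; the dimension and metric names are hypothetical. It also shows the caveat: duplicates still inflate additive metrics such as count and sum, while a max metric is unaffected.

```python
from typing import Any, Dict, List, Tuple


def rollup(rows: List[Dict[str, Any]], dimensions: List[str], granularity_s: int) -> List[Dict[str, Any]]:
    """Group rows by (truncated timestamp, dimension values) and aggregate metrics."""
    groups: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for row in rows:
        # Truncate the epoch-seconds timestamp to the query granularity bucket.
        bucket = row["timestamp"] - row["timestamp"] % granularity_s
        key = (bucket,) + tuple(row[d] for d in dimensions)
        agg = groups.get(key)
        if agg is None:
            agg = {"timestamp": bucket, **{d: row[d] for d in dimensions},
                   "count": 0, "value_sum": 0, "value_max": row["value"]}
            groups[key] = agg
        agg["count"] += 1                                        # additive: doubles on duplicates
        agg["value_sum"] += row["value"]                         # additive: doubles on duplicates
        agg["value_max"] = max(agg["value_max"], row["value"])   # idempotent
    return list(groups.values())


event = {"timestamp": 1638901545, "user": "a", "value": 5}
rolled = rollup([event, dict(event)], ["user"], 60)  # the same event ingested twice
```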