Druid Kafka Batch Ingestion

Is there a way to do Kafka batch ingestion in Druid starting from a specific offset or partition for a topic? All I see right now is real-time ingestion from Kafka.

The new Kafka indexing service (https://imply.io/docs/latest/tutorial-kafka-indexing-service.html) can ingest historical data. If you set "useEarliestOffset": true, it will read from the beginning of the topic and ingest everything.
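
As a minimal sketch of where that flag lives, here is an ioConfig fragment for a supervisor spec. The topic name and broker address are placeholders, not values from a real cluster. The earliest-offset setting applies when the supervisor has no previously stored offsets for the datasource.

```json
{
  "ioConfig": {
    "topic": "pageviews",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "useEarliestOffset": true,
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  }
}
```

Note that the value is a JSON boolean, not the quoted string "true".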

Hello Gian, thank you so much for the quick reply.

I used the spec below as my Kafka indexing spec, but I am currently getting this error: "error": "Could not resolve type id 'kafka' into a subtype of [simple type, class io.druid.indexing.overlord.supervisor.SupervisorSpec]". Do you know the cause of this error? Do I need to add some dependency to druid.extensions.loadList in the common.runtime.properties file?

Also, do you know if I can put PT5M in the segmentGranularity for the granularitySpec? Or does it have to be "HOUR", "MINUTE", "SECOND", etc.?

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "pageviews-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["url", "user"]
        }
      }
    },
    "metricsSpec": [
      {"name": "views", "type": "count"},
      {"name": "latencyMs", "fieldName": "latencyMs", "type": "doubleSum"}
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "PT5M",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "pageviews",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "useEarliestOffset" : "false",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  }
}

You do need to include the druid-kafka-indexing-service extension. And yes, the segmentGranularity does need to be one of the predefined options.
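
For a five-minute segment size, a granularitySpec along these lines may work. This is a sketch that assumes "FIVE_MINUTE" is among the predefined granularity names in your Druid version, so please check it against the granularity documentation for your release.

```json
{
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "FIVE_MINUTE",
    "queryGranularity": "NONE"
  }
}
```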

This is what my druid.extensions.loadList looks like in the common.runtime.properties file:

druid.extensions.loadList=["druid-hdfs-storage", "mysql-metadata-storage", "druid-kafka-eight", "druid-kafka-indexing-service"]

I am still getting the same error even though I have included the "druid-kafka-indexing-service" extension.