Kafka indexing service support in Druid 0.13.0-incubating

Hello,

I have been trying the Kafka indexing service to consume events from a Kafka stream. Does 0.13.0-incubating support this feature?

Druid version: druid 0.13.0-incubating

Kafka version: kafka_2.12-2.4.0

I submitted the following spec to the Druid overlord:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "split-reports",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": ["value"],
      "dimensionExclusions": ["timestamp"]
    },
    "metricsSpec": [
      {
        "name": "views",
        "type": "count"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "minute",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "split-reports-events",
    "inputFormat": {
      "type": "json"
    },
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1M"
  }
}

I get the following response:
{
  "id": "split-reports"
}

I can see the supervisor running on the overlord console.

However, when I send an event to the Kafka topic, I don't see any tasks getting created for this supervisor.

I also have a Druid 0.17 setup, and the exact same spec works fine when submitted from the management UI.

Are there any additional steps required in 0.13.0-incubating to get the Kafka indexing service to work? Any leads would be helpful.

Thanks,

Prathamesh

Druid 0.13 supports Kafka indexing. Here is the doc for Druid 0.13:
http://druid.apache.org/docs/0.13.0-incubating/development/extensions-core/kafka-ingestion.html

Please check the overlord log for any errors/issues.

Thanks and Regards,

Vaibhav

Hi Vaibhav,

I was wondering why the same indexing specification works for 0.17 but fails for 0.13.0-incubating.

I can see the following error in the overlord log:

2020-02-27T09:50:10,185 INFO [KafkaSupervisor-split-reports] org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - [split-reports] supervisor is running.
2020-02-27T09:50:10,188 INFO [KafkaSupervisor-split-reports] org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - Creating new task group [0] for partitions [0]
2020-02-27T09:50:10,315 INFO [KafkaSupervisor-split-reports] org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - Number of tasks [0] does not match configured numReplicas [1] in task group [0], creating more tasks
2020-02-27T09:50:10,322 WARN [KafkaSupervisor-split-reports] org.apache.druid.segment.indexing.DataSchema - No parser has been specified
2020-02-27T09:50:10,335 ERROR [KafkaSupervisor-split-reports] org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[split-reports] failed to handle notice: {class=org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.lang.NullPointerException, exceptionMessage=parser, noticeClass=RunNotice}
java.lang.NullPointerException: parser
  at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229) ~[guava-16.0.1.jar:?]
  at org.apache.druid.indexing.kafka.KafkaIndexTask.<init>(KafkaIndexTask.java:126) ~[?:?]
  at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.createKafkaTasksForGroup(KafkaSupervisor.java:1960) ~[?:?]
  at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.createNewTasks(KafkaSupervisor.java:1902) ~[?:?]
  at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.runInternal(KafkaSupervisor.java:882) ~[?:?]
  at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor$RunNotice.handle(KafkaSupervisor.java:582) ~[?:?]
  at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.lambda$tryInit$5(KafkaSupervisor.java:958) ~[?:?]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]

I am not able to figure out where in the specification the parser should be specified. The spec works for 0.17 without a "parser" section. Has the way the specification is written changed between the two versions?

Thanks,

Prathamesh

Hi Vaibhav,

It appears that the newer version can do without the parser specification, and the objects nested inside the parser specification have become top-level elements in the JSON. I'll try the older specification format from the link you shared.
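For my own reference, here is roughly how I understand the dataSchema from my spec above would look in the older parser-based format (based on the linked 0.13 doc, so treat this as a sketch rather than a verified spec):

"dataSchema": {
  "dataSource": "split-reports",
  "parser": {
    "type": "string",
    "parseSpec": {
      "format": "json",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": ["value"],
        "dimensionExclusions": ["timestamp"]
      }
    }
  },
  "metricsSpec": [
    { "name": "views", "type": "count" }
  ],
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "minute",
    "queryGranularity": "NONE"
  }
}

The ioConfig and tuningConfig would stay the same, except that I would drop the inputFormat block from ioConfig, since in the older format the parsing is described by parseSpec.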

Thanks,

Prathamesh

Please try the spec as suggested in the Druid 0.13 doc.
It's not a good idea to use a Druid 0.17 spec in a lower version of Druid, or vice versa.

Thanks

Vaibhav

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "product",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            { "name": "time", "type": "string" },
            { "name": "productId", "type": "string" },
            { "name": "productName", "type": "string" },
            { "name": "productDesc", "type": "string" },
            { "name": "createdBy", "type": "string" },
            { "name": "modelYear", "type": "string" }
          ]
        }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "ioConfig": {
    "type": "kafka",
    "topic": "product",
    "consumerProperties": {
      "bootstrap.servers": "192.168.225.60:9092"
    },
    "taskDuration": "PT1H",
    "useEarliestOffset": "true"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 500000
  }
}

Try this spec, changing the dimensions, data source, and topic as per your requirements.

Make sure you specify the proper IP address of the machine instead of localhost.

Hopefully with these changes your issue will be resolved.
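If you prefer submitting outside the console, you can POST the spec straight to the overlord's supervisor endpoint; something like the following should work (replace OVERLORD_HOST with your overlord's address, 8090 is just the default port, and split-reports-supervisor.json is a placeholder file name):

curl -X POST -H 'Content-Type: application/json' \
  -d @split-reports-supervisor.json \
  http://OVERLORD_HOST:8090/druid/indexer/v1/supervisor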