How to make dsql discover new Kafka topics as tables?

Hi team,

So far I've set up Kafka/Druid on my system and have been able to inject new data into the ‘wikipedia’ topic, and the results are instantly available via a dsql query.

However, newly created topics aren’t visible in dsql, and I only see the ‘wikipedia’ table.

\d

┌──────────────┬────────────┐
│ TABLE_SCHEMA │ TABLE_NAME │
├──────────────┼────────────┤
│ druid        │ wikipedia  │
└──────────────┴────────────┘

How can I make dsql discover new topics added to the broker? Am I missing a step needed to make a topic available as a ‘table’ in dsql?

Command used to create the new topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka

Command used to create the wikipedia topic, which is visible in dsql:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

Any suggestion please?

Warm Regards,

Raghu

Hi Raghu,

Have you been using the Druid quickstart tutorial?

For Druid to expose a Kafka topic as a table, you need to submit a supervisor spec of type ‘kafka’.

Once submitted, the supervisor will spawn tasks on MiddleManager nodes that read from the Kafka topic named in the supervisor spec and create the datasource (table).

Please refer to the URL below for further details:

http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html
Please note: the reason you are able to query the wikipedia table from the dsql terminal is that the quickstart tutorial ships a spec that reads from the Kafka topic ‘wikipedia’ and stores the data in the Druid datasource ‘wikipedia’. Also, dsql is just one interface for querying datasources in Druid; there are other ways to query Druid as well.
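For reference, the shape of such a ‘kafka’ supervisor spec can be sketched as a Python dict that you serialize and POST to the Overlord. The topic, datasource, and broker address below are assumptions taken from this thread; consult the linked docs for the authoritative schema:

```python
import json

# Sketch of a minimal supervisor spec of type "kafka".
# Values are placeholders based on this thread, not a complete spec.
supervisor_spec = {
    "type": "kafka",
    "dataSchema": {
        "dataSource": "testkafka",  # becomes the table name in dsql
        "parser": {
            "type": "string",
            "parseSpec": {
                "format": "json",
                "timestampSpec": {"column": "time", "format": "auto"},
                "dimensionsSpec": {"dimensions": ["raw_string"]},
            },
        },
        "metricsSpec": [],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "DAY",
            "queryGranularity": "NONE",
            "rollup": False,
        },
    },
    "tuningConfig": {"type": "kafka"},
    "ioConfig": {
        "topic": "testkafka",  # Kafka topic the supervisor reads from
        "consumerProperties": {"bootstrap.servers": "localhost:9092"},
    },
}

# Write it out, then POST it to the Overlord, e.g.:
#   curl -X POST -H 'Content-Type: application/json' \
#        -d @supervisor.json http://localhost:8090/druid/indexer/v1/supervisor
with open("supervisor.json", "w") as f:
    json.dump(supervisor_spec, f, indent=2)
```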

Please let me know if you have any further doubts. I’ll be more than happy to help.

Happy Druiding!

Thanks Divya,

I am still not able to see the topic appearing among the dsql tables. See the steps I’ve taken below.

Files under quickstart/tutorial for the new topic, ‘testkafka’:

  1. testkafka-index.json

  2. testkafka-kafka-supervisor.json

The Overlord is on port 8090, and the Kafka bootstrap server is on 9092.

After running the command below, I can see it appearing under ‘Supervisors’ in the Druid Overlord console, but not in dsql.

curl -X POST -H 'Content-Type: application/json' -d @quickstart/tutorial/testkafka-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

{"id":"testkafka"}

File contents:

cat testkafka-index.json

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "testkafka",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "dimensionsSpec": {
            "dimensions": [
              "raw_string"
            ]
          },
          "timestampSpec": {
            "column": "time",
            "format": "iso"
          }
        }
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [
          "2015-09-12/2015-09-13"
        ],
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "quickstart/tutorial/",
        "filter": ""
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index",
      "maxRowsPerSegment": 5000000,
      "maxRowsInMemory": 25000,
      "forceExtendableShardSpecs": true
    }
  }
}

> cat testkafka-kafka-supervisor.json

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "testkafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "raw_string"
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "testkafka",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}

Regards

Raghu

Raghu,

For datasources (or their segments) to be available for querying, they must be committed: all segments written to deep storage and published to the Historicals. Check the Overlord and Coordinator logs; they should tell you what’s going wrong. You can also check whether the new topic (new datasource) has segments in deep storage. It could be a number of things: running out of space in deep storage (if using local) or on the Historicals, a broken link between deep storage and the Historicals, the MiddleManager having issues, etc.
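The checks above can be scripted against Druid’s HTTP APIs. A minimal sketch, assuming the Overlord on port 8090 as earlier in the thread and the Coordinator on Druid’s default port 8081 (adjust for your cluster):

```python
# Endpoints useful for tracing why a datasource never appears in dsql.
OVERLORD = "http://localhost:8090"     # port from earlier in the thread
COORDINATOR = "http://localhost:8081"  # Druid's default Coordinator port

def diagnostic_urls(datasource):
    """Return URLs to GET, in order: is the supervisor running, did its
    tasks succeed, and were segments published for the datasource?"""
    return [
        f"{OVERLORD}/druid/indexer/v1/supervisor",          # running supervisors
        f"{OVERLORD}/druid/indexer/v1/completeTasks",       # finished tasks + status
        f"{COORDINATOR}/druid/coordinator/v1/datasources",  # published datasources
        f"{COORDINATOR}/druid/coordinator/v1/datasources/{datasource}/segments",
    ]

for url in diagnostic_urls("testkafka"):
    print(url)  # curl each of these (GET) and inspect the JSON responses
```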

Rommel Garcia

Thanks Rommel,

It appears I needed to run the ingestion task spec to create the testkafka datasource, which I did using the command below:

bin/post-index-task --file quickstart/tutorial/testkafka-index.json

Now that step seems to fail with a "java.lang.NullPointerException: parseSpec requires dimensionSpec" error, and I can’t figure out the issue; I have both parseSpec and dimensionSpec in the index file.

Any clue as to whether anything is wrong in the index file below?

cat testkafka-index.json

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "testkafka",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "dimensionSpec": {
            "dimensions": [
              "page"
            ]
          }
        }
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [
          "2015-09-12/2015-09-13"
        ],
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "quickstart/tutorial/",
        "filter": "testkafka-sampled.json"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index",
      "maxRowsPerSegment": 5000000,
      "maxRowsInMemory": 25000,
      "forceExtendableShardSpecs": true
    }
  }
}

> cat testkafka-sampled.json

{"timestamp":"2013-08-31T01:02:33Z","page":"Gypsy Danger"}

Regards,

Raghu

Is your JSON ingest spec properly formatted? Sometimes the curly braces screw up parameters if they don’t line up.
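One quick way to catch such problems (including smart quotes pasted from a rich-text editor) is to run the spec through a JSON parser before submitting it. A minimal sketch in Python; the file path is whatever your spec is saved as:

```python
import json

def check_spec(path):
    """Parse an ingestion spec file; return None if valid, else the error."""
    try:
        with open(path) as f:
            json.load(f)
        return None
    except json.JSONDecodeError as e:
        return f"line {e.lineno}, column {e.colno}: {e.msg}"

# Smart quotes (e.g. pasted from a document) are a typical failure:
try:
    json.loads('{\u201ctype\u201d: \u201cindex\u201d}')
except json.JSONDecodeError as e:
    print("invalid:", e.msg)
```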

Rommel Garcia

Thanks again, Rommel. It appears there may have been a space between the key and the colon in the JSON file, causing the failure.

Anyway, after correcting it I’m able to push data to the “testkafka” datasource now using

curl -X POST -H 'Content-Type: application/json' -d @quickstart/tutorial/testkafka-index.json http://localhost:8090/druid/indexer/v1/task

which is now also queryable via dsql.

But I can’t see the topic data in the dsql table. How do I make data injected into the topic available via dsql? Note that this works for the ‘wikipedia’ topic, where any new data I add to the topic becomes immediately available via dsql, but it does not work for my “testkafka” topic.

Regards,

Raghu

Hi Rommel,

The topics I’m creating are still not visible when searching from dsql. I’ve repeated the steps below for a new topic, and I still can’t see it (infotopic) appearing as one of the datasources in dsql. Could you advise if I’m missing anything here?

This PoC would help me deploy Druid in my org; the idea is to replace our MQs with Kafka and ingest the data using Druid.

Create topic: