Reading data from Kafka with a realtime node doesn't work

Hi, I want to read data from Kafka in real time using a realtime node, but it doesn't seem to work.

I start all the processes on a single machine: ZooKeeper, coordinator, overlord, historical, broker, middleManager, and a realtime node. The commands I used were:

  1. ./bin/zkServer.sh start

  2. nohup java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/*" io.druid.cli.Main server coordinator >> coordinator.log 2>&1 &

  3. nohup java `cat conf-quickstart/druid/overlord/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/*" io.druid.cli.Main server overlord >> overlord.log 2>&1 &

  4. nohup java `cat conf-quickstart/druid/historical/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*" io.druid.cli.Main server historical >> historical.log 2>&1 &

  5. nohup java `cat conf-quickstart/druid/broker/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/*" io.druid.cli.Main server broker >> broker.log 2>&1 &

  6. nohup java `cat conf-quickstart/druid/middleManager/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/*" io.druid.cli.Main server middleManager >> middlemanager.log 2>&1 &

  7. nohup java `cat conf-quickstart/druid/realtime/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/realtime:lib/*" io.druid.cli.Main server realtime >> realtime.log 2>&1 &
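To sanity-check that all of these actually came up, one option (just a sketch, assuming the default quickstart ports) is to hit each node's /status endpoint, which returns a small JSON blob with the node's version when the process is healthy:

  curl http://localhost:8081/status   # coordinator
  curl http://localhost:8090/status   # overlord
  curl http://localhost:8083/status   # historical
  curl http://localhost:8082/status   # broker
  curl http://localhost:8091/status   # middleManager
  curl http://localhost:8084/status   # realtime (druid.port=8084 per the runtime.properties below)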

The Kafka topic "metrics" has already been created.
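For reference, a sketch of the topic creation command (assuming Kafka's bin directory and a local ZooKeeper at localhost:2181; adjust the addresses to your actual setup):

  ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic metrics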

Then I start a Kafka console producer and send one message. From a Kafka console consumer I can see the message I sent, which means Kafka is working fine. Then I query Druid with an HTTP POST, but no data comes back. Something must be wrong.
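Roughly how the producer and consumer were run (a sketch, assuming a Kafka 0.8-style setup with a broker on localhost:9092 and ZooKeeper on localhost:2181):

  # Producer: type the JSON test message shown below and press Enter
  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic metrics

  # Consumer (separate terminal): confirms the message actually reached the topic
  ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics --from-beginning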

The test message I sent was:

{"unit": "milliseconds", "http_method": "GET", "value": 79, "timestamp": "2016-08-25T10:19:18Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www4.example.com"}

The query command:

curl -L -H 'Content-Type: application/json' -XPOST --data-binary @quickstart/my_query.json http://localhost:8082/druid/v2/?pretty

and the response is empty.

The query JSON I used was:

{
  "queryType": "search",
  "dataSource": "metrics",
  "granularity": "day",
  "searchDimensions": ["http_method"],
  "query": {
    "type": "insensitive_contains",
    "value": "GET"
  },
  "intervals": ["2016-08-24/2016-08-26"]
}

And the realtime node spec file content was:

[
  {
    "dataSchema": {
      "dataSource": "metrics",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [],
            "dimensionExclusions": ["timestamp", "value"],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "doubleSum", "name": "value_sum", "fieldName": "value" },
        { "type": "doubleMin", "name": "value_min", "fieldName": "value" },
        { "type": "doubleMax", "name": "value_max", "fieldName": "value" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE"
      }
    },
    "ioConfig": {
      "type": "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "my-kafka-zookeeper_ip_port_list",
          "zookeeper.connection.timeout.ms": "15000",
          "zookeeper.session.timeout.ms": "15000",
          "zookeeper.sync.time.ms": "5000",
          "group.id": "druid-metrics",
          "fetch.message.max.bytes": "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "metrics"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type": "realtime",
      "maxRowsInMemory": 1,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "/tmp/realtime/basePersist",
      "rejectionPolicy": {
        "type": "serverTime"
      }
    }
  }
]
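One detail worth double-checking (my reading of the spec above, not something confirmed from the logs): with the serverTime rejection policy and a PT10m windowPeriod, the realtime node silently drops any event whose timestamp is more than about ten minutes away from the node's wall clock, so a test message stamped 2016-08-25T10:19:18Z would be rejected unless it was produced within that window. For replaying older test data, a sketch of a tuningConfig that keys the window off the message timestamps instead:

"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 1,
  "intermediatePersistPeriod": "PT10m",
  "windowPeriod": "PT10m",
  "basePersistDirectory": "/tmp/realtime/basePersist",
  "rejectionPolicy": {
    "type": "messageTime"
  }
}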

And the realtime node runtime.properties was:

druid.service=druid/realtime
druid.port=8084
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=204800
druid.realtime.specFile=/home/wangyufeng/druid/druid-0.9.1.1/conf-quickstart/druid/realtime/realtimeSpcFile.json

And the realtime node JVM config file was:

-server
-Xms1g
-Xmx1g
-XX:MaxDirectMemorySize=1280m
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
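A side note on -Duser.timezone=UTC+8: as far as I know, Java does not recognize "UTC+8" as a timezone ID and silently falls back to GMT, which can skew how events land in day-granularity buckets. If a UTC+8 process timezone is really intended, something like the following should be safer, though the Druid docs generally recommend plain UTC:

-Duser.timezone=GMT+8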

I've also attached the process log in case anyone needs it.

Am I missing something, or is some configuration wrong? Also, when using a realtime node to read data from Kafka, do I still need to run the middleManager process?

Thanks.

Yufeng Wang

realtime.log (751 KB)

Hi,

Without digging too much into this: we've started deprecating realtime nodes. With Druid 0.9.1.1, we've introduced a new exactly-once ingestion mechanism for Kafka, the Kafka indexing service.

If you want, there is an online tutorial: https://imply.io/docs/latest/tutorial-kafka

Also, please see: http://druid.io/docs/0.9.1.1/ingestion/faq.html
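For anyone landing here later: with the Kafka indexing service, instead of running a realtime node with a spec file, you load the druid-kafka-indexing-service extension and POST a supervisor spec to the overlord, which then manages the ingestion tasks for you. A minimal sketch (assumptions on my part: overlord on localhost:8090, a Kafka 0.9+ broker on localhost:9092, and the same metrics schema as above):

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": [], "dimensionExclusions": ["timestamp", "value"] }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "doubleSum", "name": "value_sum", "fieldName": "value" }
    ],
    "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "NONE" }
  },
  "ioConfig": {
    "topic": "metrics",
    "consumerProperties": { "bootstrap.servers": "localhost:9092" },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": { "type": "kafka", "maxRowsInMemory": 75000 }
}

Saved as, say, metrics-supervisor.json (a hypothetical filename), it would be submitted with:

curl -XPOST -H 'Content-Type: application/json' -d @metrics-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor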

Thank you Fangjin. It finally works!!

However, there may be a small mistake in quickstart/wikiticker-top-pages.json: the dataSource should be "wikiticker-kafka", not "wikiticker" :)

On Friday, August 26, 2016 at 6:12:11 AM UTC+8, Fangjin Yang wrote: