Druid realtime is not working

Hi,

I am having an issue with Druid realtime, sad to say. It was working before, but not anymore after a Kafka and ZooKeeper rebuild. I have 2 partitions, yet Druid is not doing what it is supposed to do, which is ingest data, and I don't know how to debug it.

Here is the realtime spec file. I am using Druid 0.8.1.

Below is an endless stream of output in the log files. So… what is going on? I write to Kafka, and now I get nothing when I query. I restarted everything.

Thanks


{
  "dataSchema" : {
    "dataSource" : "datasource-topic-impression",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : {
          "column": "utcdt",
          "format": "iso"
        },
        "dimensionsSpec" : {
          "dimensions": [],
          "dimensionExclusions" : [],
          "spatialDimensions" : []
        }
      }
    },
    "metricsSpec" : [{
      "type" : "count",
      "name" : "pixel"
    }],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "minute",
      "queryGranularity" : "NONE"
    }
  },
  "ioConfig" : {
    "type" : "realtime",
    "firehose": {
      "type": "kafka-0.8",
      "consumerProps": {
        "zookeeper.connect": "<%=@zookeeper%>",
        "zookeeper.connection.timeout.ms" : "15000",
        "zookeeper.session.timeout.ms" : "15000",
        "zookeeper.sync.time.ms" : "5000",
        "group.id": "topic-impression-<%=@environment%>",
        "fetch.message.max.bytes" : "1048586",
        "auto.offset.reset": "largest",
        "auto.commit.enable": "false"
      },
      "feed": "topic-pixel-<%=@environment%>"
    },
    "plumber": {
      "type": "realtime"
    }
  },
  "tuningConfig": {
    "shardSpec": {
      "type": "linear",
      "partitionNum": <%=@partitionNum%>
    },
    "type" : "realtime",
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT10m",
    "windowPeriod": "PT10m",
    "basePersistDirectory": "/tmp/realtime/basePersist",
    "rejectionPolicy": {
      "type": "serverTime"
    }
  }
}
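
For context, the spec above is a template; with my 2 partitions each realtime node gets its own partitionNum. A rough sketch of how the linear shardSpec renders on the two nodes, assuming <%=@partitionNum%> simply expands to 0 and 1:

Node 0:
"shardSpec": { "type": "linear", "partitionNum": 0 }

Node 1:
"shardSpec": { "type": "linear", "partitionNum": 1 }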

2016-01-07T15:08:00,227 INFO [datasource-topic-impression-2015-12-31T05:24:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-31T05:24:00.000Z/2015-12-31T05:25:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,227 INFO [datasource-topic-impression-2016-01-04T03:52:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2016-01-04T03:52:00.000Z/2016-01-04T03:53:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,227 INFO [datasource-topic-impression-2015-12-24T07:10:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-24T07:10:00.000Z/2015-12-24T07:11:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,228 INFO [datasource-topic-impression-2015-12-31T22:28:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-31T22:28:00.000Z/2015-12-31T22:29:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,228 INFO [datasource-topic-impression-2015-12-19T00:16:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-19T00:16:00.000Z/2015-12-19T00:17:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,228 INFO [datasource-topic-impression-2015-12-21T14:51:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-21T14:51:00.000Z/2015-12-21T14:52:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,229 INFO [datasource-topic-impression-2015-12-24T12:26:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-24T12:26:00.000Z/2015-12-24T12:27:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,229 INFO [datasource-topic-impression-2015-12-22T19:53:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-22T19:53:00.000Z/2015-12-22T19:54:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,230 INFO [datasource-topic-impression-2016-01-05T07:35:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2016-01-05T07:35:00.000Z/2016-01-05T07:36:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,230 INFO [datasource-topic-impression-2015-12-23T08:21:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-23T08:21:00.000Z/2015-12-23T08:22:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,231 INFO [datasource-topic-impression-2015-12-22T15:47:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-22T15:47:00.000Z/2015-12-22T15:48:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,231 INFO [datasource-topic-impression-2015-12-21T06:10:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-21T06:10:00.000Z/2015-12-21T06:11:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,231 INFO [datasource-topic-impression-2015-12-26T11:52:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-26T11:52:00.000Z/2015-12-26T11:53:00.000Z, schema=io.druid.segment.indexing.DataSchema@1e11c391}]
2016-01-07T15:08:00,232 INFO [datasource-topic-impression-2015-12-22T12:57:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Already pushed sink[Sink{interval=2015-12-22T12:57:00.000Z/2015-12-22T12:58:00.0

PS: I see the "Unparseable events" errors further down in the logs. How do I turn on debug logging?

This is how I start the realtime node:

/usr/bin/java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=/var/realtime.spec -classpath
/var/druid-0.8.1/lib/*:/var/druid-0.8.1/config/realtime io.druid.cli.Main server realtime

Here's my config:

druid.host=192.xxx.xxx:8082
druid.service=realtime
druid.port=8082

druid.zk.service.host=2.zk.do.production.sf.f:2181,1.zk.do.production.sf.:2181,3.zk.do.production.sf:2181

druid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight:0.8.1","io.druid.extensions:mysql-metadata-storage:0.8.1","io.druid.extensions:druid-s3-ext$

# These configs are only required for real handoff

druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://mysql.do.production.sf:3306/druid
druid.metadata.storage.connector.user=r
druid.metadata.storage.connector.password=

druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

#druid.storage.type=s3
#druid.storage.storageDirectory=/tmp/realtime
druid.s3.secretKey=
druid.s3.accessKey=
druid.storage.bucket=adfadf
druid.storage.baseKey=production-realtime/v1
druid.storage.disableAcl=true

druid.realtime.specFile=/var/realtime.spec

2016-01-07T16:50:36,712 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [2] Unparseable events! Turn on debug logging to see exception stack trace.
2016-01-07T16:51:36,707 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1] Unparseable events! Turn on debug logging to see exception stack trace.
2016-01-07T16:56:36,709 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1] Unparseable events! Turn on debug logging to see exception stack trace.
2016-01-07T16:57:36,710 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1] Unparseable events! Turn on debug logging to see exception stack trace.

Hi Druid,

Is this the proper forum for posting Druid issues?

Hi David,

I think you can turn the debug logs on in {druid home}/config/_common/log4j2.xml
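
For example, something along these lines in that file should turn on debug output (a minimal sketch; merge it into whatever appenders and loggers your existing log4j2.xml already has):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <!-- debug only the Druid packages so Kafka/ZooKeeper chatter stays at info -->
    <Logger name="io.druid" level="debug" additivity="false">
      <AppenderRef ref="Console"/>
    </Logger>
    <Root level="info">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>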

I turned on debugging…

This is all I see in the logs, and it goes on and on… Kafka is working, FYI. I totally rebuilt Kafka, created a new topic, and updated the topic in the realtime spec. I am seeing Kafka writes in real time. Druid realtime is not working.

2016-01-08T11:26:04,222 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452252249617-c6d54200-leader-finder-thread] kafka.utils.VerifiableProperties - Property client.id is overridden to topic-impression1-production
2016-01-08T11:26:04,222 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452252249617-c6d54200-leader-finder-thread] kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to xxx.xxx.xxx.xxx:9092,104.236.183.120:9092
2016-01-08T11:26:04,222 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452252249617-c6d54200-leader-finder-thread] kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
2016-01-08T11:26:04,223 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452252249617-c6d54200-leader-finder-thread] kafka.client.ClientUtils$ - Fetching metadata from broker id:2,host:104.236.183.120,port:9092 with correlation id 480 for 1 topic(s) Set(topic-impression-production)

Is there an issue with Kafka 0.8.2?

In the logs, the only thing that looks like an error is the exception below.

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.send(SyncProducer.scala:113) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) [kafka_2.10-0.8.2.1.jar:?]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) [kafka_2.10-0.8.2.1.jar:?]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.10-0.8.2.1.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [kafka_2.10-0.8.2.1.jar:?]
2016-01-08T11:53:33,784 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452254007350-8e231702-leader-find$
2016-01-08T11:53:33,785 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452254007350-8e231702-leader-find$
2016-01-08T11:53:33,786 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452254007350-8e231702-leader-find$
2016-01-08T11:53:33,787 INFO [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452254007350-8e231702-leader-find$
2016-01-08T11:53:33,788 WARN [topic-impression1-production_do-druidrealtime-sf-production-20160107160311-1452254007350-8e231702-leader-find$
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.producer.SyncProducer.send(SyncProducer.scala:113) ~[kafka_2.10-0.8.2.1.jar:?]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) [kafka_2.10-0.8.2.1.jar:?]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) [kafka_2.10-0.8.2.1.jar:?]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.10-0.8.2.1.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [kafka_2.10-0.8.2.1.jar:?]

I see this error. This is how I turned on logging in Druid.

Below is all I see:

2016-01-08T12:21:12,501 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [2] Unparseable events! Turn on debug logging to see exception stack trace.

Hi David,
It seems that Druid is not able to parse the events properly.

Can you confirm that the events in the Kafka topic are valid JSON?
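
Given the parseSpec in your realtime spec, each Kafka message should be a single JSON object with an ISO-formatted utcdt field. Since your dimensions list is empty, the other field names below are just placeholders; any extra fields should be picked up as dimensions automatically. Something like:

{"utcdt": "2016-01-08T12:20:00.000Z", "campaign": "example-campaign", "country": "US"}

You can eyeball what is actually on the topic with the console consumer that ships with Kafka (ZooKeeper host and topic below are placeholders for yours):

bin/kafka-console-consumer.sh --zookeeper <zookeeper-host>:2181 --topic <your-topic> --from-beginning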

Does real-time indexing support messages in formats other than JSON, for example CSV?

Hi Fernando,

The Kafka ingest can handle a variety of formats: JSON, CSV, TSV, Avro, etc. (i.e. the formats that Druid has a built-in parser for, or that you have written a Druid extension for).

In my opinion, there should not be an issue with CSV.
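
For example, a CSV parseSpec would look roughly like this (column names are placeholders); the main difference from JSON is that you have to list the columns explicitly, since CSV rows do not carry field names:

"parseSpec" : {
  "format" : "csv",
  "timestampSpec" : {
    "column" : "utcdt",
    "format" : "iso"
  },
  "columns" : ["utcdt", "campaign", "country"],
  "dimensionsSpec" : {
    "dimensions" : ["campaign", "country"]
  }
}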

Thanks,

Vaibhav