Unable to consume messages from Kafka with the Kafka firehose

Hi everyone. I’ve deployed a small Druid cluster on AWS for testing: 4 nodes, just like the tutorial walked me through, with a separate ZooKeeper cluster and a MySQL metadata database.

I had good luck getting the Wikipedia example working, but I’m unable to get a Kafka firehose example working. Here is the spec I made:

```json
[
  {
    "dataSchema" : {
      "dataSource" : "joshtest",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "ts",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions": ["brand", "metric", "value", "ts"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "longSum",
        "name" : "sum",
        "fieldName" : "value"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "zoo01:2181,zoo02,zoo03",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-josh-test",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "brand_single"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT60m",
      "basePersistDirectory": "/mnt/tmp/realtime/basePersist",
      "rejectionPolicy": {
        "type": "serverTime"
      }
    }
  }
]
```

I can see the new Druid consumer in ZooKeeper under my consumer group name, so I know Druid is connecting properly and to the correct topic, but it doesn’t appear to be consuming any messages. I can consume messages off my single-partition topic myself, and they look like what I’d expect:

```json
{"ts":1426395202, "brand":"foo", "value": 1, "metric":"updates"}
{"ts":1426395225, "brand":"bar", "value": 2, "metric":"searches"}
{"ts":1426395254, "brand":"foo", "value": 3, "metric":"searches"}
{"ts":1426395287, "brand":"bar", "value": 4, "metric":"updates"}
{"ts":1426395342, "brand":"bar", "value": 5, "metric":"searches"}
{"ts":1426395355, "brand":"foo", "value": 6, "metric":"searches"}
{"ts":1426395376, "brand":"bar", "value": 7, "metric":"updates"}
```

Lastly, in case they help, here are the logs from my realtime node:

```
2015-03-15T06:01:56,172 INFO [main-SendThread(zoo02:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server zoo02/10.249.1.197:2181, sessionid = 0x24bfc3708dd000b, negotiated timeout = 15000
2015-03-15T06:01:56,649 INFO [main] io.druid.segment.realtime.RealtimeManager - Firehose acquired!
2015-03-15T06:01:56,649 INFO [main] io.druid.segment.realtime.RealtimeManager - Someone get us a plumber!
2015-03-15T06:01:56,655 INFO [main] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[serverTime-PT60M]
2015-03-15T06:01:56,672 INFO [main] io.druid.segment.realtime.RealtimeManager - We have our plumber!
2015-03-15T06:01:56,678 INFO [chief-joshtest] io.druid.segment.realtime.RealtimeManager - FireChief[joshtest] state ok.
2015-03-15T06:01:56,687 INFO [main] org.eclipse.jetty.server.Server - jetty-9.2.5.v20141112
2015-03-15T06:01:56,720 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Loading previously persisted segment at [/mnt/tmp/realtime/basePersist/joshtest/2015-03-15T00:00:00.000Z_2015-03-16T00:00:00.000Z/0]
2015-03-15T06:01:56,753 INFO [chief-joshtest] io.druid.guice.PropertiesModule - Loading properties from common.runtime.properties
2015-03-15T06:01:56,760 INFO [chief-joshtest] io.druid.guice.PropertiesModule - Loading properties from runtime.properties
2015-03-15T06:01:56,787 INFO [chief-joshtest] org.skife.config.ConfigurationObjectFactory - Using method itself for [${base_path}.columnCache.sizeBytes] on [io.druid.query.DruidProcessingConfig#columnCacheSizeBytes()]
2015-03-15T06:01:56,788 INFO [chief-joshtest] org.skife.config.ConfigurationObjectFactory - Assigning value [100000000] for [druid.processing.buffer.sizeBytes] on [io.druid.query.DruidProcessingConfig#intermediateComputeSizeBytes()]
2015-03-15T06:01:56,788 INFO [chief-joshtest] org.skife.config.ConfigurationObjectFactory - Assigning value [2] for [druid.processing.numThreads] on [io.druid.query.DruidProcessingConfig#getNumThreads()]
2015-03-15T06:01:56,788 INFO [chief-joshtest] org.skife.config.ConfigurationObjectFactory - Assigning default value [processing-%s] for [${base_path}.formatString] on [com.metamx.common.concurrent.ExecutorServiceConfig#getFormatString()]
2015-03-15T06:01:56,829 INFO [chief-joshtest] io.druid.guice.JsonConfigurator - Loaded class[interface io.druid.segment.data.BitmapSerdeFactory] from props[druid.processing.bitmap.] as [io.druid.segment.data.BitmapSerde$DefaultBitmapSerdeFactory@2a7a9be3]
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory register
INFO: Registering com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider as a provider class
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory register
INFO: Registering io.druid.server.StatusResource as a root resource class
Mar 15, 2015 6:01:57 AM com.sun.jersey.server.impl.application.WebApplicationImpl _initiate
INFO: Initiating Jersey application, version 'Jersey: 1.17.1 02/28/2013 12:47 PM'
2015-03-15T06:01:57,145 INFO [chief-joshtest] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[joshtest_2015-03-15T00:00:00.000Z_2015-03-16T00:00:00.000Z_2015-03-15T00:00:00.000Z] at path[/druid/segments/druid01:8084/2015-03-15T06:01:57.143Z0]
2015-03-15T06:01:57,176 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Expect to run at [2015-03-16T01:00:00.000Z]
2015-03-15T06:01:57,179 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2015-03-15T06:01:57,180 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Found [1] sinks. minTimestamp [2015-03-15T00:00:00.000Z]
2015-03-15T06:01:57,180 WARN [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - [2015-03-15T00:00:00.000Z] < [2015-03-15T00:00:00.000Z] Skipping persist and merge.
2015-03-15T06:01:57,180 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider to GuiceManagedComponentProvider with the scope "Singleton"
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.QueryResource to GuiceInstantiatedComponentProvider
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
Mar 15, 2015 6:01:57 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2015-03-15T06:01:57,730 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@17ea726{/,null,AVAILABLE}
2015-03-15T06:01:57,744 INFO [main] org.eclipse.jetty.server.ServerConnector - Started ServerConnector@281bda84{HTTP/1.1}{0.0.0.0:8084}
2015-03-15T06:01:57,744 INFO [main] org.eclipse.jetty.server.Server - Started @8769ms
2015-03-15T06:01:57,745 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.start()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@9175bc3].
2015-03-15T06:01:57,745 INFO [main] io.druid.server.coordination.AbstractDataSegmentAnnouncer - Announcing self[DruidServerMetadata{name='druid01:8084', host='druid01:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}] at [/druid/announcements/druid01:8084]
2015-03-15T06:01:57,757 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/druid01:8084
2015-03-15T06:01:57,758 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='druid01:8084', host='druid01:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
```

Hope someone can help. Druid sounds really promising, but as I’m sure you’re aware, it’s pretty complicated to understand!

-Joshua Buss

Hi Joshua,

"ts": 1426395254 actually looks like "posix" (seconds) rather than "millis". Do things work if you switch the timestamp format? If not, can you please try with Druid 0.7.0 (if you’re not already)? If things still get stuck on 0.7.0, can you attach a thread dump of the realtime node? (jstack -l [pid])
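To see why, here’s a quick standalone check (plain Python, just for illustration; not part of Druid):

```python
from datetime import datetime, timezone

ts = 1426395254
# Read as "posix" (seconds since the epoch): a plausible, recent date.
print(datetime.fromtimestamp(ts, tz=timezone.utc))          # 2015-03-15 04:54:14+00:00
# Read as "millis": the same number is only ~16.5 days after the epoch,
# far outside any realistic windowPeriod, so such events would be rejected.
print(datetime.fromtimestamp(ts / 1000.0, tz=timezone.utc)) # 1970-01-17 12:13:15.254000+00:00
```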

Ah, thank you. I actually never found any documentation on how the different timestamp formatters work, so I took a stab in the dark with "millis" since I’m using millisecond timestamps.

I think that did help - now I get an error:

```
INFO: Binding com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider to GuiceManagedComponentProvider with the scope "Singleton"
2015-03-16T01:49:25,236 INFO [joshtest-2015-03-15T00:00:00.000Z-persist-n-merge] io.druid.segment.IndexMerger - outDir[/mnt/tmp/realtime/basePersist/joshtest/2015-03-15T00:00:00.000Z_2015-03-16T00:00:00.000Z/merged/v8-tmp] completed index.drd in 18 millis.
2015-03-16T01:49:25,283 INFO [joshtest-2015-03-15T00:00:00.000Z-persist-n-merge] io.druid.segment.IndexMerger - outDir[/mnt/tmp/realtime/basePersist/joshtest/2015-03-15T00:00:00.000Z_2015-03-16T00:00:00.000Z/merged/v8-tmp] completed dim conversions in 39 millis.
2015-03-16T01:49:25,318 ERROR [joshtest-2015-03-15T00:00:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Failed to persist merged index[joshtest]: {class=io.druid.segment.realtime.plumber.RealtimePlumber, exceptionType=class java.lang.NullPointerException, exceptionMessage=null, interval=2015-03-15T00:00:00.000Z/2015-03-16T00:00:00.000Z}
java.lang.NullPointerException
    at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:645) ~[druid-processing-0.7.0.jar:0.7.0]
    at io.druid.segment.IndexMerger.merge(IndexMerger.java:319) ~[druid-processing-0.7.0.jar:0.7.0]
    at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:206) ~[druid-processing-0.7.0.jar:0.7.0]
    at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:199) ~[druid-processing-0.7.0.jar:0.7.0]
    at io.druid.segment.realtime.plumber.RealtimePlumber$4.doRun(RealtimePlumber.java:438) [druid-server-0.7.0.jar:0.7.0]
    at io.druid.common.guava.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:40) [druid-common-0.7.0.jar:0.7.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_75]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_75]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
Mar 16, 2015 1:49:25 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.QueryResource to GuiceInstantiatedComponentProvider
Mar 16, 2015 1:49:25 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
Mar 16, 2015 1:49:25 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2015-03-16T01:49:25,697 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@492951af{/,null,AVAILABLE}
2015-03-16T01:49:25,707 INFO [main] org.eclipse.jetty.server.ServerConnector - Started ServerConnector@1898cb7b{HTTP/1.1}{0.0.0.0:8084}
2015-03-16T01:49:25,708 INFO [main] org.eclipse.jetty.server.Server - Started @8773ms
2015-03-16T01:49:25,708 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.start()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@79da2cc7].
2015-03-16T01:49:25,708 INFO [main] io.druid.server.coordination.AbstractDataSegmentAnnouncer - Announcing self[DruidServerMetadata{name='druid04:8084', host='druid04:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}] at [/druid/announcements/druid04we2:8084]
2015-03-16T01:49:25,720 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/druid04we2:8084
2015-03-16T01:49:25,721 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='druid04we2:8084', host='druid04:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
```

Any ideas what I’m doing wrong now?

-Josh

You might have some leftover files from previous test runs. Can you try stopping the service, clearing out /mnt/tmp/realtime/basePersist/joshtest, and starting again?
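For example, something like this throwaway snippet, run while the realtime node is stopped (path taken from the basePersistDirectory in your spec):

```python
import shutil

# Wipe the persisted working files for the joshtest dataSource so the
# realtime node starts from a clean slate.
shutil.rmtree('/mnt/tmp/realtime/basePersist/joshtest', ignore_errors=True)
```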

OK, did that and restarted: no error… but still not seeing anything actually show up. In the coordinator, I’m getting this:

```
2015-03-16T03:20:35,428 WARN [DatabaseSegmentManager-Exec--0] io.druid.metadata.SQLMetadataSegmentManager - No segments found in the database!
2015-03-16T03:20:35,600 INFO [DatabaseRuleManager-Exec--0] io.druid.metadata.SQLMetadataRuleManager - Polled and found rules for 1 datasource(s)
2015-03-16T03:20:45,878 INFO [Coordinator-Exec--0] io.druid.server.coordinator.ReplicationThrottler - [_default_tier]: Replicant create queue is empty.
2015-03-16T03:20:45,878 INFO [Coordinator-Exec--0] io.druid.server.coordinator.ReplicationThrottler - [_default_tier]: Replicant terminate queue is empty.
2015-03-16T03:20:45,878 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorBalancer - [_default_tier]: One or fewer servers found. Cannot balance.
2015-03-16T03:20:45,878 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:
2015-03-16T03:20:45,878 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[druid02:8083, historical, _default_tier] has 0 left to load, 0 left to drop, 0 bytes queued, 0 bytes served.
```

Here’s what the realtime node’s logs look like now:

```
2015-03-16T03:17:35,110 INFO [main] io.druid.segment.realtime.RealtimeManager - Calling the FireDepartment and getting a Firehose.
2015-03-16T03:17:35,114 WARN [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable
2015-03-16T03:17:35,114 INFO [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server zoo03/10.244.154.187:2181, sessionid = 0x34bfc37636f0019, negotiated timeout = 30000
2015-03-16T03:17:35,118 INFO [main-EventThread] org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2015-03-16T03:17:35,270 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/druid02:8083
2015-03-16T03:17:35,280 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='druid02:8083', host='druid02:8083', maxSize=10000000000, tier='_default_tier', type='historical', priority='0'}]
2015-03-16T03:17:35,295 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - Inventory Initialized
2015-03-16T03:17:35,560 INFO [main] org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=zoo01:2181,zoo02:2181,zoo03:2181 sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@4442459f
2015-03-16T03:17:35,565 INFO [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxn - Opening socket connection to server zoo03/10.244.154.187:2181. Will not attempt to authenticate using SASL (unknown error)
2015-03-16T03:17:35,566 INFO [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxn - Socket connection established to zoo03/10.244.154.187:2181, initiating session
2015-03-16T03:17:35,571 WARN [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable
2015-03-16T03:17:35,571 INFO [main-SendThread(zoo03:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server zoo03/10.244.154.187:2181, sessionid = 0x34bfc37636f001a, negotiated timeout = 15000
2015-03-16T03:17:36,033 INFO [main] io.druid.segment.realtime.RealtimeManager - Firehose acquired!
2015-03-16T03:17:36,033 INFO [main] io.druid.segment.realtime.RealtimeManager - Someone get us a plumber!
2015-03-16T03:17:36,042 INFO [main] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[serverTime-PT60M]
2015-03-16T03:17:36,042 INFO [main] io.druid.segment.realtime.RealtimeManager - We have our plumber!
2015-03-16T03:17:36,046 INFO [chief-joshtest] io.druid.segment.realtime.RealtimeManager - FireChief[joshtest] state ok.
2015-03-16T03:17:36,048 INFO [main] org.eclipse.jetty.server.Server - jetty-9.2.5.v20141112
2015-03-16T03:17:36,096 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Expect to run at [2015-03-17T01:00:00.000Z]
2015-03-16T03:17:36,196 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2015-03-16T03:17:36,201 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks. minTimestamp [2015-03-16T00:00:00.000Z]
2015-03-16T03:17:36,202 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory register
INFO: Registering com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider as a provider class
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory register
INFO: Registering io.druid.server.StatusResource as a root resource class
Mar 16, 2015 3:17:36 AM com.sun.jersey.server.impl.application.WebApplicationImpl _initiate
INFO: Initiating Jersey application, version 'Jersey: 1.17.1 02/28/2013 12:47 PM'
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider to GuiceManagedComponentProvider with the scope "Singleton"
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.QueryResource to GuiceInstantiatedComponentProvider
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
Mar 16, 2015 3:17:36 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2015-03-16T03:17:36,952 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@70de7d0f{/,null,AVAILABLE}
2015-03-16T03:17:36,960 INFO [main] org.eclipse.jetty.server.ServerConnector - Started ServerConnector@3a794ebd{HTTP/1.1}{0.0.0.0:8084}
2015-03-16T03:17:36,960 INFO [main] org.eclipse.jetty.server.Server - Started @8480ms
2015-03-16T03:17:36,960 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.start()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@600a73c7].
2015-03-16T03:17:36,960 INFO [main] io.druid.server.coordination.AbstractDataSegmentAnnouncer - Announcing self[DruidServerMetadata{name='druid04:8084', host='druid04:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}] at [/druid/announcements/druid04:8084]
2015-03-16T03:17:36,998 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/druid04:8084
2015-03-16T03:17:36,998 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='druid04:8084', host='druid04:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
```

Broker and historical nodes aren’t showing anything new, and my dashboard UI is blank. I’ve inserted thousands of messages into this topic and still nothing. :(

-Josh

Hi Joshua,

Are you inserting recent (within one hour of the wall clock time on your realtime node) data, or older data? If you turn on the realtime metrics monitor, you should see stats on events dropped, ingested, unparsed, etc. If you are not inserting events within one hour of the wall clock time, they will be rejected with your current configuration. Also, given your configuration, handoff will not occur until after segmentGranularity + windowPeriod has passed (so, it may be 2+ hours from when you turn on your realtime node until events are seen).
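To make that concrete, here is a simplified sketch (plain Python, not Druid’s actual code) of how the serverTime rejection policy decides whether an event is accepted:

```python
from datetime import datetime, timedelta, timezone

WINDOW_PERIOD = timedelta(minutes=60)  # matches "windowPeriod": "PT60m" in the spec above

def would_accept(event_ts_seconds):
    """Simplified model of the serverTime rejection policy: an event is
    ingested only if its timestamp falls within windowPeriod of the
    realtime node's wall clock."""
    event_time = datetime.fromtimestamp(event_ts_seconds, tz=timezone.utc)
    now = datetime.now(timezone.utc)
    return abs(now - event_time) <= WINDOW_PERIOD
```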

Thanks for all the quick replies!

How do I turn on the realtime metrics monitor? What are some better segmentGranularity and windowPeriod numbers to use while testing?

I am inserting stats with the current time. Here is what I see in the UI:

Here’s the Python script I’m using to produce messages:

```python
#!/usr/bin/env python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from random import choice
from simplejson import dumps as json_encode
from time import time
import sys

KAFKA_CLIENT = KafkaClient('kafka01:9092,kafka02:9092')
KAFKA_PRODUCER = SimpleProducer(KAFKA_CLIENT)

topic_name = sys.argv[1]
num_messages = int(sys.argv[2])

def make_message():
    # Example: {"ts":1426395376, "brand":"foo", "value": 7, "metric":"update"}
    ts = int(time() * 1000)
    brand = choice(["foo", "bar"])
    metric = choice(["update", "search", "view", "click", "browse", "buy"])
    return {"ts": ts, "brand": brand, "metric": metric, "value": choice(range(1, 4))}

for i in range(0, num_messages):
    KAFKA_PRODUCER.send_messages(topic_name, json_encode(make_message()))
```

I’ll try adjusting segmentGranularity to MINUTE and windowPeriod to 1m for now.

-Josh

OK - after waiting all night and leaving my cluster running with the MINUTE / 1m tunings, I came back this morning and tried firing another burst of messages into Kafka… and I finally see activity on the firehose. The logs now have these two lines for the first time:

```
2015-03-16T12:24:13,495 INFO [chief-joshtest] io.druid.segment.realtime.plumber.RealtimePlumber - Submitting persist runnable for dataSource[joshtest]
2015-03-16T12:24:13,500 INFO [joshtest-incremental-persist] io.druid.firehose.kafka.KafkaEightFirehoseFactory - committing offsets
```

followed by the same repeating pattern of:

```
2015-03-16T12:25:00,076 INFO [joshtest-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2015-03-16T12:25:00,076 INFO [joshtest-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks. minTimestamp [2015-03-16T12:24:00.000Z]
2015-03-16T12:25:00,076 INFO [joshtest-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
```

Still nothing in the UI though, or in my historical datastore (Cassandra).

-Josh

Hi, see inline.

> Thanks for all the quick replies!
>
> How do I turn on the realtime metrics monitor?

In your common.runtime.properties configuration, set:

```
druid.emitter=logging
```

In your realtime runtime.properties, set:

```
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
```

When you turn this on, we can get an idea of whether the node is actually ingesting anything.

> What are some better segmentGranularity and windowPeriod numbers to use while testing?

If you only want to test that handoff is successful, you can use minutely segments with 0 window period.
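Applied to the spec above, that would look something like this in the granularitySpec:

```json
"segmentGranularity" : "MINUTE"
```

and this in the tuningConfig:

```json
"windowPeriod" : "PT0m"
```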

Thanks again. With that change, if I’m reading this correctly, I’m seeing that all my messages are being thrown away:

```
2015-03-16T18:37:12,260 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-03-16T18:37:12.259Z","service":"realtime","host":"druid04:8084","metric":"events/thrownAway","value":400,"user2":"joshtest"}]
```

Why would it be throwing these away instead of doing something with them?

-Josh

Hi Joshua, what is your windowPeriod set to? A small correction to my earlier suggestion about testing handoff: you’ll still need a windowPeriod for events to be accepted. Try a 10 minute windowPeriod; events within +/- 10 minutes of your server’s wall clock time should be ingested. For production, we typically use a windowPeriod of 15-40 minutes and a segmentGranularity of HOUR.

I just updated to 10m, but that didn’t change anything.

So, on a whim, I decided to try inserting seconds instead of milliseconds as my timestamps, and now I finally see some metrics getting processed.
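In the producer script above, that amounts to a one-line change (assuming the rest of the script stays the same):

```python
ts = int(time())  # seconds since the epoch rather than milliseconds
```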

```
2015-03-16T19:22:51,935 INFO [chief-joshtest] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[joshtest_2015-03-16T19:22:00.000Z_2015-03-16T19:23:00.000Z_2015-03-16T19:22:00.000Z] at path[/druid/segments/druid04:8084/2015-03-16T19:22:51.933Z0]
2015-03-16T19:23:42,784 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-03-16T19:23:42.784Z","service":"realtime","host":"druid04we2:8084","metric":"events/processed","value":400,"user2":"joshtest"}]
```

Still nothing in the UI though… not sure where to go next.

-Josh

Try querying the node with a simple query.

Here’s the magic line in your log that is worth noting:

```
2015-03-16T03:17:36,960 INFO [main] org.eclipse.jetty.server.ServerConnector - Started ServerConnector@3a794ebd{HTTP/1.1}{0.0.0.0:8084}
```

So the port is 8084. The query should look something like this:

```json
{
  "queryType" : "timeseries",
  "dataSource" : "joshtest",
  "granularity" : "all",
  "intervals": [ "1970-01-01T00:00:00.000/2019-01-03T00:00:00.000" ],
  "aggregations": [
    { "type": "count", "name": "count" }
  ]
}
```

And you can stick that in someQuery.json and query it like this:

```
curl -i -X POST "http://${HOST_IP}:8084/druid/v2/" -H 'content-type: application/json' -d @someQuery.json
```
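If the realtime node has ingested anything, the response should be a JSON array along these lines (the values here are illustrative, not real output):

```json
[ {
  "timestamp" : "2015-03-16T19:22:00.000Z",
  "result" : { "count" : 400 }
} ]
```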

Thank you all for all the great help. I got this all working, and Druid is blowing my mind.

With the help of a few more people on IRC I also got the Cassandra deep storage going, so the whole thing is working now. Now I really have to test it to understand all the tuning options better. My main concern is how long a window we’re going to want: we definitely need to be able to deal with data that comes in rather late, so I need to understand the tradeoffs a larger window implies.

-Josh

Hi Joshua, we run with windows of about 15-40 minutes. Have you thought about running a lambda architecture at all? If you require 100% accuracy in your data, the open source world is not going to be able to provide those guarantees very easily: popular projects such as Kafka, Samza, and Storm all provide at-least-once (not exactly-once) delivery guarantees.