Segment Dimensions and Size empty in Realtime Node when using Kafka, no hand-off to Historical Node

Hi. I’ve been trying to get a simple cluster running with a Realtime, Historical, Coordinator and Broker node (similar to the Simple Cluster Configuration page, but with the Overlord replaced by a Realtime node). My realtime node is based on the Loading Streaming Data tutorial, using Kafka for input. Based on several other discussions in this group and the info I’ve gathered from the docs, my current config looks like this:

config/_common/common.runtime.properties

```
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight","io.druid.extensions:postgresql-metadata-storage"]

# Zookeeper
druid.zk.service.host=localhost

# Metadata Storage (postgresql)
druid.metadata.storage.type=postgresql
druid.metadata.storage.connector.connectURI=jdbc:postgresql://192.168.0.253:5432/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=driud

# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=local
druid.storage.storage.storageDirectory=/tmp/druid/localStorage

# Query Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000

# Indexing service discovery
druid.selectors.indexing.serviceName=overlord

# Monitoring (disabled for examples)
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

# Metrics logging (disabled for examples)
druid.emitter=logging
```

config/broker/runtime.properties

```
druid.host=localhost
druid.port=8082
druid.service=broker

druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true

# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
```

config/coordinator/runtime.properties

```
druid.host=localhost
druid.port=8081
druid.service=coordinator

# The coordinator begins assignment operations after the start delay.
# We override the default here to start things up faster for examples.
druid.coordinator.startDelay=PT70s
```

config/historical/runtime.properties

```
druid.host=localhost
druid.port=8083
druid.service=historical

# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize": 10000000000}]
druid.server.maxSize=10000000000
```

config/realtime/runtime.properties

```
druid.host=localhost
druid.port=8084
druid.service=realtime

# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

# Enable Real monitoring
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
```

examples/indexing/wikipedia.spec

```
[
  {
    "dataSchema" : {
      "dataSource" : "wikipedia-kafka",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "FIVE_MINUTE",
        "queryGranularity" : "MINUTE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "wikipedia-kafka"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT3m",
      "windowPeriod": "PT3m",
      "basePersistDirectory": "/tmp/realtime/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]
```
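For completeness, the realtime node picks this spec up via the druid.realtime.specFile property, as in the Loading Streaming Data tutorial (path shown is the one above; it can also be passed on the JVM command line as a -D flag):

```
druid.realtime.specFile=examples/indexing/wikipedia.spec
```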

The problem is, when I ingest data via Kafka, the realtime node seems to process everything (based on what I see in the log, attached), but it never gets handed over to the Historical node. When viewing the Cluster Server and Segment information in the coordinator console, my “Segment dimensions” are empty and the “Segment size” is always 0. When testing the sample apps with Wikipedia IRC ingestion, handoff and persistence work fine, so I know the problem isn’t with my Postgres setup. See the attached screenshot of the Coordinator Console; note that the 2015-07-09 segments are example IRC data and the 2015-07-13 segments are Kafka data.

The data I’m using is based on the examples/indexing/wikipedia_data.json file; I updated the timestamps to fit into the 3-minute "windowPeriod". None of the messages are rejected in the logs, and they all seem to have been processed, based on lines like:

```
2015-07-13T10:29:08,769 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-07-13T10:29:08.768Z","service":"realtime","host":"localhost:8084","metric":"events/processed","value":5,"user2":"wikipedia-kafka"}]
```
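For reference, the events I push into the wikipedia-kafka topic are shaped like the rows in wikipedia_data.json, roughly like this (the field values here are illustrative, and the timestamp has to land inside the windowPeriod at ingest time):

```
{"timestamp": "2015-07-13T10:28:00Z", "page": "Gypsy Danger", "language": "en", "user": "nuclear", "unpatrolled": "true", "newPage": "true", "robot": "false", "anonymous": "false", "namespace": "article", "continent": "North America", "country": "United States", "region": "Bay Area", "city": "San Francisco", "added": 57, "deleted": 200, "delta": -143}
```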

Any help would really be appreciated. From what I can tell, the hand-off to the historical node doesn’t happen because of the missing segment dimensions, but I just can’t figure out why that data is wrong in the first place.

Thanks.

Broker.output.log (617 KB)

Historical.output.log (593 KB)

Realtime.output.log (202 KB)

Coordinator.output.log (846 KB)

Hi Wynand, I think you’re running into two things:

  1. Segments served by realtime nodes don’t have “dimensions” and “size” filled out until they’re finalized and handed off.

  2. When using the “messageTime” rejectionPolicy, handoff of a particular segment only occurs after the realtime node sees an event timestamped outside the windowPeriod of that segment. With “serverTime”, handoff occurs on a timer regardless of whether new data is coming in or not. We recently updated the docs to be clearer about this and to point out that serverTime is generally the best rejection policy to use in production. (messageTime is meant to make it easier to try out realtime ingestion on older data.) The change is sketched below.
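For reference, that is a one-line change in the tuningConfig of the spec above (everything else stays as posted):

```
"rejectionPolicy": {
  "type": "serverTime"
}
```

With serverTime, a segment covering, say, 10:00-10:05 is finalized once the windowPeriod (PT3m in the posted spec) has elapsed past 10:05 on the server clock, even if no further events arrive.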

Hope that helps.

Hi Gian.

Thanks for the response. I did originally try it with “serverTime”, and I just checked it again now, but I still get the same result. The logs show events being processed, but the hand-off never seems to happen.

Some additional things I noticed while testing further:

  1. Using Kafka for input, I never see anything being written to the /tmp/realtime/basePersist/wikipedia-kafka folder. Could that be why it never gets handed to the Historical Node?
  2. I also tested direct HTTP ingestion using the Receiver Firehose, and it behaves the same way: processing seems happy, no hand-off to historical, nothing in the basePersist folder (an illustrative ioConfig for that kind of test is sketched below).
  3. I re-tested my cluster using the examples/wikipedia/wikipedia_realtime.spec file for the Realtime Node, with the config for the other nodes unchanged from what I used with Kafka and HTTP; that persists fine and I see data in the basePersist and /tmp/druid/localStorage/ folders.

Can it be that there is something strange with the Receiver and Kafka Firehoses that causes these issues?
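For context, a direct-HTTP test like the one in point 2 typically swaps the Kafka firehose for the event-receiver firehose. A minimal sketch, assuming the "receiver" firehose type with an illustrative serviceName (the rest of the spec, dataSchema and tuningConfig, stays the same as above):

```
"ioConfig" : {
  "type" : "realtime",
  "firehose": {
    "type": "receiver",
    "serviceName": "wikipedia-http",
    "bufferSize": 10000
  },
  "plumber": {
    "type": "realtime"
  }
}
```

Events are then POSTed as JSON to the push-events endpoint this firehose exposes on the realtime node.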

Thanks!

WP

OK, so it turns out it was PEBKAC. I was just testing incorrectly because I misunderstood what it means for a segment to be finalised.

Using some different data generated over a longer period of time, with an even smaller "windowPeriod" and "intermediatePersistPeriod", it seems the persistence just kicked in much later than I expected.

For anyone else who might run into a similar issue, a short explanation (a sketch of the timing involved follows the list):

  1. While there was only one segment’s worth of data, it stayed in memory on the Realtime Node. As soon as a second segment was created (after "segmentGranularity" of time had passed), both were stored in the /tmp/realtime/basePersist folder.
  2. By the time the third segment was written to the basePersist folder, it took a further "intermediatePersistPeriod" of time before it was written to /tmp/druid/localStorage.
  3. Once it was written to local storage, it took a further minute or so for the information in the Coordinator Console to update.

So it seems it has actually been working all along; I was just testing incorrectly.
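The settings that drive this timing all live in the spec’s tuningConfig; a rough sketch of the kind of "even smaller" values used for the later test (the PT1m periods here are illustrative, not the exact values used):

```
"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 500000,
  "intermediatePersistPeriod": "PT1m",
  "windowPeriod": "PT1m",
  "basePersistDirectory": "/tmp/realtime/basePersist",
  "rejectionPolicy": {
    "type": "serverTime"
  }
}
```

With the original FIVE_MINUTE segmentGranularity, the earliest a segment covering 10:00-10:05 should be expected under /tmp/druid/localStorage is around 10:05 plus the windowPeriod, plus whatever time the persist-and-merge work takes, and the Coordinator Console lags a bit further behind that.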

Thanks for the feedback! I hope this helps someone running into similar issues in the future avoid the headache I experienced.

Regards,

WP

Hey Wynand, good to know you got it figured out, and thanks for writing up your experience!