Realtime node memory leak?

We’ve got a realtime node that is pretty consistently running out of heap space with a 6 GB heap. I would think it’s a problem with our spec, but we’ve got a segment granularity of an hour and it consistently takes about 24 hours for the node to run out of memory. Assuming segments are flushed from memory once they’re finalized, the spec shouldn’t be the cause. I had jstatd running the last time this happened: the node was using 5.7 GB of its 6 GB heap, but GC activity was at about 100%. The Old and Eden generations were essentially full, so I think each GC run was only recovering survivor space…
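
For what it’s worth, here is roughly how I’ve been sanity-checking the pool occupancy alongside VisualVM (a minimal in-process sketch, not exactly what we ran; to look at the actual realtime JVM you’d attach over remote JMX or just keep using jstatd/VisualVM, and the class name is my own):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;

// Dumps the fill level of each heap pool (Eden, Survivor, Old Gen) so you can see
// whether collections are reclaiming anything beyond survivor space.
public class HeapPoolCheck
{
  public static void main(String[] args)
  {
    for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
      MemoryUsage usage = pool.getUsage();
      long max = usage.getMax();
      if (pool.getType() == MemoryType.HEAP && max > 0) {
        System.out.printf(
            "%-25s %6.1f%% of %d MB%n",
            pool.getName(),
            100.0 * usage.getUsed() / max,
            max / (1024 * 1024)
        );
      }
    }
  }
}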

Are there any known memory leaks in Druid 0.8 realtime nodes? Not sure if it’s relevant, but we are running two separate realtime nodes on the same machine. There is enough memory for both, and the same realtime node always runs out of heap.

Just a bit of further information: we’re pulling data from Kafka into this realtime node. After doing some memory sampling with VisualVM, it looks like the bulk of the memory usage is in the FireChief thread. I’m a little surprised by this; I wouldn’t have thought the firehose would use much memory, since it just passes events through to the Druid store.

Hi Taylor,
AFAIK, there is no known memory leak in realtime nodes.

I think it could be due to handoff not working properly or an improper configuration.
How many events are you trying to ingest per hour?

Can you confirm that segment handoff is working as expected, and that older segments are being removed from the realtime node and handed over to the historical nodes?

Also, what windowPeriod are you running with? Can you also share your realtime node spec, in case something is wrong with the configuration?

It can spike to a few million events an hour. I’m able to see the older segments in the deep storage directory, and the historical nodes are reading them in when queried. When the node runs out of memory this obviously stops happening, but it appears to be working properly up until that point. Is there anything in the logs that might indicate a failed hand-off?

Here is the spec we’re using:

[
  {
    "dataSchema" : {
      "dataSource" : "requests",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "@timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions" : ["site","env","method","statuscode","bytes","duration","resource_type","repo","clientip","timestamp"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "longSum",
          "name" : "bytesSum",
          "fieldName" : "bytes"
        },
        {
          "type" : "longSum",
          "name" : "durationSum",
          "fieldName" : "duration"
        },
        {
          "type" : "hyperUnique",
          "name" : "unique_ips",
          "fieldName" : "clientip"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "kafka-0.8",
        "consumerProps" : {
          "zookeeper.connect" : "zk1:2181,zk2:2181,zk3:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id" : "druid-requests",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset" : "largest",
          "auto.commit.enable" : "false"
        },
        "feed" : "log.json.request_log"
      },
      "plumber" : {
        "type" : "realtime"
      }
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 5000000,
      "intermediatePersistPeriod" : "PT15m",
      "windowPeriod" : "PT10m",
      "basePersistDirectory" : "/data/druid/index/realtime",
      "rejectionPolicy" : {
        "type" : "serverTime"
      }
    }
  }
]

Hi Taylor,

If an exception occurs during segment handoff, I believe you’ll see something like this in the logs:

log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
   .addData("interval", sink.getInterval())
   .emit();

It may be near a log message of this form, which gets written when the realtime node abandons a segment after handoff:
log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);

Thanks,
Jon