Tranquility throws "No hosts are available for disco"

Hi,

I'm working on a setup with Tranquility as a Kafka consumer.

My setup works as expected, except that after a period of time Tranquility starts throwing:

```
WARN c.m.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Failed to propagate events: druid:overlord/aggevent
{
  "eventCount" : 872,
  "timestamp" : "2016-05-16T13:00:00.000Z",
  "beams" : "MergingPartitioningBeam(DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 0, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_0_0/aggevent-013-0000-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 1, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_1_0/aggevent-013-0001-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 2, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_2_0/aggevent-013-0002-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 3, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_3_0/aggevent-013-0003-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 4, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_4_0/aggevent-013-0004-0000]))"
}
com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!firehose:druid:overlord:aggevent-013-0004-0000, Dtab.base=, Dtab.local=
	at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]
```

followed by:

```
c.metamx.emitter.core.LoggingEmitter - Event [{"feed":"alerts","timestamp":"2016-05-16T13:10:21.611Z","service":"tranquility","host":"localhost","severity":"anomaly","description":"Failed to propagate events: druid:overlord/aggevent","data":{"exceptionType":"com.twitter.finagle.NoBrokersAvailableException","exceptionStackTrace":"com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!firehose:druid:overlord:aggevent-013-0004-0000, Dtab.base=, Dtab.local=\n\tat com.twitter.finagle.NoStacktrace(Unknown Source)\n","timestamp":"2016-05-16T13:00:00.000Z","beams":"MergingPartitioningBeam(DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 0, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_0_0/aggevent-013-0000-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 1, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_1_0/aggevent-013-0001-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 2, tasks = [index_realtime_aggevent_2016-05-16T13:00:00.000Z_2_0/aggevent-013-0002-0000]), DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 3, tasks = [i:
```

I've read that this can be related to the MiddleManagers being under pressure.

Unfortunately I can't confirm this, as the MiddleManager and Historical node logs are clean, with no errors.

I've also enabled garbage collection tracing and can't detect any problem with memory.

I'm using S3 as deep storage; is there any relation to this problem?

Another point: after throwing these errors for a while, I can see the Tranquility Kafka consumer lag increasing, but after some time it goes back to working as expected.

I think there is a lack of resources for the indexing process, but how do I increase the resources for this?

I'm running on AWS with:

1 Master Node : m3.large

3 DataNode : r3.2xlarge

1 queryNode : m3.large

Could you help me find what the problem is? I'm also attaching my kafka.json config:

```json
{
  "dataSources": {
    "aggevent": {
      "spec": {
        "dataSchema": {
          "dataSource": "aggevent",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "eventTimestamp",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": ["eventType", "publisherId", "contentId", "adNetworkId", "countryCode"],
                "dimensionExclusions": []
              }
            }
          },
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "none"
          },
          "metricsSpec": [
            { "name": "Pub Net", "type": "doubleSum", "fieldName": "publisherNet" },
            { "name": "Pub Gross", "type": "doubleSum", "fieldName": "publisherGross" },
            { "name": "IA Net", "type": "doubleSum", "fieldName": "iaNet" },
            { "name": "IA Gross", "type": "doubleSum", "fieldName": "iaGross" },
            { "name": "Count", "type": "longSum", "fieldName": "occurrences" }
          ]
        },
        "ioConfig": {
          "type": "realtime"
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": "300000",
          "intermediatePersistPeriod": "PT30M",
          "windowPeriod": "PT30M"
        }
      },
      "properties": {
        "task.partitions": "5",
        "task.replicants": "1",
        "task.warmingPeriod": "PT10M",
        "tranquility.maxBatchSize": "10000",
        "reportDropsAsExceptions": "false",
        "topicPattern": "AGGREGATION_EVENT_JSON"
      }
    }
  },
  "properties": {
    "zookeeper.connect": "xxx.xxx.xxx.xxx:2181",
    "druid.discovery.curator.path": "/druid/discovery",
    "druid.selectors.indexing.serviceName": "druid/overlord",
    "commit.periodMillis": "5000",
    "consumer.numThreads": "2",
    "kafka.zookeeper.connect": "xxx.xxx.xxx.xxx:2181",
    "kafka.group.id": "tranquility-kafka",
    "kafka.auto.offset.reset": "largest"
  }
}
```

Hi Richard,
How many MiddleManagers do you have, and what is druid.worker.capacity set to?

FWIW, since you are running with 5 partitions, replication 1, and a 30-minute windowPeriod, make sure you have (number of MiddleManagers × worker capacity) >= 10 in order to have enough capacity to run the tasks for consecutive hours concurrently.
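
To spell out the arithmetic: each hourly segment needs task.partitions × task.replicants = 5 × 1 = 5 tasks, and because windowPeriod is PT30M, the tasks for hour H keep running 30 minutes into hour H+1 before they hand off (the PT10M warmingPeriod also starts the next hour's tasks early). So tasks for two consecutive hours run concurrently, which requires 5 × 2 = 10 worker slots. When no slot is free, a task sits pending on the Overlord and never announces its firehose in service discovery, which is consistent with the NoBrokersAvailableException above. Worker slots are set per MiddleManager; a minimal sketch, with an illustrative value:

```
# In each MiddleManager's runtime.properties (value illustrative):
# with 3 MiddleManagers, 3 * 4 = 12 slots >= the 10 required.
druid.worker.capacity=4
```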

Many thanks!!!
Working great now.