"Tranquility" package exception

Hi,

I am using the Tranquility package to ingest data into Druid through Storm.

However, I frequently get exceptions. Here is the exception log. Could somebody help?

java.io.IOException: Unable to push events to task: index_realtime_wordcount_2015-06-18T03:09:00.000Z_0_0 (status = TaskRunning)

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:160) ~[stormjar.jar:na]

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:146) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Try$.apply(Try.scala:13) ~[stormjar.jar:na]

at com.twitter.util.Future$.apply(Future.scala:90) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:824) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:823) ~[stormjar.jar:na]

at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.k(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:110) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:91) [stormjar.jar:na]

at com.twitter.util.Promise$$anon$2.run(Promise.scala:345) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:186) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:212) [stormjar.jar:na]

at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:86) [stormjar.jar:na]

at com.twitter.util.Promise.runq(Promise.scala:331) [stormjar.jar:na]

at com.twitter.util.Promise.updateIfEmpty(Promise.scala:642) [stormjar.jar:na]

at com.twitter.util.Promise.update(Promise.scala:615) [stormjar.jar:na]

at com.twitter.util.Promise.setValue(Promise.scala:591) [stormjar.jar:na]

at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:76) [stormjar.jar:na]

at com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:45) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:86) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [stormjar.jar:na]

at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [stormjar.jar:na]

at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [stormjar.jar:na]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]

at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

com.twitter.finagle.NoBrokersAvailableException: No hosts are available for druid:firehose:wordcount-09-0000-0000

at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]

I found another discussion about this error, which says the cause is that the middle manager is low on capacity.

But I have 3 middle manager nodes, which can process 7 tasks simultaneously, and there are only one or two tasks pending.
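
As a rough sanity check on that capacity figure, the number of realtime tasks alive at once can be estimated from the tuning values. The sketch below is only a back-of-the-envelope model, not Tranquility's or Druid's own logic: it assumes each task occupies a slot for warmingPeriod + segmentGranularity + windowPeriod + handoff time, and the 2-minute handoff figure is a made-up placeholder. The class and method names are hypothetical.

```java
import java.time.Duration;

public class TaskSlotEstimate {

    /**
     * Estimates how many realtime tasks overlap in steady state.
     * Assumption (not taken from the Tranquility source): a task holds a
     * worker slot for warming + segment + window + handoff time, and a new
     * generation of tasks starts once per segment granularity period.
     */
    static long overlappingTasks(Duration segmentGranularity,
                                 Duration warmingPeriod,
                                 Duration windowPeriod,
                                 Duration handoff,
                                 int partitions,
                                 int replicants) {
        long segmentMs = segmentGranularity.toMillis();
        long lifetimeMs = warmingPeriod.toMillis() + segmentMs
                + windowPeriod.toMillis() + handoff.toMillis();
        // Ceiling division: number of task generations alive at the same time.
        long generations = (lifetimeMs + segmentMs - 1) / segmentMs;
        return generations * partitions * replicants;
    }

    public static void main(String[] args) {
        // Values from the BeamFactory in this post; the handoff time is a guess.
        long slots = overlappingTasks(
                Duration.ofMinutes(1),  // segmentGranularity = MINUTE
                Duration.ofMinutes(1),  // warmingPeriod = PT1M
                Duration.ofMinutes(1),  // windowPeriod = PT1M
                Duration.ofMinutes(2),  // handoff time (hypothetical)
                1,                      // partitions
                1);                     // replicants
        System.out.println("Estimated worker slots needed: " + slots);
    }
}
```

Under this model, a slower handoff raises the estimate quickly when the segment granularity is only one minute, so it may be worth comparing the result with the 7 available slots.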

Here is my BeamFactory:

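import backtype.storm.task.IMetricsContext;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.storm.BeamFactory;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.util.List;
import java.util.Map;

public class MyBeamFactory implements BeamFactory<Map<String, Object>> {

   public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
      // ZooKeeper client used for service discovery and beam coordination.
      CuratorFramework curator = CuratorFrameworkFactory.newClient(
            "zkhost:2181", new BoundedExponentialBackoffRetry(100, 1000, 5));
      curator.start();

      String indexService = "overlord";
      String firehosePattern = "druid:firehose:%s";
      String discoveryPath = "/druid/discovery";
      String dataSource = "wordcount";
      final List<String> dimensions = ImmutableList.of("word");
      final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
            new LongSumAggregatorFactory("num", "num")
      );

      return DruidBeams.builder(new Timestamper<Map<String, Object>>() {
               // Extracts the event time from the "timestamp" field of each event.
               public DateTime timestamp(Map<String, Object> event) {
                  return new DateTime(event.get("timestamp"));
               }
            })
            .curator(curator)
            .discoveryPath(discoveryPath)
            .location(DruidLocation.create(indexService, firehosePattern, dataSource))
            .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
            .tuning(
                  ClusteredBeamTuning.builder()
                        .segmentGranularity(Granularity.MINUTE)
                        .warmingPeriod(new Period("PT1M"))
                        .windowPeriod(new Period("PT1M"))
//                      .maxSegmentsPerBeam(10)
                        .partitions(1)
                        .replicants(1)
                        .build()
            )
            .buildBeam();
   }
}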