"tranquility" package can't work

Hi,

I am using the “tranquility” package to ingest data into Druid through Storm, but the “BeamBolt” does not seem to emit any tuples.

Could somebody help me figure out what is wrong?

The Storm version is 0.9.5 and the ZooKeeper version is 3.4.6.

Here is the Storm log:

2015-06-16T16:06:34.012+0000 o.a.z.ClientCnxn [INFO] Opening socket connection to server ip-10-3-12-99.us-west-1.compute.internal/10.3.12.99:2181. Will not attempt to authenticate using SASL (unknown error)

2015-06-16T16:06:34.014+0000 o.a.z.ClientCnxn [INFO] Socket connection established to ip-10-3-12-99.us-west-1.compute.internal/10.3.12.99:2181, initiating session

2015-06-16T16:06:34.020+0000 o.a.z.ClientCnxn [INFO] Session establishment complete on server ip-10-3-12-99.us-west-1.compute.internal/10.3.12.99:2181, sessionid = 0x14d702834f806f7, negotiated timeout = 40000

2015-06-16T16:06:34.024+0000 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED

2015-06-16T16:06:35.776+0000 o.h.v.i.u.Version [INFO] HV000001: Hibernate Validator 5.1.3.Final

2015-06-16T16:06:36.601+0000 i.d.g.JsonConfigurator [INFO] Loaded class[class io.druid.guice.ExtensionsConfig] from props[druid.extensions.] as [ExtensionsConfig{searchCurrentClassloader=true, coordinates=, defaultVersion='null', localRepository='/home/luotao/.m2/repository', remoteRepositories=[http://repo1.maven.org/maven2/, https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local]}]

2015-06-16T16:06:37.550+0000 c.m.e.c.LoggingEmitter [INFO] Start: started [true]

2015-06-16T16:06:39.224+0000 STDIO [ERROR] Jun 16, 2015 4:06:39 PM com.twitter.finagle.Init$ apply

INFO: Finagle version 6.24.0 (rev=1e06db17ca2de4b85209dd2fbc18e635815e994b) built at 20141212-145625

2015-06-16T16:06:39.608+0000 c.m.c.s.n.f.DiscoResolver [INFO] Updating instances for service[overlord] to Set(ServiceInstance{name='overlord', id='3a7bba19-97e7-4ec6-909b-75dc3f7373d3', address='ip-10-3-12-100', port=8090, sslPort=null, payload=null, registrationTimeUTC=1434002793090, serviceType=DYNAMIC, uriSpec=null})

2015-06-16T16:06:39.800+0000 c.m.t.f.FinagleRegistry [INFO] Created client for service: overlord

2015-06-16T16:06:39.873+0000 b.s.d.executor [INFO] Prepared bolt druid-ingest:(2)

And here is the pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>druid-storm-ingest-test</groupId>
   <artifactId>druid-storm-ingest-test</artifactId>
   <version>1.0-SNAPSHOT</version>

   <build>
      <plugins>
         <!-- any other plugins -->
         <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals>
                     <goal>single</goal>
                  </goals>
               </execution>
            </executions>
            <configuration>
               <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
               </descriptorRefs>
            </configuration>
         </plugin>
      </plugins>
   </build>

   <dependencies>
      <dependency>
         <groupId>com.metamx</groupId>
         <artifactId>tranquility_2.10</artifactId>
         <version>0.4.2</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.storm</groupId>
               <artifactId>storm-core</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-framework</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-recipes</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-x-discovery</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-test</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-core</artifactId>
         <version>0.9.5</version>
         <scope>provided</scope>
      </dependency>
      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-framework</artifactId>
         <version>2.8.0</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>2.8.0</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-x-discovery</artifactId>
         <version>2.8.0</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-test</artifactId>
         <version>2.8.0</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>3.4.6</version>
         <exclusions>
            <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>log4j-over-slf4j</artifactId>
         <version>1.7.7</version>
      </dependency>

   </dependencies>

   <repositories>
      <repository>
         <id>pub-libs-releases-local</id>
         <name>pub-libs-releases-local</name>
         <url>https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local/</url>
      </repository>
      <repository>
         <id>jboss-public</id>
         <name>jboss-public</name>
         <url>http://repository.jboss.org/nexus/content/groups/public/</url>
      </repository>
      <repository>
         <id>repo1maven2</id>
         <name>repo1maven2</name>
         <url>http://repo1.maven.org/maven2/</url>
      </repository>
      <repository>
         <id>maven2</id>
         <name>maven2</name>
         <url>http://repo.maven.apache.org/maven2/</url>
      </repository>
   </repositories>
</project>

And here is beam factory:

public class MyBeamFactory implements BeamFactory<Map<String, Object>> {
   public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
      // Connect to the ZooKeeper ensemble that Druid uses for service discovery.
      CuratorFramework curator = CuratorFrameworkFactory.newClient(
            "zkhost:2181", new BoundedExponentialBackoffRetry(100, 1000, 5));
      curator.start();

      String indexService = "overlord";
      String firehosePattern = "druid:firehose:%s";
      String discoveryPath = "/druid/discovery";
      String datasource = "wordcount";
      final List<String> dimensions = ImmutableList.of("word");
      final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
            new LongSumAggregatorFactory("num", "num")
      );

      return DruidBeams.builder(new Timestamper<Map<String, Object>>() {
               // Event time is read from the "timestamp" field of each event.
               public DateTime timestamp(Map<String, Object> event) {
                  return new DateTime(event.get("timestamp"));
               }
            })
            .curator(curator)
            .discoveryPath(discoveryPath)
            .location(DruidLocation.create(indexService, firehosePattern, datasource))
            .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
            .tuning(
                  ClusteredBeamTuning.builder()
                        .segmentGranularity(Granularity.HOUR)
                        .windowPeriod(new Period("PT10M"))
                        .partitions(1)
                        .replicants(1)
                        .build()
            )
            .buildBeam();
   }
}

And here is my topology:

public class TopologyMain {
   public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("word-reader", new WordReader());
      builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
      builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-normalizer", new Fields("word"));
      builder.setBolt("druid-ingest", new BeamBolt<Map<String, Object>>(new MyBeamFactory())).shuffleGrouping("word-counter");

      Config conf = new Config();
      conf.setDebug(false);
      // conf.put(Config.TOPOLOGY_DEBUG, true);
      conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

      // Local-mode run, kept for reference:
      // LocalCluster cluster = new LocalCluster();
      // cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
      // try {
      //    Thread.sleep(100000);
      // } catch (InterruptedException e) {
      //    e.printStackTrace();
      // }
      // cluster.shutdown();

      StormSubmitter.submitTopology("druid-storm-ingest", conf, builder.createTopology());
   }
}

You may see some useful log messages if you let the topology run for a few minutes (in case something is timing out; some timeouts are a few minutes in length). You could also try setting the log level for the com.metamx.tranquility package to “TRACE”.

You can also try taking a thread dump (jstack -l [pid]).
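
If attaching jstack to the Storm worker is awkward, a similar (less detailed) dump can be produced from inside the JVM. The following is only a minimal sketch using the standard `Thread.getAllStackTraces()` call; the class name `ThreadDumpSketch` is hypothetical and nothing here is part of Storm or Tranquility. Unlike `jstack -l`, it does not report lock ownership.

```java
import java.util.Map;

public class ThreadDumpSketch {
    /** Builds a plain-text dump of every live thread in this JVM. */
    public static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            // Header line: thread name and current state (RUNNABLE, WAITING, ...).
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

Calling `dumpAllThreads()` from a debug hook and writing the result to the worker log would show whether the bolt's threads are blocked or waiting.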

Hi, I found the reason: the event timestamps were outside the time window.
I can now see some realtime tasks in the indexing service console, but there are other exceptions in the BeamBolt, shown below:

2015-06-16T18:27:21.499+0000 c.m.t.f.FutureRetry$ [WARN] Transient error, will try again in 989 ms

java.io.IOException: Unable to push events to task: index_realtime_wordcount_2015-06-16T18:26:00.000Z_0_0 (status = TaskRunning)

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:160) ~[stormjar.jar:na]

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:146) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Try$.apply(Try.scala:13) ~[stormjar.jar:na]

at com.twitter.util.Future$.apply(Future.scala:90) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:824) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:823) ~[stormjar.jar:na]

at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.k(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:110) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:91) [stormjar.jar:na]

at com.twitter.util.Promise$$anon$2.run(Promise.scala:345) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:186) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:212) [stormjar.jar:na]

at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:86) [stormjar.jar:na]

at com.twitter.util.Promise.runq(Promise.scala:331) [stormjar.jar:na]

at com.twitter.util.Promise.updateIfEmpty(Promise.scala:642) [stormjar.jar:na]

at com.twitter.util.Promise.update(Promise.scala:615) [stormjar.jar:na]

at com.twitter.util.Promise.setValue(Promise.scala:591) [stormjar.jar:na]

at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:76) [stormjar.jar:na]

at com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:45) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:86) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [stormjar.jar:na]

at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [stormjar.jar:na]

at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [stormjar.jar:na]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]

at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

com.twitter.finagle.NoBrokersAvailableException: No hosts are available for druid:firehose:wordcount-26-0000-0000

at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]

2015-06-16T18:27:22.236+0000 c.m.t.f.FutureRetry$ [WARN] Transient error, will try again in 1442 ms

java.io.IOException: Unable to push events to task: index_realtime_wordcount_2015-06-16T18:25:00.000Z_0_0 (status = TaskRunning)

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:160) ~[stormjar.jar:na]

at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:146) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Try$.apply(Try.scala:13) ~[stormjar.jar:na]

at com.twitter.util.Future$.apply(Future.scala:90) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:824) ~[stormjar.jar:na]

at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:823) ~[stormjar.jar:na]

at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.k(Promise.scala:100) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:110) [stormjar.jar:na]

at com.twitter.util.Promise$Transformer.apply(Promise.scala:91) [stormjar.jar:na]

at com.twitter.util.Promise$$anon$2.run(Promise.scala:345) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:186) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157) [stormjar.jar:na]

at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:212) [stormjar.jar:na]

at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:86) [stormjar.jar:na]

at com.twitter.util.Promise.runq(Promise.scala:331) [stormjar.jar:na]

at com.twitter.util.Promise.updateIfEmpty(Promise.scala:642) [stormjar.jar:na]

at com.twitter.util.Promise.update(Promise.scala:615) [stormjar.jar:na]

at com.twitter.util.Promise.setValue(Promise.scala:591) [stormjar.jar:na]

at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:76) [stormjar.jar:na]

at com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:45) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [stormjar.jar:na]

at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [stormjar.jar:na]

at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [stormjar.jar:na]

at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:86) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [stormjar.jar:na]

at com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35) [stormjar.jar:na]

at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [stormjar.jar:na]

at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [stormjar.jar:na]

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [stormjar.jar:na]

at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [stormjar.jar:na]

at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [stormjar.jar:na]

at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [stormjar.jar:na]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]

at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

On Wednesday, June 17, 2015 at 12:50:06 AM UTC+8, Gian Merlino wrote:

The “No hosts are available for druid:firehose:wordcount-26-0000-0000” message for an accepted task probably means something is wrong with service discovery, or the task has not actually got a slot to run in. Do you see any pending tasks on the overlord web console? If so, that may be a capacity issue. You may need to increase druid.worker.capacity, or kill some running tasks, or add more middleManagers.

If you don’t see any pending tasks, then, do you see any running tasks? If you check their logs, do you see any of them announcing that they are handling “wordcount-26-0000-0000”? (that should be in their task logs somewhere)
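
To make the capacity point concrete, here is a rough back-of-the-envelope estimate of how many task slots a single datasource might occupy at once. It is a sketch under assumptions that are not stated in this thread: each realtime task is taken to live for about one segment granularity period plus the window period plus some handoff time, and a new set of partitions × replicants tasks starts every segment period. The `handoffMinutes` value and the class name are hypothetical.

```java
public class TaskSlotEstimate {
    /**
     * Rough count of realtime tasks alive at the same time, assuming each task
     * lives for segment + window + handoff minutes and a new set of tasks
     * (partitions * replicants) starts every segment period.
     */
    public static long estimateSlots(long segmentMinutes, long windowMinutes,
                                     long handoffMinutes, int partitions, int replicants) {
        long lifetime = segmentMinutes + windowMinutes + handoffMinutes;
        // Ceiling division: how many task generations overlap at any moment.
        long overlapping = (lifetime + segmentMinutes - 1) / segmentMinutes;
        return overlapping * partitions * replicants;
    }

    public static void main(String[] args) {
        // HOUR segments, PT10M window, assumed 5 minutes of handoff.
        System.out.println(estimateSlots(60, 10, 5, 1, 1)); // prints 2
        // MINUTE segments with the same window and handoff assumption.
        System.out.println(estimateSlots(1, 10, 5, 1, 1));  // prints 16
    }
}
```

Under these assumptions, one-minute segments (as the task IDs in the log above suggest) need far more slots than hourly segments, which would be consistent with a single low-capacity middleManager running out of room.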

Yes, I have only one middleManager and its capacity is low. I will add more middleManager nodes and test again. Thanks very much.
I also have two other questions:

1. Does “tranquility” only support the “serverTime” rejectionPolicy? Can it support other policies, such as “messageTime”? If so, how do I set it?

2. Does “tranquility” only support “minute” segmentGranularity, or does it also support “five minute” or “ten minute”?

On Wednesday, June 17, 2015 at 2:44:01 AM UTC+8, Gian Merlino wrote:

  1. Yeah, only something like “serverTime” is possible. Tranquility’s concept of windowing works slightly differently from the built-in Druid rejectionPolicy: it actually tells Druid to use a “none” rejectionPolicy and then does all the timing checks client-side.

  2. It can do both “minute” and “hour” for sure. I think it should work on “FIVE_MINUTE” and “TEN_MINUTE” too, although I haven’t tested it. Feel free to give it a shot, and if it doesn’t work then please file a GitHub issue.
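
The client-side timing check from answer 1 can be illustrated with a small sketch. This is not Tranquility's actual code: it only shows the general idea of accepting an event when its timestamp lies within the window period of the current server time. The class and method names are hypothetical, the exact boundary handling in Tranquility may differ, and `java.time` is used here only to keep the example self-contained (the factory above uses Joda-Time).

```java
import java.time.Duration;
import java.time.Instant;

public class WindowCheckSketch {
    /** True if eventTime lies within plus or minus the window around now. */
    public static boolean isWithinWindow(Instant eventTime, Instant now, Duration window) {
        return !eventTime.isBefore(now.minus(window))
            && !eventTime.isAfter(now.plus(window));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2015-06-16T18:27:00Z");
        Duration window = Duration.ofMinutes(10); // same length as windowPeriod PT10M

        // Seven minutes old: inside the window, so it would be sent on.
        System.out.println(isWithinWindow(Instant.parse("2015-06-16T18:20:00Z"), now, window)); // true
        // More than two hours old: outside the window, so it would be dropped.
        System.out.println(isWithinWindow(Instant.parse("2015-06-16T16:06:00Z"), now, window)); // false
    }
}
```

A check like this explains the original symptom: events whose "timestamp" field is far from the current time never reach a Druid task, so nothing appears to be ingested.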