Storm Tranquility DruidBeams.builder doesn't work the same as reading from JSON file

I'm trying to move away from configuring Tranquility through a JSON file and use DruidBeams.builder instead. With the builder I get the error below, and I'm not sure what I did wrong.

Here's my spec JSON file:

    {
      "dataSources": {
        "twitter-firehose-web-test": {
          "spec": {
            "dataSchema": {
              "dataSource": "twitter-firehose-web-test",
              "parser": {
                "type": "string",
                "parseSpec": {
                  "timestampSpec": {
                    "column": "_ts",
                    "format": "auto"
                  },
                  "dimensionsSpec": {
                    "dimensions": [],
                    "dimensionExclusions": []
                  },
                  "format": "json"
                }
              },
              "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "MINUTE",
                "queryGranularity": "none"
              },
              "metricsSpec": [
                {
                  "type": "count",
                  "name": "count"
                }
              ]
            },
            "ioConfig": {
              "type": "realtime"
            },
            "tuningConfig": {
              "type": "realtime",
              "maxRowsInMemory": "100000",
              "intermediatePersistPeriod": "PT10M",
              "windowPeriod": "PT1H"
            }
          },
          "properties": {
            "task.partitions": "1",
            "task.replicants": "1"
          }
        }
      },
      "properties": {
        "zookeeper.connect": "localhost",
        "druid.discovery.curator.path": "/druid/discovery",
        "druid.selectors.indexing.serviceName": "druid/overlord"
      }
    }
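
For reference, as far as I understand the Druid documentation, the `auto` format in the `timestampSpec` above accepts either an ISO 8601 string or epoch milliseconds. Here is a minimal standard-library sketch of that behaviour. The names `AutoTimestamp` and `parse` are made up for illustration, and this is not Druid's actual parser.

```java
import java.time.Instant;

// Hypothetical helper for illustration only; not part of Druid or Tranquility.
final class AutoTimestamp {
    private AutoTimestamp() {}

    // Treats an all-digit value as epoch milliseconds and anything else as ISO 8601.
    static Instant parse(Object raw) {
        if (raw == null) {
            throw new IllegalArgumentException("timestamp value is missing");
        }
        String text = raw.toString().trim();
        if (text.matches("-?\\d+")) {
            return Instant.ofEpochMilli(Long.parseLong(text));
        }
        // Throws DateTimeParseException if the text is not a valid ISO 8601 instant.
        return Instant.parse(text);
    }
}
```

With this sketch, `AutoTimestamp.parse("1462562520000")` and `AutoTimestamp.parse("2016-05-06T19:22:00.000Z")` refer to the same instant, whereas a plain `Long.valueOf(...)` would only accept the first form.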


And this is my builder:

    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
            new CountAggregatorFactory("count")
    );

    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 1000, 5));
    curatorFramework.start();

    Beam<Map<String, Object>> beam = DruidBeams.builder(new Timestamper<Map<String, Object>>() {
        @Override
        public DateTime timestamp(Map<String, Object> theMap) {
            return new DateTime(Long.valueOf(theMap.get("_ts").toString()), DateTimeZone.UTC);
        }
    }).curator(curatorFramework)
            .discoveryPath("/druid/discovery")
            .location(DruidLocation.create("druid/overlord", "twitter-firehose-web-test"))
            .rollup(DruidRollup.create(DruidDimensions.schemaless(), aggregators, QueryGranularity.NONE))
            .tuning(ClusteredBeamTuning.create(Granularity.MINUTE, new Period("PT1M"), new Period("PT1H"), 1, 1))
            .buildBeam();

    return beam;
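
One way to hunt for the difference is to list, side by side, what the JSON spec sets and what the builder sets. The sketch below does that with plain maps. The keys and values are transcribed by hand from this post, the class name `SpecDiff` is hypothetical, and I am assuming the `ClusteredBeamTuning.create` arguments are (segmentGranularity, warmingPeriod, windowPeriod, partitions, replicants). It only shows which settings differ or are absent on one side, not which of them (if any) causes the error.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical debugging aid; values are copied by hand from the post above.
final class SpecDiff {
    private SpecDiff() {}

    // Returns every key whose value differs (ignoring case) or exists on one side only.
    static Map<String, String> diff(Map<String, String> json, Map<String, String> builder) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> e : json.entrySet()) {
            String other = builder.get(e.getKey());
            if (!e.getValue().equalsIgnoreCase(other)) {
                out.put(e.getKey(),
                        "json=" + e.getValue() + ", builder=" + (other == null ? "<not set>" : other));
            }
        }
        for (Map.Entry<String, String> e : builder.entrySet()) {
            if (!json.containsKey(e.getKey())) {
                out.put(e.getKey(), "json=<not set>, builder=" + e.getValue());
            }
        }
        return out;
    }

    // Settings present in the JSON spec file.
    static Map<String, String> jsonSettings() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("timestampSpec.column", "_ts");
        m.put("timestampSpec.format", "auto");
        m.put("segmentGranularity", "MINUTE");
        m.put("queryGranularity", "none");
        m.put("windowPeriod", "PT1H");
        m.put("intermediatePersistPeriod", "PT10M");
        m.put("maxRowsInMemory", "100000");
        m.put("task.partitions", "1");
        m.put("task.replicants", "1");
        m.put("zookeeper.connect", "localhost");
        m.put("discoveryPath", "/druid/discovery");
        m.put("serviceName", "druid/overlord");
        return m;
    }

    // Settings passed explicitly to DruidBeams.builder (no timestampSpec call appears there).
    static Map<String, String> builderSettings() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("segmentGranularity", "MINUTE");
        m.put("queryGranularity", "NONE");
        m.put("warmingPeriod", "PT1M"); // assumed meaning of the second create(...) argument
        m.put("windowPeriod", "PT1H");
        m.put("task.partitions", "1");
        m.put("task.replicants", "1");
        m.put("zookeeper.connect", "localhost:2181");
        m.put("discoveryPath", "/druid/discovery");
        m.put("serviceName", "druid/overlord");
        return m;
    }

    public static void main(String[] args) {
        diff(jsonSettings(), builderSettings())
                .forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

The timestamp entries stand out: the `Timestamper` reads `_ts` on the client side, but the builder chain never states a timestamp column or format for the task itself, so what the builder falls back to when no timestampSpec is given is worth checking in the Tranquility documentation.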


This is the error I got.

92504 [finagle/netty3-13] WARN com.metamx.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Failed to propagate events: druid:overlord/twitter-firehose-web-test
{
  "eventCount" : 1,
  "timestamp" : "2016-05-06T19:22:00.000Z",
  "beams" : "MergingPartitioningBeam(DruidBeam(interval = 2016-05-06T19:22:00.000/2016-05-06T19:23:00.000, partition = 0, tasks = [index_realtime_twitter-firehose-web-test_2016-05-06T19:22:00.000Z_0_0/twitter-firehose-web-test-22-0000-0000]))"
}
java.io.IOException: Failed to send request to task[index_realtime_twitter-firehose-web-test_2016-05-06T19:22:00.000Z_0_0]: 500 Internal Server Error
at com.metamx.tranquility.druid.TaskClient$$anonfun$apply$2$$anonfun$apply$3.apply(TaskClient.scala:87) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.metamx.tranquility.druid.TaskClient$$anonfun$apply$2$$anonfun$apply$3.apply(TaskClient.scala:73) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:950) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Try$.apply(Try.scala:13) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Future$.apply(Future.scala:97) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:950) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:949) ~[storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise$Transformer.k(Promise.scala:112) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise$Transformer.apply(Promise.scala:122) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise$Transformer.apply(Promise.scala:103) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise$$anon$1.run(Promise.scala:366) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise.runq(Promise.scala:350) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise.updateIfEmpty(Promise.scala:721) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise.update(Promise.scala:694) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.util.Promise.setValue(Promise.scala:670) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:111) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.finagle.netty3.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:55) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:145) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.finagle.netty3.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:78) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at com.twitter.finagle.netty3.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-twitter-stream-1.0-SNAPSHOT.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
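
The trace above is entirely client-side: it only says that the task answered with a 500. The underlying exception is presumably in the log of the task named in the message, so pulling the task id out of the message is a reasonable first step. A small sketch using only the standard library follows; `TaskIdExtractor` is a made-up name and the pattern is based solely on the message format seen in this post.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the pattern assumes the "task[<id>]" form seen in the error above.
final class TaskIdExtractor {
    private static final Pattern TASK = Pattern.compile("task\\[([^\\]]+)\\]");

    private TaskIdExtractor() {}

    // Returns the text between "task[" and "]" if the message contains it.
    static Optional<String> taskId(String message) {
        Matcher m = TASK.matcher(message);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```

Applied to the `IOException` message above, this yields `index_realtime_twitter-firehose-web-test_2016-05-06T19:22:00.000Z_0_0`, which is the id to look up on the Druid side.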


Hi,

Did you figure out what was going wrong? I am getting the same error using DruidBeams.builder.

Thanks

I couldn't figure it out, so I just used the factory to create the config instead.