Dynamically updating ingestion spec using tranquility in storm


I’m currently ingesting Kafka messages into Storm and then writing them to Druid from a Storm bolt. As of now, I specify static aggregator metric names as part of the ingestion spec. What I would like to do is dynamically update the aggregator metric names for every new task that is created, so that the ingestion spec stays in sync. Is there any way to tackle this problem? Any help is appreciated, thanks!

I have the following code as of now:

class DruidBolt implements com.metamx.tranquility.storm.BeamFactory<Map<String, Object>> {

    public static final Logger LOG = LoggerFactory.getLogger(Topology.class);

    public Beam<Map<String, Object>> makeBeam(Map<?, ?> map, IMetricsContext imc) {
        try {
            LOG.info("[DruidBeamBolt] Building beam");

            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    Configuration.ZOOKEEPER_SERVERS_CONFIG,
                    new BoundedExponentialBackoffRetry(100, 1000, 5));
            curator.start();

            final String dataSource = Configuration.KAFKA_SERVERS_TOPIC;
            final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                    new CountAggregatorFactory("events"),
                    new DoubleSumAggregatorFactory("value", "value"),
                    new DoubleMaxAggregatorFactory("max_value", "value"),
                    new DoubleMinAggregatorFactory("min_value", "value"),
                    new ApproximateHistogramAggregatorFactory("histogram", "value", 50, 7,
                            (float) -10000000000000.00, (float) 10000000000000.00));
            final List<String> dimensions = new ArrayList<String>();

            final DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams
                    .builder(new Timestamper<Map<String, Object>>() {
                        public DateTime timestamp(Map<String, Object> theMap) {
                            return new DateTime(theMap.get("timestamp"));
                        }
                    })
                    .curator(curator)
                    .discoveryPath("/druid/discovery")      // placeholder: your discovery path
                    .location(new DruidLocation(
                            new DruidEnvironment(
                                    "druid/overlord",       // placeholder: index service name
                                    "druid:firehose:%s"     // placeholder: firehose pattern
                            ), dataSource))
                    .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
                    .tuning(ClusteredBeamTuning.create(Granularity.MINUTE,
                            new Period("PT0M"), new Period("PT0M"), 1, 1));

            final Beam<Map<String, Object>> beam = builder.buildBeam();
            return beam;

        } catch (Exception e) {
            LOG.info("[DruidBeamBolt] " + e);
            throw Throwables.propagate(e);
        }
    }
}

One way of doing this is to create a new task with a new schema whenever a new metric is detected. That way, when the new task starts, it will begin reflecting the new metric.
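To make that concrete, here is a library-free sketch of regenerating the aggregator section of the spec from whatever metric fields have been seen so far. The class and method names (`MetricsSpecBuilder`, `buildMetricsSpec`) are hypothetical, not part of tranquility; it mirrors the count/sum/max/min pattern from the question:

```java
import java.util.*;

// Hypothetical helper: rebuilds the "metricsSpec" portion of an ingestion
// spec from the set of metric field names observed so far.
public class MetricsSpecBuilder {

    // Emits one count aggregator plus sum/max/min entries per observed field,
    // mirroring the static aggregator list in the question.
    public static List<Map<String, String>> buildMetricsSpec(Set<String> metricFields) {
        List<Map<String, String>> spec = new ArrayList<>();
        spec.add(Map.of("type", "count", "name", "events"));
        for (String field : new TreeSet<>(metricFields)) { // sorted for stable output
            spec.add(Map.of("type", "doubleSum", "name", field, "fieldName", field));
            spec.add(Map.of("type", "doubleMax", "name", "max_" + field, "fieldName", field));
            spec.add(Map.of("type", "doubleMin", "name", "min_" + field, "fieldName", field));
        }
        return spec;
    }

    public static void main(String[] args) {
        // After detecting a new metric ("latency"), rebuild the spec and use
        // it when constructing the beam for the next task.
        List<Map<String, String>> spec =
                buildMetricsSpec(new HashSet<>(Arrays.asList("value", "latency")));
        System.out.println(spec.size() + " aggregators"); // 1 count + 3 per field = 7
    }
}
```

The rebuilt list would then feed into the `aggregators` passed to `DruidRollup.create` when the beam for the new task is built.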

Please correct me if I’m wrong, but in the BeamBolt class it looks like the factory’s makeBeam method is called only once, from the prepare method:

 override def prepare(conf: ju.Map[_, _], context: TopologyContext, collector: OutputCollector) {
    require(this.collector == null, "WTF?! Already initialized, but prepare was called anyway.")
    this.collector = collector
    this.tranquilizer = Tranquilizer.create(
      beamFactory.makeBeam(conf, context)
      /* remaining arguments elided */)
    running = true
  }

So, is the schema created only once in the case of a single worker? How would we spawn multiple tasks with a new schema?

The bolt included in the tranquility-storm library doesn’t offer dynamic reconfiguration; you need to kill the topology and re-submit it to change the configuration.

You can get dynamic configuration working if you need it, but you’d need to write your own bolt and use tranquility-core instead of tranquility-storm.
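If you go the custom-bolt route, the core decision is when to tear down the old Tranquilizer and build a new beam. A minimal, library-free sketch of that detection step follows; `SchemaTracker` and its methods are hypothetical names, not tranquility API:

```java
import java.util.*;

// Hypothetical tracker a custom bolt could consult on every tuple: observe()
// returns true when a message contains a metric field not yet in the schema,
// signaling that the Tranquilizer should be rebuilt with new aggregators.
public class SchemaTracker {
    private final Set<String> knownMetrics = new HashSet<>();
    private final Set<String> nonMetricFields; // e.g. "timestamp" and dimensions

    public SchemaTracker(Collection<String> nonMetricFields, Collection<String> initialMetrics) {
        this.nonMetricFields = new HashSet<>(nonMetricFields);
        this.knownMetrics.addAll(initialMetrics);
    }

    // True if this message introduced at least one new metric field.
    public boolean observe(Map<String, Object> message) {
        boolean changed = false;
        for (String key : message.keySet()) {
            if (!nonMetricFields.contains(key) && knownMetrics.add(key)) {
                changed = true;
            }
        }
        return changed;
    }

    public Set<String> currentMetrics() {
        return Collections.unmodifiableSet(knownMetrics);
    }

    public static void main(String[] args) {
        SchemaTracker tracker = new SchemaTracker(List.of("timestamp"), List.of("value"));
        // Known fields only: no rebuild needed.
        System.out.println(tracker.observe(Map.of("timestamp", "t0", "value", 1.0)));
        // "latency" is new: the bolt would flush and stop the old Tranquilizer,
        // build a new beam with the enlarged aggregator list, and start it.
        System.out.println(tracker.observe(Map.of("timestamp", "t1", "latency", 9.0)));
    }
}
```

In a custom bolt built on tranquility-core, you would call something like this tracker in execute(), and on a schema change flush the current Tranquilizer, rebuild the beam with the new aggregator list, and resume sending.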