Example of using Storm BeamFactory (Java preferred)


Can someone please explain what configuration to use for Tranquility’s BeamFactory and the corresponding specs in Druid? An example would be really helpful.

I want to ingest Kafka messages into Storm (I have completed the Spout part); now I want to ingest them into Druid using a Storm Bolt.

What I have done so far…

package com.aspire.propelstream.odas.topology.bolt;

import com.aspire.propelstream.odas.topology.spout.DrivingEventScheme;

import backtype.storm.task.IMetricsContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.storm.BeamBolt;
import com.metamx.tranquility.storm.BeamFactory;
import com.metamx.tranquility.typeclass.JavaObjectWriter;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PredictorBolt implements BeamFactory<Map<String, Object>>
{
    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics)
    {
        try {
            // Curator client for the ZooKeeper ensemble Druid uses for discovery.
            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                "localhost:2181", new RetryOneTime(1000));
            curator.start();

            final List<String> dimensions = ImmutableList.of(
                "page", "language", "user", "unpatrolled", "newPage", "robot",
                "anonymous", "namespace", "continent", "country", "region", "city");
            final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                new CountAggregatorFactory("cnt"));
            final String dataSource = "wikipedia";

            final DruidBeams.Builder<Map<String, Object>> builder = DruidBeams
                .builder(
                    // Pull the event time out of each message's "timestamp" field.
                    new Timestamper<Map<String, Object>>()
                    {
                        @Override
                        public DateTime timestamp(Map<String, Object> theMap)
                        {
                            return new DateTime(theMap.get("timestamp"));
                        }
                    })
                .curator(curator)
                .discoveryPath("/druid/discovery")
                .location(DruidLocation.create("druid:local:indexer", "druid:firehose:%s", dataSource))
                .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.NONE))
                .tuning(
                    ClusteredBeamTuning.builder()
                        .segmentGranularity(Granularity.MINUTE)
                        .windowPeriod(new Period("PT1M"))
                        .build());

            final Beam<Map<String, Object>> beam = builder.buildBeam();
            return beam;
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}



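On the Storm side, the BeamBolt import in the code above is what ties a BeamFactory into a topology. A minimal sketch of that wiring, assuming a spout named "kafkaSpout" that emits the event maps (the spout name and spout class here are illustrative placeholders, not values from this thread), might look like:

```java
import java.util.Map;

import backtype.storm.topology.TopologyBuilder;
import com.metamx.tranquility.storm.BeamBolt;

public class TopologySketch
{
    public static TopologyBuilder buildTopology()
    {
        // Wire the existing Kafka spout into a BeamBolt; the BeamBolt calls
        // PredictorBolt.makeBeam() on each worker and sends tuples to Druid.
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("kafkaSpout", new DrivingEventKafkaSpout()); // hypothetical spout class
        topology.setBolt("druidBeamBolt", new BeamBolt<Map<String, Object>>(new PredictorBolt()))
                .shuffleGrouping("kafkaSpout");
        return topology;
    }
}
```

The BeamBolt is terminal: it acknowledges tuples after handing them to the beam, so nothing needs to be wired downstream of it.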

My wikipedia.spec file



{
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
          "dimensionExclusions": [],
          "spatialDimensions": []
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "MINUTE",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "type": "realtime",
    "firehose": {
      "type": "kafka-0.8",
      "consumerProps": {
        "zookeeper.connect": "localhost:2181",
        "zookeeper.connection.timeout.ms": "15000",
        "zookeeper.session.timeout.ms": "15000",
        "zookeeper.sync.time.ms": "5000",
        "group.id": "wikipedia",
        "fetch.message.max.bytes": "1048586",
        "auto.offset.reset": "largest",
        "auto.commit.enable": "false"
      },
      "feed": "wikipedia"
    },
    "plumber": {
      "type": "realtime"
    },
    "children": [
      {
        "type": "dataSource",
        "ingestionSpec": {
          "dataSource": "wikipedia"
        }
      },
      {
        "type": "hadoop",
        "paths": "hdfs://sandbox.hortonworks.com:8020/druid/wikipedia_data.json"
      }
    ],
    "segmentOutputPath": "hdfs://sandbox.hortonworks.com:8020/druid/outputSegment"
  },
  "tuningConfig": {
    "type": "realtime",
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT1m",
    "windowPeriod": "PT1m",
    "rejectionPolicy": {
      "type": "messageTime"
    }
  }
}






Hey Sai,

You don’t need any specs in Druid when using Tranquility. You do need to set up the indexing service (Overlord, plus optional MiddleManagers if you want to run distributed). Once that is running, Tranquility will create the necessary Druid ingestion tasks on its own.

You should be able to modify the Tranquility code you posted as necessary for your use case. Some likely modifications you’ll need are:

  1. “dimensions” and “aggregators” appropriate for your dataset.

  2. The overlord name in the code you posted is “druid:local:indexer”, but the default name of the overlord in Druid is just “overlord”. So unless you changed your overlord’s name, you’ll want that to be “overlord” in the Tranquility code too.

  3. The ZooKeeper cluster in the code you posted is just localhost. If you’re running a multi-machine setup, that will need to point to an actual ZooKeeper cluster.
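Concretely, points 2 and 3 amount to changing two values in the factory. A sketch of what that could look like; the ZooKeeper hostnames below are placeholders for your own cluster, not values from this thread:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;

import com.metamx.tranquility.druid.DruidLocation;

public class BeamFactoryOverrides
{
    // Point Curator at a real ZooKeeper ensemble instead of localhost.
    // "zk1:2181,zk2:2181,zk3:2181" is an illustrative placeholder.
    static CuratorFramework makeCurator()
    {
        final CuratorFramework curator = CuratorFrameworkFactory.newClient(
            "zk1:2181,zk2:2181,zk3:2181", new RetryOneTime(1000));
        curator.start();
        return curator;
    }

    // Use Druid's default overlord service name unless you renamed it.
    static DruidLocation makeLocation(String dataSource)
    {
        return DruidLocation.create("overlord", "druid:firehose:%s", dataSource);
    }
}
```

These would then be passed to the builder via `.curator(...)` and `.location(...)` in place of the values in the posted code.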

Thank you!