Tranquility in Storm : Not sending data to the realtime task

I’m trying to send data through Tranquility as a BeamBolt in a Storm cluster. I think I’ve got it almost working: it creates the realtime task, but it doesn’t send any data to it. I don’t see any error in the task log or in the Tranquility log. The only error I can find is this one in ZooKeeper (if it even is an error):

2015-03-05 21:56:32,247 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing sessionid:0x14be1e105e700f3 type:create cxid:0x24 zxid:0xdf1 txntype:-1 reqpath:n/a Error Path:/prod/discovery/druid_traffic_test11-21-0000-0000 Error:KeeperErrorCode = NoNode for /prod/discovery/druid_traffic_test11-21-0000-0000

I have a very simple Storm topology: a Kafka spout that feeds TSV data, a mapping bolt that translates each TSV line into a HashMap, and a Tranquility beam bolt that sends the maps to Druid. I run it in Storm local mode and don’t see any error in the Tranquility log, but the realtime task doesn’t receive any data at all. I see ClusteredBeam “flushed” messages in the log, but never any “sending”.
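For reference, the wiring looks roughly like this (a minimal sketch, not my exact code: the Kafka topic, ZooKeeper address, and the MyBeamFactory class are placeholders, and the BeamBolt constructor may differ slightly between Tranquility versions):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import com.metamx.tranquility.storm.BeamBolt;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import java.util.Map;

public class TrafficTopology {
    public static void main(String[] args) {
        // Kafka spout emitting raw TSV lines (topic and ZK address are placeholders)
        SpoutConfig spoutConfig = new SpoutConfig(
            new ZkHosts("localhost:2181"), "traffic_topic", "/kafka_spout", "traffic-spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("tsv-spout", new KafkaSpout(spoutConfig), 1);
        // MapBolt (shown below) turns each TSV line into a Map<String, String>
        topology.setBolt("map-bolt", new MapBolt(), 1).shuffleGrouping("tsv-spout");
        // BeamBolt hands the maps to Tranquility; MyBeamFactory is a placeholder
        // that builds the DruidBeams beam shown further down in this thread
        topology.setBolt("druid-bolt", new BeamBolt<Map<String, String>>(new MyBeamFactory()), 1)
                .shuffleGrouping("map-bolt");

        // Storm local mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("traffic-test", new Config(), topology.createTopology());
    }
}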

Can anybody help me figure out where I need to dig to make this work?

Here is my setup :

Druid cluster version : 0.6.160

Hi Jaebin,

How are you emitting data to the Druid beam from your Storm bolt?

I emit a single value (the map) from the bolt. Here is my code:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class MapBolt extends BaseRichBolt {

    private OutputCollector collector;

    // Column names, in the same order as the fields in each TSV line
    private final String[] keys = {
        "datetime",
        "x_platform_id",
        "p_account",
        "p_site",
        "p_ad_unit",
        "width",
        "height",
        "u_geo_country",
        "p_coin",
        "a_account",
        "a_brand",
        "bid_bucket_lb",
        "bid_bucket_ub",
        "mkt_impressions",
        "mkt_billable_impressions",
        "p_revenue",
        "sum_winning_bid_no_fees",
        "count_winning_bid_no_fees",
        "sum_losing_bid_no_fees",
        "count_losing_bid_no_fees"
    };

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Get the raw TSV line and split it into values
        String data = input.getString(0);
        String[] values = data.split("\t");

        // Build a map pairing each column name with its value
        if (this.keys.length != values.length) {
            // error
            System.out.println("ERROR : keys and values don't match");
        } else {
            Map<String, String> map = new HashMap<String, String>();
            for (int i = 0; i < values.length; i++) {
                map.put(this.keys[i], values[i]);
            }
            //map.put("timestamp", map.get("datetime"));
            System.out.println(map);
            collector.emit(new Values(map));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("map"));
    }
}

Can you try adding a timestamp key:

long timestamp = <unix_timestamp>;
map.put(DruidBeams.DefaultTimestampSpec().getTimestampColumn(), new DateTime(timestamp).toString());

I’m using the “datetime” field as the timestamp; I set that timestampSpec in the beam builder (see my builder setup below), and I know the timestamp is within the window of the Tranquility beam bolt. So to add a “timestamp” field I would need to change the timestampSpec in the builder. I think I’ve tried that before, but let me try again.

final Timestamper<Map<String, String>> timeStamper = new Timestamper<Map<String, String>>() {
    @Override
    public DateTime timestamp(Map<String, String> map) {
        // Parse the "datetime" field of each event as the event timestamp
        String timestamp = map.get("datetime");
        return new DateTime(timestamp);
    }
};

final DruidBeams.Builder<Map<String, String>> builder = DruidBeams
    .builder(timeStamper)
    .curator(curator)
    .discoveryPath("/prod/discovery")
    .location(
        DruidLocation.create(
            "overlord",
            "druid:firehose:%s",
            dataSource
        )
    )
    .timestampSpec(new TimestampSpec("datetime", "iso"))
    .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
    .tuning(
        ClusteredBeamTuning
            .builder()
            .segmentGranularity(Granularity.HOUR)
            .windowPeriod(new Period("PT10M"))
            .partitions(1)
            .replicants(1)
            .build()
    );
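For completeness: in my Storm setup this builder lives inside a BeamFactory, and the bolt is created as new BeamBolt<Map<String, String>>(factory). The last step is just the following (a sketch, assuming the Tranquility 0.3-era Storm API):

// Inside BeamFactory<Map<String, String>>.makeBeam(conf, metrics), after the
// CuratorFramework passed to .curator(...) above has been started:
final Beam<Map<String, String>> beam = builder.buildBeam();
return beam;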

OK, I added a “timestamp” key to the map (with the current time, just for debugging) and removed the timestampSpec from my beam builder.
That changed the realtime task log: I now see a non-zero value for the “events/processed” metric, but “rows/output” is still zero.

And I still don’t see any log message from DruidBeam saying it sent anything; I just get the “flushed” messages.

It turned out that the middle manager is running on a system in the UTC timezone, while I ran the Storm local cluster on my machine in PST. I changed segmentGranularity to DAY in ClusteredBeamTuning to check whether that was the cause, and with that change I now see the segment being created on the middle manager.

Yeah Prajwal, your instinct was right. It was a timezone issue between the middle manager and the Storm cluster. It’s one of the gotchas of real-time processing when relying on date strings that carry no timezone information.
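One way to sidestep that gotcha on the Storm side would be to pin the zone used to interpret the zone-less “datetime” strings inside the Timestamper, instead of relying on the JVM default of whichever host runs the bolt. A sketch of that variant (the zone ID is an assumption about the data):

// Variant of the timeStamper above: interpret the zone-less "datetime"
// strings in an explicit zone rather than the host's default zone.
final Timestamper<Map<String, String>> timeStamper = new Timestamper<Map<String, String>>() {
    // Assumption: the incoming "datetime" values are PST/PDT local time
    private final DateTimeZone dataZone = DateTimeZone.forID("America/Los_Angeles");

    @Override
    public DateTime timestamp(Map<String, String> map) {
        return ISODateTimeFormat.dateTimeParser()
                .withZone(dataZone)
                .parseDateTime(map.get("datetime"));
    }
};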

Thanks a lot,

Jaebin

Here is the final update on this issue. It was actually not the timezone difference between the middle manager and the Storm cluster running the Tranquility BeamBolt (even though that contributed to it). The problem was the timestamp format I was using: it had no timezone, and changing the format to include the timezone offset fixed the issue.

Before : 2015-03-05T17:50:09.087390

After : 2015-03-05T18:03:43.360784-08:00

I just changed the timestamp format in my data, ran with all the configuration I posted in the initial post, and it worked fine.
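In case it helps anyone: my producer isn’t part of this thread (and it emits microsecond precision, so it isn’t Joda-Time), but if you build the timestamp in Java, a zone-aware Joda DateTime already prints the offset-bearing ISO8601 form, for example:

// Sketch: producing an ISO8601 timestamp that carries the UTC offset
// (the zone ID is just an example)
DateTimeZone zone = DateTimeZone.forID("America/Los_Angeles");
String ts = new DateTime(zone).toString();   // e.g. 2015-03-05T18:03:43.360-08:00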

Thanks again, Prajwal, for bringing my attention to the timestamp.