How to ingest TSV data with Tranquility?

I have been having trouble with Tranquility recently. I'm sending events to Druid from Samza, and the event delimiter is '^'. However, I don't know how to write the code to make Tranquility work. Do you have some example code? Thanks.


You need to implement a BeamFactory, something like this:

package my.test;
public class TestBeamFactory implements BeamFactory {
    public Beam<Object> makeBeam(SystemStream stream, Config config) {
        final int maxRows = 200000;
        final String intermediatePersist = "PT20m";
        final String zkConnect = "";
        final long indexGranularity = 6000;
        final String dataSource = stream.getStream();
        final Integer partitions = 1;
        final Integer replicas = 2;
        final List<String> dimensions = ImmutableList.of(
                "dim1", "dim2", "dim3"
        );

        final List<AggregatorFactory> aggregators = Arrays.asList(new AggregatorFactory[]{new CountAggregatorFactory(EVENTS_AGGREGATOR)});

        // The Timestamper should return the timestamp of the class your Samza task produces. Samza envelopes contain
        // Objects, so you'll generally have to cast them here.
        final Timestamper<Object> timestamper = new Timestamper<Object>() {
            public DateTime timestamp(Object obj) {
                final Map<String, Object> theMap = (Map<String, Object>) obj;
                Long date = Long.parseLong(theMap.get("timestamp").toString());
                date = date * 1000;
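                return new DateTime(date.longValue());
            }
        };

        // Connect to the ZooKeeper ensemble named by zkConnect and start the client before building the beam.
        final CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(zkConnect)
                .retryPolicy(new ExponentialBackoffRetry(500, 15, 10000))
                .build();
        curator.start();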


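        return DruidBeams
                .builder(timestamper)
                .curator(curator)
                // Also call .discoveryPath(...) here with your cluster's service discovery path.
                .location(DruidLocation.create("overlord", "druid:local:firehose:%s", dataSource))
                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, new DurationGranularity(indexGranularity, 0)))
                .druidTuning(DruidTuning.create(maxRows, new Period(intermediatePersist), 0))
                .tuning(ClusteredBeamTuning.builder()
                        .partitions(partitions)
                        .replicants(replicas)
                        .warmingPeriod(new Period("PT5M"))
                        .windowPeriod(new Period("PT15M"))
                        .build())
                .timestampSpec(new TimestampSpec(TIMESTAMP, "posix"))
                .buildBeam();
    }
}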

Later you can use the MessageCollector to send the messages:

private static final SystemStream systemStream = new SystemStream("druid_tranquility", "datasource_test");

collector.send(new OutgoingMessageEnvelope(systemStream, null, message));

You must also add these properties to the Samza task config file:
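
For reference, here is a rough sketch of what such properties usually look like, based on the Tranquility Samza documentation. It assumes the system name "druid_tranquility" from the SystemStream above and the my.test.TestBeamFactory class from the example; the two tuning values are the documented defaults as far as I recall, so verify them against your Tranquility version.

```properties
# Register Tranquility as the Samza system named "druid_tranquility"
systems.druid_tranquility.samza.factory=com.metamx.tranquility.samza.BeamSystemFactory
# The BeamFactory implementation from the example above
systems.druid_tranquility.beam.factory=my.test.TestBeamFactory
# Optional tuning (assumed defaults; check your Tranquility version)
systems.druid_tranquility.beam.batchSize=2000
systems.druid_tranquility.beam.maxPendingBatches=5
```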


Note, though, that we are using JSON events: we parse each event into a Java Map and send that map to Druid using Tranquility.
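
For '^'-delimited events like the ones in the question, the same approach should work if each line is first split into a map. Below is a minimal sketch using only the JDK; the class name CaretEventParser and the column order (timestamp, dim1, dim2, dim3) are assumptions for illustration, not something taken from your actual data.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class CaretEventParser {
    // Hypothetical column order; replace it with the real layout of your events.
    private static final List<String> COLUMNS =
            Arrays.asList("timestamp", "dim1", "dim2", "dim3");

    // '^' is a regex metacharacter, so it has to be quoted before splitting.
    private static final Pattern DELIMITER = Pattern.compile(Pattern.quote("^"));

    public static Map<String, Object> parse(String line) {
        // A limit of -1 keeps trailing empty fields instead of dropping them.
        String[] fields = DELIMITER.split(line, -1);
        if (fields.length != COLUMNS.size()) {
            throw new IllegalArgumentException(
                    "Expected " + COLUMNS.size() + " fields but got " + fields.length);
        }
        Map<String, Object> event = new LinkedHashMap<>();
        for (int i = 0; i < fields.length; i++) {
            event.put(COLUMNS.get(i), fields[i]);
        }
        return event;
    }

    public static void main(String[] args) {
        System.out.println(parse("1438849937^a^b^c"));
    }
}
```

The resulting map can be passed as the message in collector.send(...), and the "timestamp" entry (epoch seconds as a string) is what the Timestamper in the BeamFactory above reads and multiplies by 1000.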


Hi Andres, after reading your code, I'm now sending events to Druid through Tranquility successfully. Thank you very much.

On Thursday, August 6, 2015 at 4:32:17 PM UTC+8, wangm…@gmail.com wrote:

Perfect!! :slight_smile:



On August 7, 2015 at 6:28:13, ( wrote: