How to ingest tsv data with the tranquility?

Hi,
I have a troubling with the tranquility recently.I’m sending the events to druid by samza,and the event’s delimiter is ‘^’.Howerver,I do not know to write the code to let the tranquilty works.Do you have some example codes.Thanks.

Hi,

You must implement a BeamFactory.class some like this:

package my.test;
public class TestBeamFactory implements BeamFactory {
    @Override
    public Beam<Object> makeBeam(SystemStream stream, Config config) {
        final int maxRows = 200000;
        final String intermediatePersist = "PT20m";
        final String zkConnect = "127.0.0.1:2181";
        final long indexGranularity = 6000;
        final String dataSource = stream.getStream();
        final Integer partitions = 1;
        final Integer replicas = 2;
        final List<String> dimensions = ImmutableList.of(
                *"dim1", "dim2", "dim3"*
        );

        final List<AggregatorFactory> aggregators = Arrays.asList(new AggregatorFactory[]{new CountAggregatorFactory(EVENTS_AGGREGATOR)});

        // The Timestamper should return the timestamp of the class your Samza task produces. Samza envelopes contain
        // Objects, so you'll generally have to cast them here.
        final Timestamper<Object> timestamper = new Timestamper<Object>() {
            @Override
            public DateTime timestamp(Object obj) {
                final Map<String, Object> theMap = (Map<String, Object>) obj;
                Long date = Long.parseLong(theMap.get("timestamp").toString());
                date = date * 1000;
                return new DateTime(date.longValue());
            }
        };

        final CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(zkConnect)
                .retryPolicy(new ExponentialBackoffRetry(500, 15, 10000))
                .build();

        curator.start();

        return DruidBeams
                .builder(timestamper)
                .curator(curator)
                .discoveryPath("/druid/discoveryPath")
                .location(DruidLocation.create("overlord", "druid:local:firehose:%s", dataSource))
                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, new DurationGranularity(indexGranularity, 0)))
                .druidTuning(DruidTuning.create(maxRows, new Period(intermediatePersist), 0))
                .tuning(ClusteredBeamTuning.builder()
                        .partitions(partitions)
                        .replicants(replicas)
                        .segmentGranularity(Granularity.HOUR)
                        .warmingPeriod(new Period("PT5M"))
                        .windowPeriod(new Period("PT15M"))
                        .build())
                .timestampSpec(new TimestampSpec(TIMESTAMP, "posix"))
                .buildBeam();
    }
}



later you can use the MessageCollecto to send the messages:

private static final SystemStream monitorSystemStream = new SystemStream(“druid_tranquility”, “datasource_test”);

collector.send(new OutgoingMessageEnvelope(systemStream, null, message));


and you must added these properties on samza task config file:

systems.druid_tranquility.samza.factory=com.metamx.tranquility.samza.BeamSystemFactory
systems.druid_tranquility.beam.factory=my.test.TestBeamFactory;
systems.druid_tranquility.beam.batchSize=2000
systems.druid_tranquility.beam.maxPendingBatches=5
systems.druid_tranquility.beam.indexGranularity=60000

but ... We are using json events, and we parsers this events into a java map to send to druid using tranquility.

Regards,
Andres


Hi Andres,After reading your code,I’m sending events to druid by tranquility successfully.Thank you very much.

在 2015年8月6日星期四 UTC+8下午4:32:17,wangm…@gmail.com写道:

Perfect!! :slight_smile:

Regards,

Andres

Andrés Gómez

Developer****

redborder.net / agomez@redborder.net

Phone: +34 955 60 11 60

0e6e8de_1.png

square-twitter-20.png square-google-plus-20.png square-linkedin-20.png

Piénsalo antes de imprimir este mensaje

Este correo electrónico, incluidos sus anexos, se dirige exclusivamente a su destinatario. Contiene información CONFIDENCIAL cuya divulgación está prohibida por la ley o puede estar sometida a secreto profesional. Si ha recibido este mensaje por error, le rogamos nos lo comunique inmediatamente y proceda a su destrucción.

This email, including attachments, is intended exclusively for its addressee. It contains information that is CONFIDENTIAL whose disclosure is prohibited by law and may be covered by legal privilege. If you have received this email in error, please notify the sender and delete it from your system.

En 7 de agosto de 2015 en 6:28:13, wangmuling@gmail.com (wangmuling@gmail.com) escrito: