[druid-user] Tranquility with Kafka Connect - javax validator problem

You’re probably discarding too much from META-INF in your assembly merge strategy. Try this instead.

```
assemblyMergeStrategy in assembly := {
  case PathList("org", "eclipse", "aether", xs @ _*) => MergeStrategy.first
  case PathList("META-INF", "sisu", xs @ _*) => MergeStrategy.discard
  case "META-INF/services/io.druid.initialization.DruidModule" => MergeStrategy.concat
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```

Btw, I had started working on one too, but didn’t finish it. I’m also not sure if it works at all, since I haven’t tested it recently :). But if you want to look, it’s here: https://github.com/gianm/kafka-connect-druid. Feel free to take any of the code if you think it’s useful; it’s all Apache licensed.

Oh, this (https://github.com/gianm/kafka-connect-druid) is great! I’m trying to test your repo, but I hit a problem similar to the one in my own implementation: when I start the Kafka Connect worker with the druid-connect dependencies, there is a javax conflict and it reports this:

```
[2016-04-22 07:05:54,540] WARN FAILED org.eclipse.jetty.server.Server@ee15b1f: java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
    at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:331)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:390)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:172)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:364)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:612)
    at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:395)
    at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
    at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
    at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.server.Server.start(Server.java:387)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.server.Server.doStart(Server.java:354)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:127)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:58)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
Exception in thread "main" java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
    at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:331)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:390)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:172)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:364)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:612)
    at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:395)
    at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
    at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
    at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
    at org.eclipse.jetty.server.Server.start(Server.java:387)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
    at org.eclipse.jetty.server.Server.doStart(Server.java:354)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:127)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:58)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
```

I’m trying to solve the problem with sbt shading, but with no results so far :frowning:
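For reference, this is the kind of sbt-assembly ShadeRule I have been experimenting with; a minimal sketch only, and the renamed package prefix is just illustrative, not something from my actual build:

```
// Sketch: sbt-assembly shading (sbt-assembly 0.14+).
// "shaded.javax.ws.rs" is an illustrative target prefix, not a real setting from the thread.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("javax.ws.rs.**" -> "shaded.javax.ws.rs.@1").inAll
)
```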

Regards,

Andrés

Hmm, maybe try excluding com.sun.jersey:jersey-core, com.sun.jersey:jersey-server, and com.sun.jersey.contribs:jersey-guice from tranquility in kafka-connect-druid. That should let Kafka Connect’s versions win, and we do want those to win, because Tranquility doesn’t actually need to serve anything.
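In build.sbt that would look roughly like the sketch below; the Tranquility module name and version are just placeholders, so use whatever kafka-connect-druid already depends on:

```
// Sketch: exclude the old Jersey artifacts that Tranquility drags in.
// Module name and version below are assumptions, not from the thread.
libraryDependencies += ("io.druid" %% "tranquility-core" % "0.7.4")
  .exclude("com.sun.jersey", "jersey-core")
  .exclude("com.sun.jersey", "jersey-server")
  .exclude("com.sun.jersey.contribs", "jersey-guice")
```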

Hi Gian,

This works, but I needed to exclude these dependencies too:

```
exclude("org.eclipse.jetty", "jetty-server")
exclude("org.eclipse.jetty", "jetty-servlets")
exclude("org.eclipse.jetty", "jetty-util")
exclude("org.eclipse.jetty", "jetty-io")
exclude("org.eclipse.jetty", "jetty-http")
exclude("org.eclipse.jetty", "jetty-client")
exclude("org.eclipse.jetty", "jetty-continuation")
```

Now I can see the sink connector trying to start, but it throws this WARN:

```
[2016-04-25 14:57:29,373] WARN Transient error, will try again in 19,238 ms (com.metamx.tranquility.finagle.FutureRetry$:?)
com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!overlord, Dtab.base=, Dtab.local=
    at com.twitter.finagle.NoStacktrace(Unknown Source)
[2016-04-25 14:57:48,622] WARN Transient error, will try again in 38,643 ms (com.metamx.tranquility.finagle.FutureRetry$:?)
com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!overlord, Dtab.base=, Dtab.local=
    at com.twitter.finagle.NoStacktrace(Unknown Source)
```

I think the problem is in my config. I’m reading the Tranquility configuration docs (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md#properties), and I built this properties file:

```
{
  "name": "druid-sink-monitor",
  "config": {
    "connector.class": "io.imply.kafkaconnect.druid.TranquilitySinkConnector",
    "tasks.max": "1",
    "topics": "rb_monitor",
    "druid.specString": "{\"dataSchema\":{\"dataSource\":\"rb_monitor_connector\",\"metricsSpec\":[{\"type\":\"count\",\"name\":\"events\"},{\"type\":\"doubleSum\",\"name\":\"sum_value\",\"fieldName\":\"value\"},{\"type\":\"max\",\"name\":\"max_value\",\"fieldName\":\"value\"},{\"type\":\"min\",\"name\":\"min_value\",\"fieldName\":\"value\"}],\"granularitySpec\":{\"segmentGranularity\":\"hour\",\"queryGranularity\":\"minute\",\"type\":\"uniform\"},\"parser\":{\"type\":\"map\",\"parseSpec\":{\"format\":\"json\",\"timestampSpec\":{\"column\":\"timestamp\",\"format\":\"posix\"},\"dimensionsSpec\":{\"dimensionExclusions\":[\"unit\",\"type\"]}}}},\"tuningConfig\":{\"type\":\"realtime\",\"windowPeriod\":\"PT10M\",\"intermediatePersistPeriod\":\"PT10M\",\"maxRowsInMemory\":\"100000\"}}",
    "zookeeper.connect": "i-6294daf9",
    "druid.discovery.curator.path": "/druid/discoveryPath",
    "druid.selectors.indexing.serviceName": "overlord"
  }
}
```

I suppose I’m forgetting something; it is very similar to my BeamFactory for Samza-Tranquility:


public class MonitorBeamFactory implements BeamFactory {
    @Override
    public Beam<Object> makeBeam(SystemStream stream, int partitions, int replicas, Config config) {
        final int maxRows = Integer.valueOf(config.get("redborder.beam.monitor.maxrows", "200000"));
        final String intermediatePersist = config.get("redborder.beam.monitor.intermediatePersist", "PT20m");
        final String zkConnect = config.get("systems.kafka.consumer.zookeeper.connect");
        final long indexGranularity = Long.valueOf(config.get("systems.druid_monitor.beam.indexGranularity", "60000"));

        final String dataSource = stream.getStream();

        final List<String> exclusions = ImmutableList.of("unit", "type");

        final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                new CountAggregatorFactory(EVENTS_AGGREGATOR),
                new DoubleSumAggregatorFactory("sum_value", "value"),
                new MaxAggregatorFactory("max_value", "value"),
                new MinAggregatorFactory("min_value", "value"));

        // The Timestamper should return the timestamp of the class your Samza task produces. Samza envelopes contain
        // Objects, so you'll generally have to cast them here.
        final Timestamper<Object> timestamper = new Timestamper<Object>() {
            @Override
            public DateTime timestamp(Object obj) {
                final Map<String, Object> theMap = (Map<String, Object>) obj;
                Long date = Long.parseLong(theMap.get(TIMESTAMP).toString());
                date = date * 1000;
                return new DateTime(date.longValue());
            }
        };

        final CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(zkConnect)
                .retryPolicy(new ExponentialBackoffRetry(500, 15, 10000))
                .build();

        curator.start();

        return DruidBeams
                .builder(timestamper)
                .curator(curator)
                .discoveryPath("/druid/discoveryPath")
                .location(DruidLocation.create("overlord", "druid:local:firehose:%s", dataSource))
                .rollup(DruidRollup.create(DruidDimensions.schemalessWithExclusions(exclusions), aggregators, new DurationGranularity(indexGranularity, 0)))
                .druidTuning(DruidTuning.create(maxRows, new Period(intermediatePersist), 0))
                .tuning(ClusteredBeamTuning.builder()
                        .partitions(partitions)
                        .replicants(replicas)
                        .segmentGranularity(Granularity.HOUR)
                        .warmingPeriod(new Period("PT15M"))
                        .windowPeriod(new Period("PT10M"))
                        .build())
                .timestampSpec(new TimestampSpec(TIMESTAMP, "posix", null))
                .buildBeam();
    }
}


Am I forgetting something? The main difference I can see is this line:

```
DruidLocation.create("overlord", "druid:local:firehose:%s", dataSource)
```

but I am not sure whether this "druid:local:firehose:%s" could be affecting it.

Regards,

Andrés

I have tried a few things, but nothing works. It seems like Tranquility can’t discover the overlord to create the indexing tasks. It’s very strange. Any ideas?
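For what it’s worth, one way to check whether the overlord is announced under the discovery path at all is to list the services with Curator’s discovery API. A rough sketch, where the hostname and base path come from the config above and the ZooKeeper port is an assumption:

```
// Rough diagnostic sketch: list service names announced under the Druid discovery path.
// The ":2181" port is an assumption; hostname and base path are taken from the config above.
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder
import scala.collection.JavaConverters._

object CheckDiscovery extends App {
  val curator = CuratorFrameworkFactory.newClient("i-6294daf9:2181", new ExponentialBackoffRetry(500, 15))
  curator.start()

  val discovery = ServiceDiscoveryBuilder.builder(classOf[Void])
    .client(curator)
    .basePath("/druid/discoveryPath")
    .build()
  discovery.start()

  // Tranquility looks up the service named by druid.selectors.indexing.serviceName ("overlord" here),
  // so that name should appear in the output if the overlord is announced.
  println("Announced services: " + discovery.queryForNames().asScala.mkString(", "))

  discovery.close()
  curator.close()
}
```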

Regards,

Andrés

Maybe the problem is related to this PR: https://github.com/druid-io/tranquility/pull/151

Later, I will try the new tranquility release! :slight_smile:

Ah, yeah, that’s probably it! 0.8.0 is out now so give that a try.
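Picking up the new release in build.sbt would be something like the sketch below; the module name is an assumption, so adjust it to whichever Tranquility module kafka-connect-druid actually depends on:

```
// Sketch: bump Tranquility to 0.8.0, which should include the change from the PR mentioned above.
// "tranquility-core" is an assumed module name.
libraryDependencies += "io.druid" %% "tranquility-core" % "0.8.0"
```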