Tranquility Storm - InvalidClassException, local class incompatible

I try to update from Tranquility 0.5.1 (Scala 2.10) to Tranquility 0.7.4 (also testes with 0.8.2).

But when i deploy the Topology to storm, i get following exception:

java.lang.RuntimeException: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
        at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.task$mk_task_data$fn__3129.invoke(task.clj:180) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.executor$mk_executor$fn__3308.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure.core$map$fn__4207.invoke(core.clj:2485) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) ~[clojure-1.5.1.jar:na]
        at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
        at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630$iter__4635__4639$fn__4640.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na]
        at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__4629$mk_worker__4685.doInvoke(worker.clj:393) [storm-core-0.9.6.jar:0.9.6]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:504) [storm-core-0.9.6.jar:0.9.6]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.6.jar:0.9.6]
Caused by: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) ~[na:1.8.0_92]
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) ~[na:1.8.0_92]
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) ~[na:1.8.0_92]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) ~[na:1.8.0_92]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_92]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_92]
        at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:52) ~[storm-core-0.9.6.jar:0.9.6]
        ... 36 common frames omitted

``

I tried this with Storm 0.9.5 and 0.9.6 - update to 1.x is currently not possible for us.

Even a simple Test won’t work and returns with the same exception. This exception won’t occure, when i run them with the local cluster test mode.

I allready tried to remove all Storm entries from ZooKeeper, clearing the Storm directory on disk and try it with a new and clean Storm 0.9.6

I checked the dependency versions from my project with the Storm Lib directory, which seem to have the same versions.

My pom.xml looks like

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <!-- keep storm out of the jar-with-dependencies -->
        <scope>provided</scope>
    </dependency>

<dependency>
    <groupId>io.druid</groupId>
    <artifactId>tranquility-core_2.10</artifactId>
    <version>0.7.4</version>
</dependency>
<dependency>
    <groupId>io.druid</groupId>
    <artifactId>tranquility-storm_2.10</artifactId>
    <version>0.7.4</version>
</dependency>

And my Test-Topology:

builder.setSpout(“TestSpout”, new BaseRichSpout() {
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {

}

@Override
public void nextTuple() {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields(new ArrayList<String>() { {
        add("Test");
    }
    }));
}

});

BeamBolt<Map<String, Object>> druidBolt = new BeamBolt<>(new DruidTranquilityFactory(), 16384);
builder.setBolt(“DruidBolt”, druidBolt, 1)
.shuffleGrouping(“TestSpout”);


Where the DruidTranquilityFactory

public class DruidTranquilityFactory implements BeamFactory<Map<String, Object>>
{
private static final long serialVersionUID = -1491596146457491987L;

    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
    final CuratorFramework curator = CuratorFrameworkFactory.newClient(Configuration.ZooKeeperUriDruid, new RetryOneTime(1000));
    curator.start();

    final String dataSource = "TestDs";

    final DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams
            .builder(
                    new Timestamper<Map<String, Object>>()
                    {
                        @Override
                        public DateTime timestamp(Map<String, Object> theMap)
                        {
                            return new DateTime(theMap.get("timestamp"), DateTimeZone.UTC);
                        }
                    }
            )
            .curator(curator)
            .discoveryPath("/druid/discovery")
            .location(
                    DruidLocation.create(
                            "druid:prod:overlord",
                            "druid:local:firehose:%s",
                            dataSource
                    )
            )
            .timestampSpec(new TimestampSpec("timestamp", "millis", null))
            .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.NONE))
            .tuning(ClusteredBeamTuning.builder()
                    .segmentGranularity(Granularity.HOUR)
                    .windowPeriod(new Period("PT70M"))
                    .build())
            .druidTuning(DruidTuning.create(100000, new Period("PT1H"), 0));

    final Service<List<Map<String, Object>>, Integer> service = builder.buildJavaService();
    final Beam<Map<String, Object>> beam = builder.buildBeam();
        return beam;
}

}


I allready tried to exlude the storm and zookeeper dependency from the tranquility pom entry, as well as using the version for Scala 2.10 and 2.11!

Any idea what goes wrong?