Samza container shuts down immediately on Tranquility connection to Zookeeper

Hi, I’m running into trouble setting up realtime ingestion with Tranquility 0.3.5 and Samza 0.8. In my test setup, I’m running a Samza stream processor locally using a ThreadJobFactory and my input topics are served from a Kafka/Zookeeper combo running on localhost. I have Druid 0.7.0 running separately in a dedicated cluster that also uses a separate Zookeeper cluster. I’ve implemented a BeamFactory that’s basically identical to the one in the Tranquility README.

The Samza stream processor starts up normally and reads its inputs from localhost-Kafka. After doing some ETL, the stream processor successfully writes to an output Kafka topic on localhost and then attempts to send a message to Druid. The Samza job logs seem to indicate that the code gets past the creation of the CuratorFramework object and then something happens while building the DruidBeam. It looks like during the DruidBeam building process, the Samza stream processor reaches out to the separate Zookeeper cluster (not localhost), successfully connects, and then for some unknown reason the SamzaContainer decides to shut down. No exception is logged anywhere, even at trace log level.

I’ve included log and code snippets below, thanks for any help.

–T

NetflowBeamFactory.java snippet:

Beam ret = null;
try {
    ret = DruidBeams
        .builder(timestamper)
        .curator(curator)
        .discoveryPath("/druid/discovery")
        .location(DruidLocation.create("overlord", "druid:firehose:%s", dataSource))
        .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.MINUTE))
        .tuning(
            ClusteredBeamTuning.builder()
                .segmentGranularity(Granularity.HOUR)
                .windowPeriod(new Period("PT10M"))
                .build()
        )
        .buildBeam();
} catch (Exception e) {
    log.error(ExceptionUtils.getStackTrace(e));
}
log.debug("Done building beam");
return ret;

Druid-Samza config:

systems.druid.samza.factory: com.metamx.tranquility.samza.BeamSystemFactory
systems.druid.beam.factory: com.skyportsystems.rt.tranquility.NetflowBeamFactory
systems.druid.beam.batchSize: 1
systems.druid.beam.maxPendingBatches: 1

Samza logs snippet:

2015-04-09 00:08:07 ConnectionStateManager [INFO] State change: CONNECTED
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down consumer multiplexer.
2015-04-09 00:08:09 SystemConsumers [DEBUG] Stopping consumers.
2015-04-09 00:08:09 BrokerProxy [INFO] Shutting down BrokerProxy for tgiuli-sc:9092
2015-04-09 00:08:09 BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down producer multiplexer.
2015-04-09 00:08:09 SystemProducers [DEBUG] Stopping producers.
2015-04-09 00:08:09 Producer [INFO] Shutting down producer
2015-04-09 00:08:09 ProducerPool [INFO] Closing all sync producers
2015-04-09 00:08:09 SyncProducer [INFO] Disconnecting from tgiuli-sc:9092
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down task instance stream tasks.
2015-04-09 00:08:09 TaskInstance [DEBUG] Skipping stream task shutdown for taskName: Partition 0
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down task instance stores.
2015-04-09 00:08:09 TaskInstance [DEBUG] Shutting down storage manager for taskName: Partition 0
2015-04-09 00:08:09 TaskStorageManager [DEBUG] Stopping stores.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down offset manager.
2015-04-09 00:08:09 OffsetManager [DEBUG] Skipping checkpoint manager shutdown because no checkpoint manager is defined.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down metrics reporters.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutting down JVM metrics.
2015-04-09 00:08:09 SamzaContainer [INFO] Shutdown complete.
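The log above shows the container shutting down without any error. One plausible cause is a java.lang.Error, such as a NoClassDefFoundError, escaping the container thread. When no handler is installed, the JVM prints such a throwable to System.err, which may never reach the job's log files.

The sketch below routes those throwables through a logger with Thread.setDefaultUncaughtExceptionHandler. It uses java.util.logging only to stay self-contained; a real Samza job would more likely use its own log4j/SLF4J logger. The class name and the simulated missing class are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

public class UncaughtLogging {
    private static final Logger LOG = Logger.getLogger(UncaughtLogging.class.getName());

    // Last throwable seen by the handler; kept only so the demo can inspect it.
    static final AtomicReference<Throwable> LAST = new AtomicReference<>();

    // Route throwables that escape any thread (including Errors) through the
    // logging framework, so they land wherever its handlers are configured to
    // write, instead of the JVM's default of printing them to System.err.
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, err) -> {
            LAST.set(err);
            LOG.log(Level.SEVERE, "Uncaught throwable in thread " + thread.getName(), err);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        install();
        // Simulate a worker thread dying from a missing class (hypothetical name).
        Thread worker = new Thread(() -> {
            throw new NoClassDefFoundError("com/example/MissingClass");
        }, "worker");
        worker.start();
        worker.join(); // the handler runs in the dying thread before join() returns
        System.out.println("handler saw: " + LAST.get());
    }
}
```

Install the handler early in startup, before the container threads start, so that a thread dying during beam construction leaves a trace in the log.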

My bad: it turns out my Maven build was messed up. I had not correctly included Tranquility in the Samza assembly pom, and I wasn’t catching the resulting NoClassDefFoundError, so it never showed up in my logs. After adding a Tranquility dependency to the assembly, I ran into version conflicts with Jackson and Curator, which I resolved with help from this thread: https://groups.google.com/forum/#!topic/druid-development/eIiuSS-fM8I. After this, I’m successfully pushing data from Samza into Druid through Tranquility.
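NoClassDefFoundError extends LinkageError, which extends Error rather than Exception. That is why the catch (Exception e) block in NetflowBeamFactory above never sees it.

The sketch below shows a hypothetical helper that logs both RuntimeExceptions and Errors before rethrowing them. LoggedBuild.build is not part of Tranquility or Samza, and java.util.logging stands in for the job's real logger.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class LoggedBuild {
    private static final Logger LOG = Logger.getLogger(LoggedBuild.class.getName());

    private LoggedBuild() {}

    // Hypothetical helper: runs the builder and logs anything it throws, then rethrows.
    // NoClassDefFoundError extends LinkageError extends Error, so a plain
    // catch (Exception e) would not see it; hence Error is caught explicitly.
    static <T> T build(String what, Supplier<T> builder) {
        try {
            return builder.get();
        } catch (RuntimeException | Error e) {
            LOG.log(Level.SEVERE, "Failed to build " + what, e);
            throw e;
        }
    }

    public static void main(String[] args) {
        // Simulate a builder whose classes are missing from the assembly (hypothetical name).
        try {
            build("beam", () -> {
                throw new NoClassDefFoundError("com/example/MissingBeamClass");
            });
        } catch (NoClassDefFoundError expected) {
            System.out.println("logged, then rethrew: " + expected.getMessage());
        }
    }
}
```

Rethrowing keeps the failure fatal but guarantees the cause is logged first. Swallowing an Error and returning null usually just defers the failure to a less obvious place.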

Thanks,

–T

Hey TJ, I’m kind of curious, what did you have to tweak to get things working with Jackson and Curator? I’m wondering if we should change something in the tranquility package to make life easier for people, or if the conflict was due to some other thing you had included (which we wouldn’t be able to do much about, other than maybe add a FAQ).

Ok, with a little bit of poking, it looks like the issue arose from jackson-jaxrs version 1.8.5. I had originally based my Samza build off of the samza-hello-samza project’s pom.xml, which depends on 1.8.5. To fix my issue, I removed the dependency on 1.8.5 and added:

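<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-jaxrs</artifactId>
  <version>1.9.13</version>
</dependency>
<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-xc</artifactId>
  <version>1.9.13</version>
</dependency>
<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-core-asl</artifactId>
  <version>1.9.13</version>
</dependency>
<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-mapper-asl</artifactId>
  <version>1.9.13</version>
</dependency>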

I admit this is a bit of voodoo; I was following Deepak Jain’s findings from this post: https://groups.google.com/d/msg/druid-development/eIiuSS-fM8I/pO3I0hxgjdoJ.
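When several artifacts pull in different Jackson or Curator versions, it can help to check at runtime which jar a class was actually loaded from. The sketch below does this with Class.getProtectionDomain().getCodeSource(). The WhichJar class and its default list of class names are illustrative and were not taken from this thread.

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the location (jar or directory URL) the named class was loaded from.
    static String locate(String className) {
        try {
            // initialize = false: find the class without running its static initializers
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "no code source"; // e.g. JDK classes from the bootstrap loader
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not found: " + e;
        }
    }

    public static void main(String[] args) {
        // Illustrative defaults: Jackson 1.x, Jackson 2.x and Curator entry points.
        String[] names = args.length > 0 ? args : new String[] {
            "org.codehaus.jackson.map.ObjectMapper",
            "org.codehaus.jackson.jaxrs.JacksonJsonProvider",
            "com.fasterxml.jackson.databind.ObjectMapper",
            "org.apache.curator.framework.CuratorFramework"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it with the same classpath as the Samza job; the output shows which jackson-jaxrs jar (1.8.5 or 1.9.13) actually won. On the build side, mvn dependency:tree lists which dependency brings in each version.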

Thanks, TJ. I added a “troubleshooting” section to the readme and put the detail from Deepak’s thread there: https://github.com/metamx/tranquility#im-getting-strange-jackson-or-curator-exceptions