Need help with Spark Streaming to Tranquility

Hi,

I am getting 500 Internal Server Error responses when sending events from Spark Streaming to Tranquility. Can anyone please help?

I have Spark Streaming, Tranquility and Druid all set up locally.

My dependencies:

"io.druid" % "tranquility-core_2.11" % "0.8.2",
"io.druid" % "tranquility-spark_2.11" % "0.8.2"

My SimpleEventBeamFactory:

// Imports assumed for Tranquility 0.8.x; they were not part of the snippet as first posted.
import com.metamx.common.Granularity
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.LongSumAggregatorFactory
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.Period

object SimpleEventBeamFactory
{
  val BeamInstance: Beam[SimpleEvent] = {
    // Tranquility uses ZooKeeper (through Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "localhost:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = "druid/overlord" // Your overlord's druid.service, with slashes replaced by colons.
    val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path
    val dataSource = "goo"
    val dimensions = IndexedSeq("bar")
    val aggregators = Seq(new LongSumAggregatorFactory("foo", "foo"))

    // Expects simpleEvent.ts to return a Joda DateTime object.
    DruidBeams
      .builder((simpleEvent: SimpleEvent) => simpleEvent.ts)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation.create(indexService, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.fromString("MINUTE")))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.MINUTE,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }
}

log:

Creating druid indexing task (service = druid:overlord): {
  "type" : "index_realtime",
  "id" : "index_realtime_goo_2017-02-16T17:39:00.000Z_0_0",
  "resource" : {
    "availabilityGroup" : "goo-2017-02-16T17:39:00.000Z-0000",
    "requiredCapacity" : 1
  },
  "spec" : {
    "dataSchema" : {
      "dataSource" : "goo",
      "parser" : {
        "type" : "map",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "iso",
            "missingValue" : null
          },
          "dimensionsSpec" : {
            "dimensions" : [ "foo" ],
            "spatialDimensions" : [ ]
          }
        }
      },
      "metricsSpec" : [ {
        "type" : "longSum",
        "name" : "bar",
        "fieldName" : "bar"
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "MINUTE",
        "queryGranularity" : {
          "type" : "duration",
          "duration" : 60000,
          "origin" : "1969-12-31T16:00:00.000-08:00"
        }
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "plumber" : null,
      "firehose" : {
        "type" : "clipped",
        "interval" : "2017-02-16T17:39:00.000Z/2017-02-16T17:40:00.000Z",
        "delegate" : {
          "type" : "timed",
          "shutoffTime" : "2017-02-16T17:55:00.000Z",
          "delegate" : {
            "type" : "receiver",
            "serviceName" : "firehose:druid:overlord:goo-159-0000-0000",
            "bufferSize" : 100000
          }
        }
      }
    },
    "tuningConfig" : {
      "shardSpec" : {
        "type" : "linear",
        "partitionNum" : 0
      },
      "rejectionPolicy" : {
        "type" : "none"
      },
      "buildV9Directly" : false,
      "maxPendingPersists" : 0,
      "intermediatePersistPeriod" : "PT10M",
      "windowPeriod" : "PT10M",
      "type" : "realtime",
      "maxRowsInMemory" : 75000
    }
  }
}
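
Side note on the firehose times in that log: they look consistent with my tuning if the task shuts off at segment end + windowPeriod + a grace period. The 5-minute grace period is my assumption (I believe it is Tranquility's default firehoseGracePeriod; I did not set it anywhere), and FirehoseWindow and the event time below are made-up names and values, used only for illustration. A minimal sketch of the arithmetic in plain Scala with java.time, not Tranquility's actual code:

```scala
import java.time.{Duration, Instant}

object FirehoseWindow {
  // Start of the fixed-length segment bucket that contains `ts`.
  def bucketStart(ts: Instant, granularity: Duration): Instant = {
    val g = granularity.toMillis
    Instant.ofEpochMilli(java.lang.Math.floorDiv(ts.toEpochMilli, g) * g)
  }

  // Assumed rule: shutoff = bucket end + windowPeriod + grace period.
  def shutoffTime(bucketEnd: Instant, windowPeriod: Duration, grace: Duration): Instant =
    bucketEnd.plus(windowPeriod).plus(grace)
}

object FirehoseWindowDemo {
  def main(args: Array[String]): Unit = {
    val segmentGranularity = Duration.ofMinutes(1)  // Granularity.MINUTE in my tuning
    val windowPeriod       = Duration.ofMinutes(10) // PT10M in my tuning
    val gracePeriod        = Duration.ofMinutes(5)  // assumption, not set in my code

    // Hypothetical event time inside the 17:39 bucket from the log.
    val eventTime = Instant.parse("2017-02-16T17:39:25Z")

    val start = FirehoseWindow.bucketStart(eventTime, segmentGranularity)
    val end   = start.plus(segmentGranularity)
    println(s"interval = $start/$end")
    println(s"shutoff  = ${FirehoseWindow.shutoffTime(end, windowPeriod, gracePeriod)}")
  }
}
```

With these values the interval comes out as 17:39/17:40 and the shutoff as 17:55, matching the "interval" and "shutoffTime" fields above, so the task window itself seems to be created as expected.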

Exception message:

"exceptionMessage":"Failed to send request to task[index_realtime_goo_2017-02-16T17:39:00.000Z_0_0]: 500 Internal Server Error"}

Thanks!