Spark + Tranquility + Overlord

The following is my test code. I have no idea why the data can't be ingested, and there is no exception in the log. Any mistakes?

// Import paths assume Spark 1.x streaming and tranquility-spark 0.7.x-era artifacts; adjust to your versions.
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.{DateTime, Period}

import com.metamx.common.Granularity
import com.metamx.common.scala.timekeeper.SystemTimekeeper
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import com.metamx.tranquility.druid.{DruidBeams, DruidEnvironment, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.metamx.tranquility.spark.BeamFactory
import com.metamx.tranquility.spark.BeamRDD._ // provides rdd.propagate(...)

import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.{CountAggregatorFactory, DoubleSumAggregatorFactory}

object DruidSpark {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DruidSpark").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val zkConnect = "localhost:2181" // ZooKeeper

    // Build 1000 test events from the message generator.
    val inputs = ListBuffer[SimpleEvent]()
    for (index <- 1 to 1000) {
      val simpleEvent = SimpleEvent.fromMap(MsgUtil.makeMsg().map(kv => (kv._1, kv._2)).toMap)
      inputs.append(simpleEvent)
    }
    println("inputs:" + inputs.size)

    // Feed the test events into the streaming context through a queue of RDDs.
    val lines = mutable.Queue[RDD[SimpleEvent]]()
    lines += sc.makeRDD(inputs, 2)

    //lines.foreach(line => line.propagate(new SimpleEventBeamFactory(zkConnect)))
    //TimeUnit.SECONDS.sleep(100000)

    val dstream = ssc.queueStream(lines)
    dstream.foreachRDD(rdd => rdd.propagate(new SimpleEventBeamFactory(zkConnect)))

    ssc.start()
    ssc.awaitTermination()
  }

  class SimpleEventBeamFactory(zkConnect: String) extends BeamFactory[SimpleEvent] {

    override def makeBeam: Beam[SimpleEvent] = {
      val curator = CuratorFrameworkFactory.newClient(
        zkConnect,
        new BoundedExponentialBackoffRetry(100, 1000, 5)
      )
      curator.start()

      val timekeeper = new SystemTimekeeper

      val indexService = "overlord"   // overlord's druid.service name
      val discoveryPath = "discovery" // overlord's druid.discovery.curator.path
      val dataSource = "wikipedia"

      val dimensions = IndexedSeq("page",
        "language", "user", "unpatrolled", "newPage",
        "robot", "anonymous", "namespace", "continent",
        "country", "region", "city")

      val aggregators = Seq(new CountAggregatorFactory("count"),
        new DoubleSumAggregatorFactory("added", "added"),
        new DoubleSumAggregatorFactory("deleted", "deleted"),
        new DoubleSumAggregatorFactory("delta", "delta"))

      val tuning = ClusteredBeamTuning(Granularity.MINUTE,
        windowPeriod = new Period("PT10M"))

      val rollup = DruidRollup(
        SpecificDruidDimensions(dimensions),
        aggregators,
        QueryGranularity.MINUTE
      )

      val druidEnvironment = DruidEnvironment(indexService)
      val druidLocation = new DruidLocation(druidEnvironment, dataSource)

      val timeFn = (event: SimpleEvent) => new DateTime(event.toMap("timestamp"))

      val builder = DruidBeams.builder[SimpleEvent](timeFn)
        .curator(curator)
        .location(druidLocation)
        .rollup(rollup)
        .tuning(tuning)
        .discoveryPath(discoveryPath)
        .timekeeper(timekeeper)
        .timestampSpec(new TimestampSpec("timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", null))

      builder.buildBeam()
    }
  }
}