[Tranquility][Spark-Streaming] Error on DruidBeams builder

Hey guys,

I'm trying to ingest data into Druid with Tranquility from Spark Streaming, but I'm hitting an error:

16/08/19 11:41:11 INFO ClientCnxn: Opening socket connection to server :2181. Will not attempt to authenticate using SASL (unknown error)
16/08/19 11:41:11 ERROR Executor: Exception in task 0.0 in stage 20.0 (TID 212)
java.lang.NoSuchMethodError: io.druid.data.input.impl.TimestampSpec.<init>(Ljava/lang/String;Ljava/lang/String;Lorg/joda/time/DateTime;)V
at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:106)
at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala)
at com.atinternet.certifications.card.SimpleEventBeamFactory$.<init>(SimpleEventBeamFactory.scala:36)
at com.atinternet.certifications.card.SimpleEventBeamFactory$.<clinit>(SimpleEventBeamFactory.scala)
at com.atinternet.certifications.card.SimpleEventBeamFactory.makeBeam(SimpleEventBeamFactory.scala:16)


And here’s my SimpleEventBeamFactory class:

13 class SimpleEventBeamFactory extends BeamFactory[Event]
14 {
15    // Return a singleton, so the same connection is shared across all tasks in the same JVM.
16    def makeBeam: Beam[Event] = SimpleEventBeamFactory.BeamInstance
17 }
18
19 object SimpleEventBeamFactory
20 {
21   val BeamInstance: Beam[Event] = {
22     // Tranquility uses ZooKeeper (through Curator framework) for coordination.
23     val curator = CuratorFrameworkFactory.newClient(
24       "<zookeeperIp>:2181",
25       new BoundedExponentialBackoffRetry(100, 3000, 5)
26     )
27     curator.start()
28
29     val indexService = "druid/overlord" // Your overlord's druid.service, with slashes replaced by colons.
30     val discoveryPath = "/druid/discovery"     // Your overlord's druid.discovery.curator.path
31     val dataSource = "testStream"
32     val dimensions = IndexedSeq("site","date")
33     val aggregators = Seq(new HyperUniquesAggregatorFactory("visitors", "emitterId"))
34
35   // Expects simpleEvent.timestamp to return a Joda DateTime object.
36    DruidBeams
37       .builder((simpleEvent: Event) => DateTime.now)
38       .curator(curator)
39       .discoveryPath(discoveryPath)
40       .location(DruidLocation.create(indexService, dataSource))
41       .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
42       .tuning(
43         ClusteredBeamTuning(
44           segmentGranularity = Granularity.HOUR,
45           windowPeriod = new Period("PT10M"),
46           partitions = 1,
47           replicants = 1
48         )
49       )
50       .buildBeam()
51   }
52 }


As you can see on line 37:

.builder((simpleEvent: Event) => DateTime.now)


I used DateTime.now to rule out a problem with my event.getTimestamp, but the error remains.

Any ideas?

Thanks,

Ben

Even when I try to run the https://github.com/metamx/tranquility/blob/master/core/src/test/java/com/metamx/tranquility/example/JavaExample.java example, I get the same error on the builder!

It seems the druid-api package on my classpath was the wrong version, so I added an explicit Maven dependency:

<dependency>
 <groupId>io.druid</groupId>
 <artifactId>druid-api</artifactId>
 <version>0.3.17</version>
</dependency>

And the error is gone. It is also gone with version 0.9.1.1.

But another one appears:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)V
at org.apache.curator.framework.recipes.locks.LockInternals.<init>(LockInternals.java:103)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.<init>(InterProcessMutex.java:190)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.<init>(InterProcessMutex.java:64)
at org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2.<init>(InterProcessSemaphoreV2.java:122)
at org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2.<init>(InterProcessSemaphoreV2.java:106)
at org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex.<init>(InterProcessSemaphoreMutex.java:40)
at com.metamx.tranquility.beam.ClusteredBeam.<init>(ClusteredBeam.scala:135)
at com.metamx.tranquility.druid.DruidBeams$Builder.buildBeam(DruidBeams.scala:758)
at com.metamx.tranquility.druid.DruidBeams$Builder.buildTranquilizer(DruidBeams.scala:821)
at com.atinternet.certifications.card.JavaExample.main(JavaExample.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

On line:


.buildTranquilizer();

So my dependencies currently are:

<dependency>
    <groupId>io.druid</groupId>
    <artifactId>tranquility-spark_${scala.binary.version}</artifactId>
    <version>0.8.2</version>
</dependency>

<dependency>
    <groupId>io.druid</groupId>
    <artifactId>druid-api</artifactId>
    <version>0.3.17</version>
</dependency>

Any idea what's missing?


Hey Ben,

Looks like a dependency conflict to me. The signature for PathUtils.validatePath() changed between Curator 2.6 and 2.7, and Tranquility is currently using 2.9.1. Do you have something pulling in a Curator <= 2.6?
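
One way to check for this kind of conflict is to print where the JVM actually loads the classes from the two stack traces. The sketch below is only an illustration: `ClasspathCheck` and `jarOf` are made-up names, not part of Tranquility, Druid, or Curator, and it assumes you run it on the same classpath as the failing job (Spark may put its own jars ahead of yours).

```scala
import scala.util.Try

// Hypothetical diagnostic helper; not part of Tranquility, Druid, or Curator.
object ClasspathCheck {

  // Returns the location (usually a jar path) the class was loaded from.
  // None means the class was not found, or it has no code source
  // (which is the case for classes loaded by the bootstrap loader).
  def jarOf(className: String): Option[String] =
    // initialize = false: locate the class without running its static initializers
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    // Class names taken from the two stack traces in this thread.
    val classes = Seq(
      "io.druid.data.input.impl.TimestampSpec",
      "org.apache.curator.utils.PathUtils",
      "org.apache.curator.framework.recipes.locks.LockInternals"
    )
    classes.foreach { name =>
      println(s"$name -> ${jarOf(name).getOrElse("not found or no code source")}")
    }
  }
}
```

If PathUtils and LockInternals come from Curator jars with different versions, that mismatch would explain the NoSuchMethodError. Running `mvn dependency:tree -Dincludes=org.apache.curator` should then show which dependency pulls in the older version.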

Hi Benjamin:
I have JSON streaming data in Spark Streaming. How should I send it to Druid? I'm a beginner from China. May I see the "Event" class?

On Friday, August 19, 2016 at 5:43:40 PM UTC+8, Benjamin Angelaud wrote: