Error while sending data to druid through tranqulity

Hi,

I am learning druid. I am able to insert and query data through command line.Now, I am trying to insert data using tranquility.I see we use overlord and middle manager instead of realtime node. I got a code, but it throws the following error.I am not very sure how to handle this.Any insight will be helpful.

Exception in thread “main” java.lang.NoSuchMethodError: scala.util.matching.Regex.unapplySeq(Ljava/lang/CharSequence;)Lscala/Option;

at com.fasterxml.jackson.module.scala.JacksonModule$.version$lzycompute(JacksonModule.scala:30)

at com.fasterxml.jackson.module.scala.JacksonModule$.version(JacksonModule.scala:26)

at com.fasterxml.jackson.module.scala.JacksonModule$class.version(JacksonModule.scala:49)

at com.fasterxml.jackson.module.scala.DefaultScalaModule.version(DefaultScalaModule.scala:19)

at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:701)

at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70)

at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68)

at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44)

at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67)

at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)

at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64)

at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)

at com.metamx.common.scala.Jackson$class.init(Jackson.scala:14)

at com.metamx.common.scala.Jackson$.(Jackson.scala:10)

at com.metamx.common.scala.Jackson$.(Jackson.scala)

at com.metamx.tranquility.druid.DruidBeams$BuilderConfig$$anon$6.(DruidBeams.scala:260)

at com.metamx.tranquility.druid.DruidBeams$BuilderConfig.buildAll(DruidBeams.scala:259)

at com.metamx.tranquility.druid.DruidBeams$Builder.buildBeam(DruidBeams.scala:182)

at com.metamx.tranquility.druid.DruidBeams$Builder.buildService(DruidBeams.scala:228)

at com.metamx.tranquility.druid.DruidBeams$Builder.buildJavaService(DruidBeams.scala:232)

at MyTranquilityTest.main(MyTranquilityTest.java:84)

This is the dependency i am using :

io.druid

tranquility-core_2.11

0.6.4

I have started coordinator,broker, historical , overlord and middle manager node.Do, i need to do some other steps as well?

Do i need to have mysql ?

This is the code :

import java.io.IOException;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import com.google.common.collect.ImmutableList;

import com.metamx.common.Granularity;

import com.metamx.tranquility.beam.ClusteredBeamTuning;

import com.metamx.tranquility.druid.*;

import com.metamx.tranquility.typeclass.Timestamper;

import com.twitter.finagle.Service;

import com.twitter.util.Await;

import com.twitter.util.Future;

import io.druid.granularity.QueryGranularity;

import io.druid.query.aggregation.AggregatorFactory;

import io.druid.query.aggregation.CountAggregatorFactory;

import io.druid.query.aggregation.DoubleSumAggregatorFactory;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.codehaus.jackson.map.JsonMappingException;

import org.codehaus.jackson.map.ObjectMapper;

import org.joda.time.DateTime;

import org.joda.time.Period;

public class MyTranquilityTest {

final static String indexService = “overlord”;

final static String firehosePattern = “firehose:%s”;

final static String discoveryPath = “/discovery”;

final static String dataSource = “tranq_test”;

final static List dimensions = ImmutableList.of(“campaignid”,

“region”);

final static List aggregators = ImmutableList.of(

new CountAggregatorFactory(“count”),

new DoubleSumAggregatorFactory(“impressions”, “impressions”),

new DoubleSumAggregatorFactory(“clicks”, “clicks”));

@SuppressWarnings(“serial”)

final static Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {

@Override

public DateTime timestamp(Map<String, Object> theMap) {

return new DateTime(theMap.get(“date”));

}

};

public static void main(String args) {

System.out.println(“Start of Tranquility”);

try {

final CuratorFramework curator = CuratorFrameworkFactory.builder()

.connectString(“localhost:2181”)

.retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))

.build();

curator.start();

final Service<List<Map<String, Object>>, Integer> druidService = DruidBeams

.builder(timestamper)

.curator(curator)

.discoveryPath(discoveryPath)

.location(

DruidLocation.create(indexService, firehosePattern,

dataSource))

.rollup(DruidRollup.create(

DruidDimensions.specific(dimensions), aggregators,

QueryGranularity.DAY))

.tuning(ClusteredBeamTuning.create(Granularity.DAY,

new Period(“PT0.5S”), new Period(“P1D”), 1, 1))

.buildJavaService();

String json = "[{ “campaignid”:“1”, “date”:“2014-10-01”, “region”:“us”, “impressions”:100, “clicks”:10 }, { “campaignid”:“1”, “date”:“2014-10-02”, “region”:“us”, “impressions”:100, “clicks”:9 }, { “campaignid”:“2”, “date”:“2014-10-03”, “region”:“uk”, “impressions”:110, “clicks”:1 }, { “campaignid”:“3”, “date”:“2014-10-02”, “region”:“ca”, “impressions”:100, “clicks”:20 }] ";

List<Map<String, Object>> value = convertAJsonToAListMap(json);

System.out.println(“value:” + value);

// Send events to Druid:

final Future numSentFuture = druidService.apply(value);

// Wait for confirmation:

final Integer numSent = Await.result(numSentFuture);

System.out.println(“numSent:” + numSent + “, numSentFuture:”

  • numSentFuture);

// Close lifecycled objects:

Await.result(druidService.close());

curator.close();

System.out.println(“End of Tranquility”);

} catch (Exception e) {

// Log

System.out.println(“Message:” + e.getMessage());

}

}

@SuppressWarnings(“unchecked”)

public static List<Map<String, Object>> convertAJsonToAListMap(String json) {

List<Map<String, Object>> list = new LinkedList<Map<String, Object>>();

List<Map<String, Object>> listSample = new LinkedList<Map<String, Object>>();

ObjectMapper mapper = new ObjectMapper();

try {

list = mapper.readValue(json, listSample.getClass());

} catch (JsonMappingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

return list;

}

}

overlord property :

druid.host=localhost

druid.port=8090

druid.service=overlord

druid.discovery.curator.path=/discovery

druid.indexer.queue.startDelay=PT0M

druid.indexer.runner.javaOpts=-server -Xmx256m

druid.indexer.fork.property.druid.processing.numThreads=1

druid.indexer.fork.property.druid.computation.buffer.size=100000000

druid.indexer.runner.type=remote

middle manager property :

druid.host=localhost

druid.service=middleManager

druid.port=8100

peon configuration

druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912

druid.indexer.fork.property.druid.processing.numThreads=2

druid.indexer.fork.property.druid.segmentCache.locations=[{“path”: “/tmp/persistent/zk_druid”, “maxSize”: 0}]

druid.indexer.fork.property.druid.server.http.numThreads=50

Resources for peons

druid.indexer.runner.javaOpts=-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

druid.indexer.task.baseTaskDir=/tmp/persistent/task/

druid.worker.capacity=default (Number of available processors - 1)

druid.worker.ip=localhost

I am able to fix the error.But the data does not get inserted into druid.It doesn’t throw any error as well.

final Future numSentFuture = druidService.apply(value);

// Wait for confirmation:

final Integer numSent = Await.result(numSentFuture);

System.out.println(“numSent:” + numSent + “, numSentFuture:”

  • numSentFuture);

This is the result i am getting when i print numset and numsentfuture.

numSent:0, numSentFuture:Promise@1494089143(state=Done(Return(0)))

Can anyone point what i am missing.

Hey Ritesh,

Could you post the full Tranquility logs?

Also, do you have a common.runtime.properties file loaded by the middle manager? Make sure that you set ‘druid.selectors.indexing.serviceName=overlord’ since you’ve changed the default name of druid.service on the overlord.

Hi David ,

Thanks for replying.

druid.selectors.indexing.serviceName is set to overlord.

This is my common.runtime.properties.

common.runtime.properties (2.02 KB)

###Broker runtime.properties (5.79 KB)

Hi David,

On digging deep i found out this error .

2015-12-17T08:45:54,229 ERROR [task-runner-0] io.druid.curator.discovery.ServerDiscoverySelector - No server instance found

2015-12-17T08:45:54,229 WARN [task-runner-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Exception submitting action for task[index_realtime_foo_2015-12-17T08:00:00.000Z_0_0]

java.io.IOException: Failed to locate service uri

at io.druid.indexing.common.actions.RemoteTaskActionClient.submit(RemoteTaskActionClient.java:91) [druid-indexing-service-0.8.2.jar:0.8.2]

at io.druid.indexing.common.task.AbstractTask.getTaskLocks(AbstractTask.java:184) [druid-indexing-service-0.8.2.jar:0.8.2]

at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:168) [druid-indexing-service-0.8.2.jar:0.8.2]

at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:221) [druid-indexing-service-0.8.2.jar:0.8.2]

at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:200) [druid-indexing-service-0.8.2.jar:0.8.2]

at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_91]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_91]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_91]

at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]

Caused by: com.metamx.common.ISE: Cannot find instance of indexer to talk to!

at io.druid.indexing.common.actions.RemoteTaskActionClient.getServiceInstance(RemoteTaskActionClient.java:165) ~[druid-indexing-service-0.8.2.jar:0.8.2]

at io.druid.indexing.common.actions.RemoteTaskActionClient.submit(RemoteTaskActionClient.java:86) ~[druid-indexing-service-0.8.2.jar:0.8.2]

… 8 more

2015-12-17T08:45:54,238 INFO [task-runner-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Will try again in [PT70.993S].

2015-12-17T08:47:05,231 ERROR [task-runner-0] io.druid.curator.discovery.ServerDiscoverySelector - No server instance found

Hey Ritesh,

Pretty sure your issue is with druid.discovery.curator.path. It’s being set as /discovery on the overlord but is the default of /druid/discovery on the middle manager since it hasn’t been set. I would recommend removing it from the overlord/runtime.properties and moving it into common.runtime.properties so it can be shared by all services.

Hi David ,

Still no luck with the mentioned changes and when we start service below :

java cat conf-quickstart/druid/historical/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*” io.druid.cli.Main server historical

I am still getting this error :

ERROR [main] io.druid.curator.discovery.ServerDiscoverySelector - No server instance found for [druid/coordinator]

any help will be appreciated.

thanks

Finally found the problem with the sequence of process startup .

I did below sequence than the order mentioned in the document and everything loaded fine :

nohup java cat conf-quickstart/druid/coordinator/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/*” io.druid.cli.Main server coordinator >> coordinator.log 2>&1 &

nohup java cat conf-quickstart/druid/overlord/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/*” io.druid.cli.Main server overlord >> overlord.log 2>&1 &

nohup java cat conf-quickstart/druid/historical/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*” io.druid.cli.Main server historical >> historical.log 2>&1 &

nohup java cat conf-quickstart/druid/broker/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/*” io.druid.cli.Main server broker >> broker.log 2>&1 &

nohup java cat conf-quickstart/druid/middleManager/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/*” io.druid.cli.Main server middleManager >> middlemanager.log 2>&1 &

nohup java cat conf-quickstart/druid/realtime/jvm.config | xargs -cp “conf-quickstart/druid/_common:conf-quickstart/druid/realtime:lib/*” io.druid.cli.Main server realtime >> realtime.log 2>&1 &

thanks