Error while inserting data into Druid Using Tranquility

Hi,

I am trying to insert data into Druid using the following code:

package druid;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;
import com.twitter.util.Await;
import com.twitter.util.Future;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import scala.runtime.BoxedUnit;

import java.util.List;
import java.util.Map;

public class JavaExample {
    private static final Logger log = new Logger(JavaExample.class);

    public static void main(String[] args) {
        final String indexService = "druid/overlord"; // Your overlord's druid.service
        final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
        final String dataSource = "foo5";
        final List<String> dimensions = ImmutableList.of("bar", "qux");
        final List<AggregatorFactory> aggregators = ImmutableList.of(
            new CountAggregatorFactory("cnt"),
            new LongSumAggregatorFactory("baz", "baz")
        );

        // Tranquility needs to be able to extract timestamps from your object type
        // (in this case, Map<String, Object>).
        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {
            @Override
            public DateTime timestamp(Map<String, Object> theMap) {
                return new DateTime(theMap.get("timestamp"));
            }
        };

        // Tranquility uses ZooKeeper (through Curator) for coordination.
        final CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
            .build();
        curator.start();

        // The JSON serialization of your object must have a timestamp field in a format
        // that Druid understands. By default, Druid expects the field to be called
        // "timestamp" and to be an ISO8601 timestamp.
        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

        // Tranquility needs to be able to serialize your object type to JSON for
        // transmission to Druid. By default this is done with Jackson. If you want to
        // provide an alternate serializer, you can provide your own via .objectWriter(...).
        // In this case, we won't provide one, so we're just using Jackson.
        final Tranquilizer<Map<String, Object>> druidService = DruidBeams
            .builder(timestamper)
            .curator(curator)
            .discoveryPath(discoveryPath)
            .location(DruidLocation.create(indexService, dataSource))
            .timestampSpec(timestampSpec)
            .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularity.MINUTE))
            .tuning(
                ClusteredBeamTuning.builder()
                    .segmentGranularity(Granularity.HOUR)
                    .windowPeriod(new Period("PT10M"))
                    .partitions(1)
                    .replicants(1)
                    .build()
            )
            .buildTranquilizer();

        druidService.start();

        try {
            // Build a sample event to send; make sure we use a current date.
            Map<String, Object> obj = ImmutableMap.<String, Object>of(
                "timestamp", new DateTime(DateTimeZone.UTC).toString(),
                "bar", "barVal",
                "qux", "quxVal",
                "baz", 3
            );

            // Send the event to Druid:
            final Future<BoxedUnit> future = druidService.send(obj);

            // Wait for confirmation:
            Await.result(future);
        }
        catch (Exception e) {
            log.warn(e, "Failed to send message");
        }
        finally {
            // Close objects:
            druidService.stop();
            curator.close();
        }
    }
}

Dependencies used (pom.xml):

<dependency>
  <groupId>io.druid</groupId>
  <artifactId>tranquility-core_2.11</artifactId>
  <version>0.7.2</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.5</version>
</dependency>

And this is the error I am getting:

27 Jan 2016 17:46:48,509 2824 [ClusteredBeam-ZkFuturePool-6e5ccd5e-b70c-41cc-a38f-676a3e501a76] WARN com.metamx.tranquility.tranquilizer.Tranquilizer - Failed to send 1 messages.
java.util.IllegalFormatConversionException: d != scala.collection.immutable.Vector
	at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
	at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
	at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
	at java.util.Formatter.format(Formatter.java:2488)
	at java.util.Formatter.format(Formatter.java:2423)
	at java.lang.String.format(String.java:2792)
	at scala.collection.immutable.StringLike$class.format(StringLike.scala:293)
	at scala.collection.immutable.StringOps.format(StringOps.scala:30)
	at org.eintr.loglady.Logger.trace(Logger.scala:74)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2$$anonfun$apply$4.apply(DruidBeam.scala:89)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2$$anonfun$apply$4.apply(DruidBeam.scala:77)
	at scala.Option$WithFilter.map(Option.scala:207)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2.apply(DruidBeam.scala:77)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2.apply(DruidBeam.scala:76)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
	at scala.collection.immutable.List.flatMap(List.scala:344)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4.apply(DruidBeam.scala:76)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4.apply(DruidBeam.scala:75)
	at scala.collection.TraversableLike$WithFilter$$anonfun$flatMap$2.apply(TraversableLike.scala:759)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$WithFilter.flatMap(TraversableLike.scala:758)
	at com.metamx.tranquility.druid.DruidBeam.sendBatch(DruidBeam.scala:75)
	at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$2.apply(MergingPartitioningBeam.scala:42)
	at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$2.apply(MergingPartitioningBeam.scala:40)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.metamx.tranquility.beam.MergingPartitioningBeam.sendBatch(MergingPartitioningBeam.scala:40)
	at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26$$anonfun$apply$15.apply(ClusteredBeam.scala:381)
	at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26$$anonfun$apply$15.apply(ClusteredBeam.scala:379)
	at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:894)
	at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:893)
	at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112)
	at com.twitter.util.Promise$Transformer.k(Promise.scala:112)
	at com.twitter.util.Promise$Transformer.apply(Promise.scala:122)
	at com.twitter.util.Promise$Transformer.apply(Promise.scala:103)
	at com.twitter.util.Promise$$anon$1.run(Promise.scala:381)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92)
	at com.twitter.util.Promise.runq(Promise.scala:350)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:721)
	at com.twitter.util.ExecutorServiceFuturePool$$anon$2.run(FuturePool.scala:107)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
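For context, `IllegalFormatConversionException: d != scala.collection.immutable.Vector` means a `%d` format specifier in Tranquility's trace-logging call was handed a Scala `Vector` instead of an integer. A minimal Java sketch of the same failure mode (the format string and collection here are illustrative, not Tranquility's actual code):

```java
import java.util.Arrays;
import java.util.IllegalFormatConversionException;

public class FormatConversionDemo {
    public static void main(String[] args) {
        try {
            // "%d" requires an integral argument; passing a collection triggers the
            // same IllegalFormatConversionException seen in the stack trace above.
            String.format("Sent %d messages", Arrays.asList("a", "b"));
        } catch (IllegalFormatConversionException e) {
            // The message names the conversion and the offending argument's class,
            // e.g. "d != java.util.Arrays$ArrayList"
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```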

My config files:

middlemanager:

druid.host=localhost
druid.service=middleManager
druid.port=8100

# Peon configuration
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912
druid.indexer.fork.property.druid.processing.numThreads=2
druid.indexer.fork.property.druid.segmentCache.locations=[{"path": "/tmp/persistent/zk_druid", "maxSize": 0}]
druid.indexer.fork.property.druid.server.http.numThreads=50

# Resources for peons
#druid.indexer.runner.javaOpts=-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir=/tmp/persistent/task/
# druid.worker.capacity left at its default (number of available processors - 1)
druid.worker.ip=localhost
druid.indexer.runner.javaOpts="-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
druid.indexer.task.chathandler.type=announce
#druid.indexer.task.baseTaskDir=/persistent

Overlord:

# Default host: localhost. Default port: 8090. If you run each node type on its own
# node in production, you should override these values to be IP:8080.
druid.host=localhost
druid.port=8090
druid.service=druid/overlord
druid.discovery.curator.path=/druid/discovery

# Run the overlord in local mode with a single peon to execute tasks.
# This is not recommended for production.
druid.indexer.queue.startDelay=PT0M

# This setting is too small for real production workloads:
druid.indexer.runner.javaOpts=-server -Xmx256m

# These settings are also too small for real production workloads.
# Please see our recommended production settings in the docs
# (http://druid.io/docs/latest/Production-Cluster-Configuration.html)
druid.indexer.fork.property.druid.processing.numThreads=1
druid.indexer.fork.property.druid.computation.buffer.size=100000000

# Uncomment the following property if you are running a Middle Manager:
druid.indexer.runner.type=remote

coordinator:

druid.host=localhost
druid.port=8080
druid.service=coordinator

# The coordinator begins assignment operations after the start delay.
# We override the default here to start things up faster for examples.
# In production you should use PT5M or PT10M.
druid.coordinator.startDelay=PT70s

Historical:

druid.host=localhost
druid.port=8083
druid.service=historical

# Our intermediate buffer is also very small, so longer topNs will be slow.
# In prod: set sizeBytes = 512mb
druid.processing.buffer.sizeBytes=100000000

# We can only scan 1 segment in parallel with these configs.
# In prod: set numThreads = # cores - 1
druid.processing.numThreads=1

# maxSize should reflect the performance you want. Druid memory-maps segments.
# memory_for_segments = total_memory - heap_size - (processing.buffer.sizeBytes * (processing.numThreads + 1)) - JVM overhead (~1G)
# The greater the memory/disk ratio, the better performance you should see.
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize": 10000000000}]
druid.server.maxSize=10000000000

Broker:

druid.host=localhost
druid.port=8082
druid.service=broker

# We enable using the local query cache here.
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true

# For prod: set numThreads = # cores - 1, and sizeBytes to 512mb.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

common:

druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight"]
druid.extensions.localRepository=extensions-repo

# Zookeeper
druid.zk.service.host=localhost

# Metadata storage (use something like MySQL in production; by default Druid will use Derby)
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd

# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=local
druid.storage.storageDirectory=/home/ritesh/druid-0.8.2/localstorage

# Query cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000

# Indexing service discovery
druid.selectors.indexing.serviceName=druid:overlord

# Monitoring (disabled for examples; if you enable SysMonitor, make sure to include the sigar jar in your cp)
druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]

# Metrics logging (disabled for examples - change this to logging or http in production)
druid.emitter=noop

What am I missing?

Hey Ritesh,

This looks like a bug that is triggered when TRACE logging is enabled. Could you bump up to DEBUG logging (at least for the com.metamx.tranquility package) for now?

We will fix this ASAP.
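In case it helps, one way to raise the level for that package — assuming your application logs through Log4j 2; the equivalent Logback or log4j 1.x setting would differ — is a logger entry in log4j2.xml along these lines:

```xml
<Configuration>
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <!-- DEBUG (rather than TRACE) for Tranquility avoids the trace-level format call -->
    <Logger name="com.metamx.tranquility" level="debug"/>
    <Root level="info">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
```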