Data pushed to Druid using Tranquility but no datasource is created

Hello,

I am trying to push data to Druid using Tranquility in my Flink streaming application.

Here is the configuration:

  • Flink version 1.1.2

  • Tranquility version 0.8.2

  • Druid version 0.9.1.1

I create fake objects, transform each one into a HashMap, and then use a custom Beam to send them to Druid via Tranquility.

Here is my Flink stream code:

public class SlimDruidStream {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SimpleObject o;

    List<SimpleObject> list = new ArrayList<SimpleObject>();

    for (int i = 0; i < 3; i++) {
      o = new SimpleObject(System.currentTimeMillis(), i, "dimValue" + i);
      list.add(o);
    }

    DataStreamSource<SimpleObject> mapSource = env.fromCollection(list);

    DataStream<Map<String,Object>> mapStream = mapSource.map(new MapFunction<SimpleObject, Map<String,Object>>() {
      public Map<String,Object> map(SimpleObject simpleObj) throws Exception {
        Map<String,Object> result = new HashMap<>();
        result.put("timestamp", simpleObj.getTimestamp());
        result.put("dimValue", simpleObj.getDimValue());
        result.put("numValue", simpleObj.getNumValue());

        return result;
      }
    });

    mapStream.addSink(new BeamSink(new SimpleBeamFactory("test-flink"), true));

    env.execute();

  }
}

Here is my SimpleObject class:

public class SimpleObject {

 private Long timestamp;
 private Integer numValue;
 private String dimValue;

 public SimpleObject() {
 }

 public SimpleObject(Long timestamp, Integer numValue, String dimValue) {
  this.timestamp = timestamp;
  this.numValue = numValue;
  this.dimValue = dimValue;
 }

 public Long getTimestamp() {
  return timestamp;
 }

 public void setTimestamp(Long timestamp) {
  this.timestamp = timestamp;
 }

 public Integer getNumValue() {
  return numValue;
 }

 public void setNumValue(Integer numValue) {
  this.numValue = numValue;
 }

 public String getDimValue() {
  return dimValue;
 }

 public void setDimValue(String dimValue) {
  this.dimValue = dimValue;
 }

}


Here is my SimpleBeamFactory code:

public class SimpleBeamFactory implements BeamFactory {

  final protected String datasource;
  final protected String druidZkHosts = "127.0.0.1:2181";
  final protected String druidZkDiscoveryPath = "/druid/discovery";
  final protected String druidIndexServiceName = "druid/overlord";
  final protected String druidFirehosePattern = "firehose:%s";
  final protected int numReplicants = 1;
  final protected int numPartitions = 1;

  public SimpleBeamFactory(String datasource) {
    this.datasource = datasource;
  }

  public Tranquilizer<Map<String,Object>> tranquilizer() {
    Tranquilizer<Map<String,Object>> t = Tranquilizer.create(makeBeam());
    t.start();
    return t;
  }

  public Beam makeBeam() {

    final List<String> dimensions = ImmutableList.of(
      "dimValue"
    );

    final List<AggregatorFactory> aggregators = ImmutableList.of(
      new CountAggregatorFactory("agg_count"),
      new HyperUniquesAggregatorFactory("agg_distinct_dimValue", "dimValue")
    );

    // Tranquility uses ZooKeeper (through Curator) for coordination.
    final CuratorFramework curator = CuratorFrameworkFactory
      .builder()
      .connectString(this.druidZkHosts)
      .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
      .build();
    curator.start();

    final Timestamper<Map<String,Object>> timestamper = new Timestamper<Map<String,Object>>() {
      public DateTime timestamp(Map<String,Object> map) {
        return new DateTime(map.get("timestamp"));
      }
    };

    final Beam<Map<String,Object>> beam = DruidBeams
      .builder(timestamper)
      .curator(curator)
      .discoveryPath(this.druidZkDiscoveryPath)
      .location(
        DruidLocation.create(
          this.druidIndexServiceName,
          this.druidFirehosePattern,
          this.datasource
        )
      )
      .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
      .tuning(
        ClusteredBeamTuning
          .builder()
          .segmentGranularity(Granularity.DAY)
          .windowPeriod(new Period("PT30M"))
          .partitions(this.numPartitions)
          .replicants(this.numReplicants)
          .build()
      ).buildBeam();

    return beam;
  }
}
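
The tuning above sets windowPeriod to PT30M. Tranquility only accepts events whose timestamp lies close enough to the current time; events outside that window are counted as dropped rather than indexed. The sketch below is a rough approximation of that rule in plain Java, not Tranquility's actual implementation. WindowCheck and withinWindow are hypothetical names used only for illustration.

```java
import java.time.Duration;

// Hypothetical helper, not part of Tranquility or Druid.
final class WindowCheck {

    // Approximation (assumption): an event is accepted when its timestamp is
    // no further than `window` away from the current time, in either direction.
    static boolean withinWindow(long eventMillis, long nowMillis, Duration window) {
        return Math.abs(nowMillis - eventMillis) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.parse("PT30M"); // same value as windowPeriod above
        long now = System.currentTimeMillis();

        // The events in the question use System.currentTimeMillis(), so they pass.
        System.out.println(withinWindow(now, now, window));
        // An event that is one hour old falls outside a 30-minute window.
        System.out.println(withinWindow(now - Duration.ofHours(1).toMillis(), now, window));
    }
}
```

Since the fake objects are stamped with the current time, a check like this should pass for them, which is consistent with the dropped[0] counter in the Tranquilizer log.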


Here is the log I get in my Flink application:

2016-12-05 12:24:10.667+01:00 DEBUG Tranquilizer:? - Sent[1], dropped[0], failed[0] out of 1 messages from batch #1. 1 batches still pending.


The task is created, but the "test-flink" datasource is never created and the Druid task log shows nothing of interest.

I am sure the data reaches Druid, because if I deliberately introduce a typo and write "timestampppp" instead of "timestamp", I get the following in my Druid task logs:

2016-12-05T11:04:19,872 WARN [qtp332998175-48] org.eclipse.jetty.servlet.ServletHandler - /druid/worker/v1/chat/firehose:test-flink7-005-0000-0000/push-events
com.metamx.common.parsers.ParseException: Unparseable timestamp found!
 at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:72) ~[druid-api-0.9.1.1.jar:0.9.1.1]
 at io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory$EventReceiverFirehose.addAll(EventReceiverFirehoseFactory.java:192) ~[druid-server-0.9.1.1.jar:0.9.1.1]
 at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) ~[?:?]
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
 at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
 at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.SubLocatorRule.accept(SubLocatorRule.java:137) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) ~[jersey-server-1.19.jar:1.19]
 at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) ~[jersey-servlet-1.19.jar:1.19]
 at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) ~[jersey-servlet-1.19.jar:1.19]
 at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) ~[jersey-servlet-1.19.jar:1.19]
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) ~[javax.servlet-api-3.1.0.jar:3.1.0]
 at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:278) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:268) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:180) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:93) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:132) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.GuiceFilter$Context.call(GuiceFilter.java:206) ~[guice-servlet-4.0-beta.jar:?]
 at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.servlets.UserAgentFilter.doFilter(UserAgentFilter.java:83) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.servlets.GzipFilter.doFilter(GzipFilter.java:364) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
 at io.druid.server.initialization.jetty.ResponseHeaderFilterHolder$ResponseHeaderFilter.doFilter(ResponseHeaderFilterHolder.java:100) ~[druid-server-0.9.1.1.jar:0.9.1.1]
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1125) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.Server.handle(Server.java:497) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) [jetty-io-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:620) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
 at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:540) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.lang.NullPointerException: Null timestamp in input: {dimValue=dimValue2, timestampppp=1480935856025, numValue=2}
 at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:64) ~[druid-api-0.9.1.1.jar:0.9.1.1]
 ... 53 more


Any idea why the datasource is never created?

Do not hesitate to ask if you need more information / more logs.

Thank you in advance for your help.

Melissa Benali-Richard


Hello,

Thank you very much for your answer. However, I made this error deliberately (misspelling “timestamp”) to show that the data does reach Druid, as we can see at the end of the log:

Caused by: java.lang.NullPointerException: Null timestamp in input: {dimValue=dimValue2, timestampppp=1480935856025, numValue=2}

If I write “timestamp” correctly in my main method, I do not get any error in the logs, but no datasource is created…
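
The missing-key condition that Druid reports as "Null timestamp in input" can also be caught on the Flink side, before the events reach the sink. The sketch below is plain Java and assumes the event layout produced by the map function in the question. EventCheck and missingKeys are hypothetical names, not part of Tranquility or Druid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Tranquility or Druid.
final class EventCheck {

    // Field names taken from the map function in the question.
    static final String[] REQUIRED_KEYS = {"timestamp", "dimValue", "numValue"};

    // Returns the required keys that are absent from the event or mapped to null.
    static List<String> missingKeys(Map<String, Object> event) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (event.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Same shape as the event shown in the Druid log, including the typo.
        Map<String, Object> event = new HashMap<>();
        event.put("timestampppp", 1480935856025L);
        event.put("dimValue", "dimValue2");
        event.put("numValue", 2);

        System.out.println("Missing keys: " + missingKeys(event));
    }
}
```

With the key spelled correctly the list is empty, so a check like this only rules out malformed events; it does not explain why the datasource stays invisible.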

Hello Melissa Benali-Richard,

Have you integrated Flink and Druid successfully?