Druid not receiving data from a Flink Sink using Tranquility

Hi All,

I’m building a Flink stream processor that sends data to druid via the Tranquility Flink Sink.

```
DruidBeams.fromConfig[Event](
    config, timestamper, new JsonObjectWriterEvent)
  .buildBeam()
```

Here, config is just the output of TranquilityConfig.read. It appears that Tranquility is sending my events, since I see output like this in my processor's logs:

```
2016-11-09 21:41:27,468 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[1], dropped[0], failed[0] out of 1 messages from batch #0. 4 batches still pending.
2016-11-09 21:41:27,469 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sending buffer with 76 messages (from background send thread).
2016-11-09 21:41:27,469 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Swapping out buffer with 76 messages, 5 batches now pending.
2016-11-09 21:41:27,469 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sending buffer with 76 messages.
2016-11-09 21:41:27,489 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[76], dropped[0], failed[0] out of 76 messages from batch #5. 4 batches still pending.
2016-11-09 21:41:27,551 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[1], dropped[0], failed[0] out of 1 messages from batch #3. 3 batches still pending.
2016-11-09 21:41:27,974 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[1], dropped[0], failed[0] out of 1 messages from batch #4. 2 batches still pending.
2016-11-09 21:41:27,982 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[1], dropped[0], failed[0] out of 1 messages from batch #1. 1 batches still pending.
2016-11-09 21:41:28,915 DEBUG com.metamx.tranquility.tranquilizer.Tranquilizer - Sent[1], dropped[0], failed[0] out of 1 messages from batch #2. 0 batches still pending.
```

On the Druid side, my indexing task starts just fine, logs the expected config, and prints no errors. However, the data source does not show up in Druid, even after the indexing task has completed. I have verified that my event timestamps fall within the windowPeriod. I also went through the code in DruidBeams.fromConfigInternal, and it looks like everything should be accounted for by the values in the config file.

Since the indexing task is actually starting with the right config and not logging any errors, all the configuration information seems to be coming through fine. Is any additional configuration necessary when instantiating the Beam from a config file? Are there any known issues with this?

Thanks!

– Will

It turns out my issue was actually on the Druid side. There was one warning in the realtime task log:

```
WARN [task-runner-0-priority-0] io.druid.segment.realtime.firehose.PredicateFirehose - [0] InputRow(s) ignored as they do not satisfy the predicate
```

That predicate filters on time. I had based my processor on the example code in the Tranquility GitHub repository, and in the SimpleEvent class at tranquility/core/src/test/scala/com/metamx/tranquility/test/SimpleEvent.scala, the object-to-JSON conversion sends *seconds* in its JSON message:


```
  @JsonValue
  def toMap: Map[String, Any] = Map(
    TimeColumn -> (ts.millis / 1000),
    "foo" -> foo,
    "bar" -> bar,
    "lat" -> lat,
    "lon" -> lon
  )
```

However, Druid interprets a bare numeric timestamp as milliseconds since the epoch, so a seconds value looks decades in the past, falls outside the windowPeriod, and gets filtered out. The fix was to change my JSON conversion code to serialize the DateTime as milliseconds since the epoch.
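For anyone hitting the same thing, here is a minimal sketch of the wrong vs. right serialization. The Event case class and field names are illustrative (not from the Tranquility API), and it uses java.time.Instant in place of the Joda DateTime in the real code:

```scala
import java.time.Instant

// Illustrative event type, standing in for the real Flink event class.
case class Event(ts: Instant, foo: String)

// Wrong: seconds since the epoch, as in the SimpleEvent test fixture.
// Druid reads this number as *milliseconds*, so the event appears to be
// from 1970 and the windowPeriod predicate silently drops it.
def toMapSeconds(e: Event): Map[String, Any] =
  Map("timestamp" -> (e.ts.toEpochMilli / 1000), "foo" -> e.foo)

// Right: milliseconds since the epoch, which Druid parses as intended.
def toMapMillis(e: Event): Map[String, Any] =
  Map("timestamp" -> e.ts.toEpochMilli, "foo" -> e.foo)
```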

Hi Will,

Have you looked at the existing Flink adapter? https://github.com/druid-io/tranquility/blob/master/docs/flink.md

Please note that because of Druid's windowPeriod restriction with Tranquility, you should use current-time events for your tests.
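A quick sanity check along these lines: Tranquility only delivers an event if its timestamp falls within the windowPeriod of the current time, so you can pre-check test data before sending it. This is a sketch with hypothetical names, not a Tranquility API:

```scala
import java.time.{Duration, Instant}

// Returns true if the event timestamp is within windowPeriod of "now",
// i.e. the event would not be rejected by the windowPeriod predicate.
def withinWindow(eventTime: Instant,
                 windowPeriod: Duration,
                 now: Instant = Instant.now()): Boolean = {
  val age = Duration.between(eventTime, now).abs()
  age.compareTo(windowPeriod) <= 0
}
```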