Insert past events from Kafka into Druid with Tranquility?

Hello,

My timestamped events are already stored in a Kafka topic, and I'd like to move them into Druid. From my understanding, I should use the Tranquility client to do this. I can't use the Kafka firehose directly because my events are Avro-serialized, which is also why I chose the Tranquility route.
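
For context, the Avro decoding step on my side looks roughly like this (a simplified sketch; the schema fields here just mirror the sample below):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Decode one Avro-serialized Kafka message into a GenericRecord, using the writer's schema.
def decodeAvro(bytes: Array[Byte], schema: Schema): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}

// Flatten the record into the Map[String, Any] shape that the Tranquility sample below expects.
def toEventMap(record: GenericRecord): Map[String, Any] = Map(
  "timestamp" -> record.get("timestamp").toString,
  "bar"       -> record.get("bar").toString,
  "qux"       -> record.get("qux").toString,
  "baz"       -> record.get("baz").asInstanceOf[Long]
)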

I'm using the following sample code from the Tranquility GitHub:

// Imports for Tranquility 0.5.0 / Druid 0.8.0:
import com.metamx.common.Granularity
import com.metamx.tranquility.beam.ClusteredBeamTuning
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.twitter.util.{Await, Future}
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.{CountAggregatorFactory, LongSumAggregatorFactory}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.joda.time.format.ISODateTimeFormat
import org.joda.time.{DateTime, Period}

// Curator client pointing at the same ZooKeeper that Druid uses (adjust the connect string).
val curator = CuratorFrameworkFactory.newClient("zk.example.com:2181", new ExponentialBackoffRetry(1000, 20))
curator.start()

val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
val dataSource = "foo"
val dimensions = Seq("bar", "qux")
val aggregators = Seq(new CountAggregatorFactory("cnt"), new LongSumAggregatorFactory("baz", "baz"))

// Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
val timestamper = (eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp"))

// Tranquility needs to be able to serialize your object type. By default this is done with Jackson. If you want to
// provide an alternate serializer, you can provide your own via ```.objectWriter(...)```. In this case, we won't
// provide one, so we're just using Jackson:
val druidService = DruidBeams
  .builder(timestamper)
  .curator(curator)
  .discoveryPath(discoveryPath)
  .location(DruidLocation(indexService, firehosePattern, dataSource))
  .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
  .tuning(
    ClusteredBeamTuning(
      segmentGranularity = Granularity.HOUR,
      windowPeriod = new Period("PT10M"),
      partitions = 1,
      replicants = 1
    )
  )
  .buildService()

val dtf = ISODateTimeFormat.dateTime()  // ISO-8601 formatter
val dateTime = new DateTime()           // "now"
val time1: String = dateTime.minusDays(30).toString(dtf) // a timestamp 30 days in the past
val listOfEvents = Seq(Map("timestamp" -> time1, "bar" -> "toto", "qux" -> "world", "baz" -> 42L))

// Send events to Druid:
val numSentFuture: Future[Int] = druidService(listOfEvents)

// Wait for confirmation:
val numSent = Await.result(numSentFuture)


If the timestamp is set like this (30 days in the past), the events are not sent/ingested into Druid. If the timestamp is recent, it works as expected. Maybe I'm missing something. How can I configure Druid and/or Tranquility to ingest past events?

Regards,

Nicolas PHUNG

BTW I’m using Druid 0.8.0 with Tranquility 0.5.0.

Hey Nicolas,

Druid streaming ingestion (both Kafka-based and tranquility-based) is currently not able to handle older data that falls outside the windowPeriod. You can always adjust the windowPeriod higher, but setting it higher than the segmentGranularity will cause non-handed-off segments to pile up on the realtime indexers and is not recommended. We intend to remove this limitation in a future version.
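
For example, relative to the tuning block in your snippet, windowPeriod is the only knob involved here. Something like this (values purely illustrative) would accept events up to roughly 50 minutes old while staying within the HOUR segmentGranularity:

import com.metamx.common.Granularity
import com.metamx.tranquility.beam.ClusteredBeamTuning
import org.joda.time.Period

// Accepts events up to ~50 minutes old; still no larger than the HOUR segmentGranularity.
val relaxedTuning = ClusteredBeamTuning(
  segmentGranularity = Granularity.HOUR,
  windowPeriod = new Period("PT50M"),
  partitions = 1,
  replicants = 1
)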

For the time being, the best way to index older data is with Druid’s batch ingestion: either the IndexTask for relatively small amounts of data (~1GB or less) or the Hadoop indexer for any size of data.
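
For the IndexTask route, the task is a JSON spec that you POST to the overlord's /druid/indexer/v1/task endpoint. A rough sketch reusing the dataSource, dimensions, and aggregators from your snippet might look like this (the interval, firehose location, and exact fields are illustrative, so double-check them against the docs for your version):

// Rough sketch of an "index" task spec, held in a Scala string; POST the body to
// http://<overlord-host>:<port>/druid/indexer/v1/task with any HTTP client.
val indexTaskJson =
  """{
    |  "type": "index",
    |  "spec": {
    |    "dataSchema": {
    |      "dataSource": "foo",
    |      "parser": {
    |        "type": "string",
    |        "parseSpec": {
    |          "format": "json",
    |          "timestampSpec": { "column": "timestamp", "format": "auto" },
    |          "dimensionsSpec": { "dimensions": ["bar", "qux"] }
    |        }
    |      },
    |      "metricsSpec": [
    |        { "type": "count", "name": "cnt" },
    |        { "type": "longSum", "name": "baz", "fieldName": "baz" }
    |      ],
    |      "granularitySpec": {
    |        "type": "uniform",
    |        "segmentGranularity": "HOUR",
    |        "queryGranularity": "MINUTE",
    |        "intervals": ["2015-06-01/2015-08-01"]
    |      }
    |    },
    |    "ioConfig": {
    |      "type": "index",
    |      "firehose": { "type": "local", "baseDir": "/tmp/druid-batch", "filter": "*.json" }
    |    }
    |  }
    |}""".stripMargin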

Hi Gian,

Thank you for your reply. Is there a roadmap or ticket I can follow to know when this limitation will be removed in a future version?

So for now, I'll have to put my Kafka data into HDFS, for example, and then use the batch indexing service to get the past events into Druid, won't I?

Regards,

Nicolas P

We are tracking the ingestion improvements here: https://github.com/druid-io/druid/issues/1642. And yeah, Kafka -> HDFS/S3 -> Druid is currently the best strategy for historical data. People have had luck with Camus for Kafka -> HDFS and Secor for Kafka -> S3, so you could look at those for ways of getting data into the Hadoop indexer.
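
Once the data is sitting in HDFS, the Hadoop indexer task ("index_hadoop") can reuse essentially the same dataSchema as the sketch above; roughly speaking only the task type and the ioConfig change, with the ioConfig pointing at whatever Camus wrote out (the paths below are made up):

// Sketch of the "ioConfig" object for an "index_hadoop" task reading static HDFS paths.
val hadoopIoConfigJson =
  """{
    |  "type": "hadoop",
    |  "inputSpec": {
    |    "type": "static",
    |    "paths": "hdfs://namenode:8020/user/camus/topics/foo/2015/06/*"
    |  }
    |}""".stripMargin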