Loading a Spark SQL DataFrame into Druid

Hi All,

I have the following use case:

Use Spark Streaming to read JSON data from Kafka, batch the data into 10-minute intervals, and do some aggregations with Spark SQL. At that point I have a DataFrame.

I need to load this DataFrame into Druid. Can you please help me with how I can do that?

I read about Tranquility, but I think it can only load DStream RDDs into Druid (not DataFrames created by processing DStreams).

Sample code:

var sparkSession = StreamingContextBuilder.sparkSessionBuilder(appName)

val ssc = StreamingContextBuilder.getStreamingContext(sparkSession, aggregationTime)

val kafkaParams = StreamingContextBuilder.setKafkaBroker(StreamingCfg.kafkaBroker)

val messages = StreamingContextBuilder.getKafkaMessages(ssc, kafkaTopics, kafkaParams)

messages.foreachRDD((rdd, time: Time) => {

  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import org.apache.spark.sql.functions._

  // Parse the JSON records using an explicit schema read from a file
  val jsonSchema = scala.io.Source.fromFile(schemaFile).mkString
  val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
  val df = spark.read.schema(schema).json(rdd)

  // DataFrame transformations return a new DataFrame, so the result
  // must be assigned (the original snippet discarded it)
  val windowed = df
    .withColumn("date_key", TimeUtil.getDateTimeWindow(col(timestampField), lit(aggregationTime / 60), lit(timeMeasure)))
    .filter(col("date_key") >= time.toString())

  val aggQuery = sqlQuery
  spark.sql("set spark.sql.caseSensitive=true")
  // presumably the aggregation query runs over a temp view registered
  // from "windowed"; the original snippet was cut off here
  val tmp = spark.sql(aggQuery)



In the example above, I need to store the DataFrame "tmp" in a Druid datasource. Every 10 minutes "tmp" will hold new data, which I need to load into Druid.

I would appreciate any help.

Thank you,


Got it. Convert your DataFrame into an RDD of some case class, and then you can use Tranquility's propagate function.
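A minimal sketch of that approach, assuming the tranquility-spark module is on the classpath. The case class MetricEvent, the factory class name, and the field names are placeholders you would adapt to your schema; the actual Beam construction (via DruidBeams) depends on your Druid and ZooKeeper configuration and is left stubbed out here:

```scala
import com.metamx.tranquility.beam.{Beam, BeamFactory}
import com.metamx.tranquility.spark.BeamRDD._ // adds .propagate to RDDs

// Hypothetical case class matching the columns of the aggregated DataFrame "tmp"
case class MetricEvent(timestamp: String, dimension: String, metric: Long)

// BeamFactory that builds a Beam[MetricEvent] pointing at your Druid cluster.
// Tranquility requires makeBeam to be reusable, hence the lazy val.
class MetricEventBeamFactory extends BeamFactory[MetricEvent] {
  lazy val makeBeam: Beam[MetricEvent] =
    ??? // e.g. DruidBeams.builder(...).buildBeam(), configured for your cluster
}

// Inside foreachRDD, after computing the aggregated DataFrame "tmp":
import spark.implicits._
tmp.as[MetricEvent]                        // Dataset[MetricEvent] via the product encoder
  .rdd                                     // back to an RDD of the case class
  .propagate(new MetricEventBeamFactory)   // tranquility-spark sends each event to Druid
```

The conversion relies on the DataFrame's column names and types lining up with the case class fields; if they don't, select and cast the columns explicitly before calling .as[MetricEvent].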