Subject: Streaming data out of Spark to a Kafka topic


In a traditional setup we get data via Kafka into Spark Streaming, do some work
and write to a NoSQL database like MongoDB, HBase or Aerospike.

That part is done in the code below, which explains it best.

Once the high-value DataFrame lookups is created, I want to send its data to a
new topic for recipients:

    val kafkaParams = Map[String, String](
                                      "bootstrap.servers" ->
                                      "schema.registry.url" ->
                                       "zookeeper.connect" ->
                                       "" -> sparkAppName,
                                       "" ->
                                       "" ->
                                       "" ->
                                       "" ->
    //val topicsSet = topics.split(",").toSet
    val topics = Set(topicsValue)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)
    // This returns a tuple of key and value (since messages in Kafka are
    // optionally keyed). In this case it is of type (String, String)
    val topicsOut = Set(topicsValueOut)
    val dstreamOut = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicsOut)
    { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
        val op_time = System.currentTimeMillis.toString
        val spark =
        val sc = spark.sparkContext
        import spark.implicits._
        var operation = new operationStruct(op_type, op_time)
        // Convert RDD[String] to RDD[case class] to DataFrame
        val RDDString = { case (_, value) => value.split(',')
}.map(p =>
currency), operation))
        val df = spark.createDataFrame(RDDString)
        var document = df.filter('priceInfo.getItem("price") > 90.0), writeConfig)
         println("Current time is: " + Calendar.getInstance.getTime)
         totalPrices += document.count
         var endTimeQuery = System.currentTimeMillis
         println("Total Prices added to the collection so far: " + totalPrices +
           " , Running for " + (endTimeQuery - startTimeQuery)/(1000*60) + " Minutes")
         // Check if running time > runTime exit
         if( (endTimeQuery - startTimeQuery)/(1000*60) > runTime)
           println("\nDuration exceeded " + runTime + " minutes exiting")
         // picking up individual arrays -->'otherDetails.getItem("tickerQuotes")(0)) shows first element
         //val lookups = df.filter('priceInfo.getItem("ticker") === tickerWatch &&
         //  'priceInfo.getItem("price") > priceWatch)
         val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
         if(lookups.count > 0) {
           println("High value tickers")
issued"), 'priceInfo.getItem("ticker").as("Ticker"),
'priceInfo.getItem("price").cast("Double").as("Latest price")).show

// Now here I want to send the content of the lookups DF to another Kafka topic.
// Note that above I have created a new dstreamOut with a new topic topicsOut

How can that be done?
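
One direction that might work, offered only as a rough sketch: KafkaUtils.createDirectStream builds a consumer stream, so dstreamOut would read from the output topic rather than write to it. Writing usually goes through a Kafka producer instead. The sketch below assumes the kafka-clients library is on the classpath and that string serialization of each row as JSON is acceptable. The object name LookupsToKafka, the helper names producerProps and publish, and the bootstrapServers parameter are all hypothetical and not part of the code above.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of the original job.
object LookupsToKafka {

  // Producer settings. The broker list is passed in by the caller (assumption:
  // it is the same value used for "bootstrap.servers" in kafkaParams).
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Serialize every row of df as a JSON string and send it, unkeyed, to topicOut.
  def publish(df: DataFrame, bootstrapServers: String, topicOut: String): Unit = {
    df.toJSON.rdd.foreachPartition { rows =>
      // One producer per partition, created on the executor, because
      // KafkaProducer is not serializable and cannot be shipped from the driver.
      val producer = new KafkaProducer[String, String](producerProps(bootstrapServers))
      try {
        rows.foreach { json =>
          producer.send(new ProducerRecord[String, String](topicOut, json))
        }
      } finally {
        producer.close() // waits for pending sends to complete
      }
    }
  }
}
```

Under these assumptions, the call would sit inside the if(lookups.count > 0) block, for example LookupsToKafka.publish(lookups, bootstrapServers, topicsValueOut), where bootstrapServers stands for whatever broker list the job already uses. Creating a producer per partition on every batch is simple but not cheap; a shared, lazily created producer per executor is a common refinement.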

