Subject: Streaming data out of Spark to a Kafka topic


In a traditional setup we get data via Kafka into Spark Streaming, do some
work, and write to a NoSQL database like MongoDB, HBase or Aerospike.

That part already works and is best explained by the code further down.

Once the high-value DataFrame lookups is created, I want to send its data to
a new Kafka topic for recipients.

    val kafkaParams = Map[String, String](
                                      "bootstrap.servers" ->
                                      "schema.registry.url" ->
                                       "zookeeper.connect" ->
                                       "" -> sparkAppName,
                                       "" ->
                                       "" ->
                                       "" ->
                                       "" ->
    //val topicsSet = topics.split(",").toSet
    val topics = Set(topicsValue)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)
    // This returns a tuple of key and value (since messages in Kafka are
    // optionally keyed). In this case it is of type (String, String)
    val topicsOut = Set(topicsValueOut)
    val dstreamOut = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicsOut)
    { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
        val op_time = System.currentTimeMillis.toString
        val spark =
        val sc = spark.sparkContext
        import spark.implicits._
        var operation = new operationStruct(op_type, op_time)
        // Convert RDD[String] to RDD[case class] to DataFrame
        val RDDString = { case (_, value) => value.split(',')
}.map(p =>
currency), operation))
        val df = spark.createDataFrame(RDDString)
        var document = df.filter('priceInfo.getItem("price") > 90.0), writeConfig)
         println("Current time is: " + Calendar.getInstance.getTime)
         totalPrices += document.count
         var endTimeQuery = System.currentTimeMillis
         println("Total Prices added to the collection so far: " + totalPrices +
           " , Running for " + (endTimeQuery - startTimeQuery)/(1000*60) + " Minutes")
         // Check if running time > runTime exit
         if( (endTimeQuery - startTimeQuery)/(1000*60) > runTime)
           println("\nDuration exceeded " + runTime + " minutes exiting")
         // picking up individual arrays -->'otherDetails.getItem("tickerQuotes")(0)) shows first element
         //val lookups = df.filter('priceInfo.getItem("ticker") === tickerWatch &&
         //  'priceInfo.getItem("price") > priceWatch)
         val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
         if(lookups.count > 0) {
           println("High value tickers")
issued"), 'priceInfo.getItem("ticker").as("Ticker"),
'priceInfo.getItem("price").cast("Double").as("Latest price")).show

// Now here I want to send the content of the lookups DF to another Kafka topic.
// Note that above I have created a new dstreamOut with a new topic, topicsOut.

How can that be done?
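
One possible direction, as a rough sketch rather than a tested solution: a
stream created with KafkaUtils.createDirectStream only reads from a topic, so
dstreamOut cannot be used to publish. Writing needs a Kafka producer. The
sketch below assumes Spark 2.x (where df.toJSON returns a Dataset[String]) and
the kafka-clients library on the classpath. The helper name publishToKafka and
its brokers and topic parameters are hypothetical and not part of the code
above. Each row is sent as a JSON string with no key.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.sql.DataFrame

// Hypothetical helper: publish every row of a DataFrame to a Kafka topic as JSON.
// `brokers` (host:port list) and `topic` are placeholders supplied by the caller.
def publishToKafka(df: DataFrame, brokers: String, topic: String): Unit = {
  // toJSON turns each row into one JSON string; .rdd gives an RDD[String]
  df.toJSON.rdd.foreachPartition { rows =>
    // Build the producer on the executor: KafkaProducer is not serializable
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      // Un-keyed records: Kafka chooses the partition
      rows.foreach { json =>
        producer.send(new ProducerRecord[String, String](topic, json))
      }
    } finally {
      producer.close() // waits for records already sent to complete
    }
  }
}
```

It could then be called inside the existing lookups.count check, for example
publishToKafka(lookups, brokers, topicsValueOut), where brokers holds the same
value as bootstrap.servers. Creating one producer per partition is the simplest
choice here. A lazily created producer shared per executor would reduce
connection overhead if batches are small and frequent.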

Dr Mich Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.