Subject: Using Spark Accumulators with Structured Streaming

I can't reproduce the issue with my simple code:
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println( + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      println(s">>> key: $key => state: ${state}")

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream

I'm wondering whether any errors show up in the driver logs. Micro-batch
exceptions won't terminate the running streaming job.
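
One way to surface such errors without digging through the logs might be a listener that prints the failure reason when a query stops. This is only a rough sketch, not something taken from your job: `TerminationLogging`, `describeTermination` and `register` are names I made up for illustration, and it assumes the standard `StreamingQueryListener` callbacks.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical helper object; none of these names come from the original code.
object TerminationLogging {

  // Turns the optional failure message carried by the event into a log line.
  def describeTermination(exception: Option[String]): String =
    exception match {
      case Some(msg) => s"query terminated with error: $msg"
      case None      => "query terminated normally"
    }

  // Registers a listener that prints why each streaming query stopped.
  def register(spark: SparkSession): Unit =
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"${event.id}: ${describeTermination(event.exception)}")
    })
}
```

Calling `TerminationLogging.register(spark)` before starting the query should be enough to try it. Once a query has stopped, `query.exception` on the `StreamingQuery` handle can also be checked directly.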

For the following code, we have to make sure that `StateUpdateTask` is started:


On Thu, 28 May 2020 19:59:31 +0530
Srinivas V <[EMAIL PROTECTED]> wrote:
