Subject: Using Spark Accumulators with Structured Streaming


I can't reproduce the issue with my simple code:
```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
...
```
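For reference, here is a fuller, self-contained sketch of the pattern the trimmed snippet above comes from. The source (a local socket on port 9999), the accumulator name, the word-count state logic, and the checkpoint path are assumptions I filled in for illustration, not the exact code of the original job:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQueryListener, Trigger}

object AccumulatorStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("accumulator-with-mapGroupsWithState")
      .getOrCreate()
    import spark.implicits._

    // The accumulator is registered on the driver; executors only add to it.
    val myAcc = spark.sparkContext.longAccumulator("state-update-counter")

    // The listener runs on the driver, so reading myAcc.value here is safe.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit =
        println(s"My accu is ${myAcc.value} on query progress")
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
    })

    // State update function: counts how many values each key has seen so far.
    def mappingFunc(key: String, values: Iterator[String], state: GroupState[Long]): (String, Long) = {
      myAcc.add(1) // runs on the executors; task results are merged back to the driver
      val newCount = state.getOption.getOrElse(0L) + values.size
      state.update(newCount)
      (key, newCount)
    }

    // Assumes a socket source (e.g. `nc -lk 9999`), purely for illustration.
    val words = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\s+"))

    val wordCounts = words
      .groupByKey(identity)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
      .format("console")
      .option("checkpointLocation", "/tmp/acc-demo-checkpoint") // assumed path
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

One caveat worth keeping in mind with this pattern: accumulator updates made inside `mappingFunc` are applied per task, so retried or speculatively executed tasks can add to the counter more than once. The value printed in `onQueryProgress` is therefore a driver-side running total, not an exact count.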

I'm wondering whether any errors can be found in the driver logs? Exceptions thrown inside a micro-batch won't terminate the running streaming job.
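Besides reading the logs, failures can also be checked programmatically from the driver. This is only a sketch, assuming you keep the `query` handle returned by `start()`; it is not from the original code:

```scala
// Driver-side failure checks (names assumed for illustration):
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit =
    // exception is Some(...) only when the query stopped because of an error
    event.exception.foreach(e => println(s"Query ${event.id} terminated with error: $e"))
})

// The failure cause, if any, is also exposed on the query handle itself:
query.exception.foreach(e => println(s"Query failed: ${e.getMessage}"))
```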

For the following code, we have to make sure that `StateUpdateTask` is started:

--
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V <[EMAIL PROTECTED]> wrote:
