Subject: Using Spark Accumulators with Structured Streaming


Given the code below:
// accumulators is a class-level field on the driver.

sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        // Record the number of input rows reported for this micro-batch.
        accumulators.eventsReceived(queryProgress.progress().numInputRows());

        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            // Read the current driver-side value of each counter.
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
                queryProgress.progress().batchId(),
                queryProgress.progress().numInputRows(),
                queryProgress.progress().inputRowsPerSecond(),
                queryProgress.progress().processedRowsPerSecond(),
                queryProgress.progress().durationMs());
    }
});

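The snippet relies on a driver-side accumulators helper, an InstrumentationCounters key type and a MissingKeyException, none of which are shown in the post. What follows is a hypothetical, Spark-free sketch of how such a helper might look. The names mirror the snippet but every implementation detail is an assumption, and java.util.concurrent.atomic.LongAdder stands in for Spark's LongAccumulator, so it models only the register/look-up-by-key contract, not how Spark merges executor-side updates back to the driver. It also reads each counter separately, so one missing key does not skip the reads that follow it inside a single try block.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class AccumulatorRegistrySketch {

    // Hypothetical keys mirroring the names used in the listener.
    enum InstrumentationCounters { EVENTS_RECEIVED, EVENTS_STATE_EXPIRED, EVENTS_SENT }

    // Hypothetical checked exception mirroring MissingKeyException in the snippet.
    static class MissingKeyException extends Exception {
        MissingKeyException(String message) { super(message); }
    }

    // Hypothetical stand-in for the driver-side helper. LongAdder replaces Spark's
    // LongAccumulator, so this models only key lookup, not executor-to-driver merging.
    static class Accumulators {
        private final Map<InstrumentationCounters, LongAdder> counters =
                new EnumMap<>(InstrumentationCounters.class);

        // Registers a counter for the key if one does not already exist.
        void register(InstrumentationCounters key) {
            counters.putIfAbsent(key, new LongAdder());
        }

        // Adds delta to a registered counter; fails if the key was never registered.
        void add(InstrumentationCounters key, long delta) throws MissingKeyException {
            lookup(key).add(delta);
        }

        // Returns the current total of a registered counter.
        long getLong(InstrumentationCounters key) throws MissingKeyException {
            return lookup(key).sum();
        }

        private LongAdder lookup(InstrumentationCounters key) throws MissingKeyException {
            LongAdder adder = counters.get(key);
            if (adder == null) {
                throw new MissingKeyException("No counter registered for " + key);
            }
            return adder;
        }
    }

    // Reads one counter and falls back to 0 on a missing key, so a single
    // missing key does not prevent the other counters from being read.
    static long readOrZero(Accumulators accs, InstrumentationCounters key) {
        try {
            return accs.getLong(key);
        } catch (MissingKeyException e) {
            System.err.println("Accumulator key not found: " + e.getMessage());
            return 0L;
        }
    }

    public static void main(String[] args) throws MissingKeyException {
        Accumulators accs = new Accumulators();
        accs.register(InstrumentationCounters.EVENTS_RECEIVED);
        accs.register(InstrumentationCounters.EVENTS_SENT);
        // EVENTS_STATE_EXPIRED is deliberately left unregistered.

        accs.add(InstrumentationCounters.EVENTS_RECEIVED, 5);
        accs.add(InstrumentationCounters.EVENTS_SENT, 3);

        System.out.println("Events Received: "
                + readOrZero(accs, InstrumentationCounters.EVENTS_RECEIVED));
        System.out.println("Events State Expired: "
                + readOrZero(accs, InstrumentationCounters.EVENTS_STATE_EXPIRED));
        System.out.println("Events Sent Success: "
                + readOrZero(accs, InstrumentationCounters.EVENTS_SENT));
    }
}
```

Running main prints the received and sent totals, and reports the unregistered EVENTS_STATE_EXPIRED counter as 0 after logging the missing key to standard error. In a real Spark job each LongAdder would be a LongAccumulator registered on the driver, whose value() reflects only updates from tasks that have already completed.
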
On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[EMAIL PROTECTED]> wrote: