Subject: How does Spark Structured Streaming determine an event has arrived late?


Let me answer the original question directly, that is, how do we determine
that an event is late. We simply track the maximum event time the engine
has seen in the data it has processed so far. Any data whose event time is
less than that max is "late" (i.e., it arrived out-of-order). Now, in a
distributed setting, it is very hard to decide whether each individual
record is late or not, because it is hard to maintain a consistent
definition of max-event-time-seen across the cluster. Fortunately, we don't
have to do this precisely, because we don't really care whether a record is
merely "late"; we only care whether it is "too late", that is, older than
the watermark (watermark = max-event-time-seen - watermark-delay). As the
programming guide says, if data is "late" but not "too late", we process it
in the same way as non-late data. Only when data is "too late" do we drop
it.
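
For concreteness, here is a minimal sketch of how the watermark delay is
declared through the DataFrame API. The source, column names, and window
sizes are made up for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window

    val spark = SparkSession.builder.appName("WatermarkSketch").getOrCreate()
    import spark.implicits._

    // Built-in "rate" test source; it produces a "timestamp" column,
    // which we rename to play the role of the event-time column.
    val events = spark.readStream
      .format("rate")
      .load()
      .withColumnRenamed("timestamp", "eventTime")

    // Declare a 10-minute watermark delay: the engine drops any record
    // whose eventTime < max-event-time-seen - 10 minutes.
    val counts = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "5 minutes"))
      .count()

With this delay, if the max event time seen so far is 12:20, then a record
stamped 12:15 is "late" but still aggregated normally, while a record
stamped 12:05 is "too late" (older than the 12:10 watermark) and is
dropped.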

To further clarify, we do not in any way correlate processing-time with
event-time. The definition of lateness is based only on event-time and has
nothing to do with processing-time. This allows us to do event-time
processing with old data streams as well. For example, you may replay
1-week old data as a stream, and the processing will be exactly the same as
it would have been if you had processed the stream in real-time a week ago.
This is fundamentally necessary for achieving the deterministic processing
that Structured Streaming guarantees.
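
As an illustration of that replay scenario, continuing the sketch above
(the path and schema here are hypothetical), the same query over a
directory of week-old files behaves identically:

    import org.apache.spark.sql.types._

    // Hypothetical schema for the archived events.
    val schema = new StructType()
      .add("eventTime", TimestampType)
      .add("value", LongType)

    // Replay week-old data stored as JSON files; hypothetical path.
    val replayed = spark.readStream
      .schema(schema)
      .json("/data/events/last-week/")

    // The same watermark logic applies, driven purely by eventTime,
    // so the results match what a real-time run would have produced.
    val replayedCounts = replayed
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "5 minutes"))
      .count()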

Regarding the picture, the "time" shown there is actually "event-time". My
apologies for not making this clear in the picture. In hindsight, the
picture could have been made much clearer.  :)

Hope this explanation helps!

TD

On Tue, Feb 27, 2018 at 2:26 AM, kant kodali <[EMAIL PROTECTED]> wrote: