Hi All,

As you might have already seen, there is an effort tracked in FLINK-12005
[1] to support the event time scale for state with time-to-live (TTL) [2].
While thinking about the design, we realised that there can be multiple
options for the semantics of this feature, depending on the use case. There
is also sometimes confusion because of the out-of-order nature of event
time in Flink. I am starting this thread to discuss potential use cases of
this feature and their requirements with interested users and developers.
There was already a discussion thread asking about event time for TTL, and
it already contains some thoughts [3].

There are two places where we use time for the TTL feature at the moment.
Firstly, we store the timestamp of the last state access/update. Secondly,
we use this timestamp and the current timestamp to check expiration and
garbage collect the state at some point later.
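
To make these two uses concrete, here is a minimal sketch. The class
`TtlEntry` and its methods are hypothetical names for illustration, not
Flink's actual classes, and treating the entry as expired exactly when the
TTL has fully elapsed is an assumption of the sketch.

```java
// Hypothetical illustration only; these are not Flink classes.
final class TtlEntry<V> {
    final V value;
    // Use 1: the timestamp of the last access/update, stored with the value.
    final long lastAccessTimestamp;

    TtlEntry(V value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    // Stamps the value with whatever "now" the chosen time scale provides.
    static <V> TtlEntry<V> write(V value, long now) {
        return new TtlEntry<>(value, now);
    }

    // Use 2: compare the stored timestamp with the current timestamp.
    // Assumption: the entry counts as expired once the TTL has fully elapsed.
    boolean isExpired(long ttlMillis, long currentTimestamp) {
        return currentTimestamp - lastAccessTimestamp >= ttlMillis;
    }
}
```

The open question in the rest of this mail is which clock supplies `now`
and `currentTimestamp` once event time is involved.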

At the moment, Flink supports *only processing time* for both timestamps:
state *last access and current timestamp*. It is basically the current
local system time, expressed as Unix epoch time.

When it comes to the event time scale, we also need to define what Flink
should use for these two timestamps. Here I will list some options and
their possible pros and cons for discussion. There might be more depending
on the use case.

*Last access timestamp (stored in backend with the actual state value):*

   - *Event timestamp of the record currently being processed.* This seems
   to be the simplest option, and it allows user-defined timestamps in the
   state backend. The problem here might be the instability of event time,
   which can not only increase but also decrease if records come out of
   order. This can lead to rewriting the state timestamp to a smaller value,
   which is unnatural for the notion of time.
   - *Max event timestamp of records seen so far for this record key.* This
   option is similar to the previous one, but it tries to fix the notion of
   time by making it always increasing. Maintaining this timestamp also has
   performance implications, because the previous timestamp needs to be
   read out to decide whether to rewrite it.
   - *Last emitted watermark.* This is what we usually use to trigger other
   actions in Flink, like timers and windows, but it can be unrelated to
   the record which actually triggers the state update.
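
The difference between these three options shows up as soon as a late
record arrives. Below is a rough sketch; `LastAccessTimestamps`, `Source`
and `next` are names made up for this mail, and the sketch ignores how the
stored timestamp is actually read from the backend.

```java
// Hypothetical illustration only; these are not Flink classes.
final class LastAccessTimestamps {
    enum Source { RECORD_TIMESTAMP, MAX_TIMESTAMP_PER_KEY, WATERMARK }

    // Returns the timestamp to store with the state on access/update.
    static long next(Source source, long storedTimestamp,
                     long recordTimestamp, long watermark) {
        switch (source) {
            case RECORD_TIMESTAMP:
                // Simple, but may move backwards for out-of-order records.
                return recordTimestamp;
            case MAX_TIMESTAMP_PER_KEY:
                // Never decreases, but needs the stored value to be read first.
                return Math.max(storedTimestamp, recordTimestamp);
            case WATERMARK:
                // Independent of the record that triggered the update.
                return watermark;
            default:
                throw new IllegalArgumentException("Unknown source: " + source);
        }
    }
}
```

For example, with a stored timestamp of 100, a late record with timestamp
80 and a watermark of 90, the three options store 80, 100 and 90
respectively.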

*Current timestamp to check expiration:*

   - *Event timestamp of the last processed record.* Again, a quite simple
   but unpredictable option for out-of-order events. It can potentially lead
   to uncontrolled, undesirable expiration of late data buffered in state.
   - *Max event timestamp of records seen so far for the operator backend.*
   Again, similar to the previous one and more stable, but the user still
   does not have much control over when state expires.
   - *Last emitted watermark.* Again, this is what we usually use to trigger
   other actions in Flink, like timers and windows. It also gives the user
   some control to decide when state expires (up to which point in event
   time) by emitting a certain watermark. It is more flexible but more
   complicated. If some watermark emitting strategy is already used for
   other operations, it might not be optimal for TTL and might delay state
   cleanup.
   - *Current processing time.* This option is quite simple. It would mean
   that the user just decides which timestamp to store, but the state will
   expire in real time. For the data privacy use case, it might be better,
   because we want state to become unavailable at a particular moment in
   real time, counted from when the associated piece of data was created in
   event time. For long-term approximate garbage collection, it might not
   be a problem either. For quick expiration, the time skew between event
   time and processing time can again lead to premature deletion of late
   data, and the user cannot delay it.
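
As a rough sketch of how these four candidates can diverge for the same
input, assume a hypothetical per-operator `ExpirationClock` (not a Flink
class) that is told about every record and watermark. Processing time is
passed in as a parameter here only to keep the example deterministic.

```java
// Hypothetical illustration only; this is not a Flink class.
final class ExpirationClock {
    enum Source {
        LAST_RECORD_TIMESTAMP, MAX_RECORD_TIMESTAMP, WATERMARK, PROCESSING_TIME
    }

    private long lastRecordTimestamp = Long.MIN_VALUE;
    private long maxRecordTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    // Called for every record processed by the operator.
    void onRecord(long eventTimestamp) {
        lastRecordTimestamp = eventTimestamp;
        maxRecordTimestamp = Math.max(maxRecordTimestamp, eventTimestamp);
    }

    // Called for every watermark; the sketch keeps the largest one seen.
    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    // Returns the "current timestamp" used for the expiration check.
    long currentTimestamp(Source source, long processingTime) {
        switch (source) {
            case LAST_RECORD_TIMESTAMP: return lastRecordTimestamp;
            case MAX_RECORD_TIMESTAMP:  return maxRecordTimestamp;
            case WATERMARK:             return watermark;
            case PROCESSING_TIME:       return processingTime;
            default:
                throw new IllegalArgumentException("Unknown source: " + source);
        }
    }
}
```

After records with timestamps 200 and 150 and a watermark of 140, the
event-time options yield 150, 200 and 140, so state with the same stored
timestamp could be expired under one option and still alive under another.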

We could also make this behaviour configurable. Another option is to make
the time provider pluggable for users. The interface can give users context
(the currently processed record, the watermark, etc.) and ask them which
timestamp to use. This is more complicated, though.
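
Just to illustrate the idea, such a pluggable interface could look roughly
like the sketch below. Everything here (`PluggableTtlTime`, `Context`,
`TimestampSelector` and the method names) is hypothetical and only meant as
a starting point for discussion, not a proposal for the final API.

```java
// Hypothetical illustration only; none of these types exist in Flink.
final class PluggableTtlTime {

    // Context handed to the user-provided selector.
    static final class Context {
        final long recordTimestamp; // event timestamp of the current record
        final long watermark;       // last emitted watermark
        final long processingTime;  // current local system time

        Context(long recordTimestamp, long watermark, long processingTime) {
            this.recordTimestamp = recordTimestamp;
            this.watermark = watermark;
            this.processingTime = processingTime;
        }
    }

    // User-defined choice of both timestamps discussed above.
    interface TimestampSelector {
        long lastAccessTimestamp(Context context);
        long expirationTimestamp(Context context);
    }

    // Example: store the record's event time, expire against the watermark.
    static final TimestampSelector RECORD_TIME_EXPIRE_ON_WATERMARK =
        new TimestampSelector() {
            @Override
            public long lastAccessTimestamp(Context context) {
                return context.recordTimestamp;
            }

            @Override
            public long expirationTimestamp(Context context) {
                return context.watermark;
            }
        };
}
```

A configurable variant would then just be a fixed set of such selectors
chosen by an option in the TTL configuration.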

Looking forward to your feedback.


[1] https://issues.apache.org/jira/browse/FLINK-12005