Subject: Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

I had a sequence of events that created this issue.

* I started a job and I had the state.checkpoints.num-retained: 5

* As expected I have 5 latest checkpoints retained in my hdfs backend.
* JM dies ( K8s limit etc ) without cleaning the hdfs directory.  The k8s
job restores from the latest checkpoint ( I think ) but as it creates new
checkpoints it does not delete the older chk point. At the end there are
now 10 chkpoints,  5 from the old run which remain static and 5 latest
representing the on going pipe.

* The JM dies again and restart  from the latest from the 5 old checkpoints.

This looks a bug in the Job Cluster implementation of flink. It looks like
it is taking the 5th checkpoint from the beginning based on num-retained
value, Note that it has the same job id and does not scope to a new

Please tell me if this does not make sense.

