Flink AllowedLateness
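allowedLateness is the Flink window setting for tolerating data that arrives late by up to a given amount of time. The most common misunderstanding is that it is equivalent to subtracting a delay from the watermark, that is, that it tolerates late data by postponing when the window fires. In fact allowedLateness does not postpone the window's firing time; what it changes is the window's cleanup time (cleanupTime). Below are the cleanupTime and registerCleanupTimer methods of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; these two methods show that allowedLateness affects when the window is cleaned up: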
/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}

/**
 * Registers a timer to cleanup the content of the window.
 *
 * @param window the window whose state to discard
 */
protected void registerCleanupTimer(W window) {
    long cleanupTime = cleanupTime(window);
    if (cleanupTime == Long.MAX_VALUE) {
        // don't set a GC timer for "end of time"
        return;
    }

    if (windowAssigner.isEventTime()) {
        triggerContext.registerEventTimeTimer(cleanupTime);
    } else {
        triggerContext.registerProcessingTimeTimer(cleanupTime);
    }
}
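The arithmetic in cleanupTime can be tried out in isolation. The following is a minimal standalone sketch, not Flink code: the class name CleanupTimeSketch and the plain long parameters are assumptions made for illustration, standing in for the window, the assigner and the operator field. It shows the ordinary case, the overflow guard, and the processing-time case.

```java
// Standalone illustration; CleanupTimeSketch is a hypothetical name, not a Flink class.
final class CleanupTimeSketch {

    // Mirrors the arithmetic of WindowOperator.cleanupTime using plain values.
    static long cleanupTime(long maxTimestamp, long allowedLateness, boolean eventTime) {
        if (!eventTime) {
            // Processing-time windows ignore allowedLateness.
            return maxTimestamp;
        }
        long cleanup = maxTimestamp + allowedLateness; // wraps to a negative value on overflow
        // A wrapped sum is smaller than maxTimestamp, so it is clamped to "end of time".
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Window [0, 10000) has maxTimestamp 9999; with 5000 ms of lateness it is kept until 14999.
        System.out.println(cleanupTime(9_999L, 5_000L, true));
        // The sum overflows, so the result is clamped to Long.MAX_VALUE.
        System.out.println(cleanupTime(Long.MAX_VALUE, 5_000L, true) == Long.MAX_VALUE);
        // Processing time: the cleanup time is simply maxTimestamp.
        System.out.println(cleanupTime(9_999L, 5_000L, false));
    }
}
```

When the result is Long.MAX_VALUE, registerCleanupTimer returns early and no cleanup timer is registered for that window.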
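In other words, allowedLateness changes the window's cleanup time: after the watermark triggers the window computation, the window is not cleared immediately but is kept until the watermark passes window.maxTimestamp + allowedLateness. During that interval, every late element that arrives for the window triggers another window computation (this is how the default event-time trigger behaves). As shown in the figure below, the small circles denote elements in the window; when the watermark reaches the window's endTime the window fires once, and each late circle then triggers one more computation.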
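The sequence described above can be replayed with a small simulation. This is a simplified sketch under stated assumptions, not Flink's implementation: it models a single event-time window [0, 10000) with an allowed lateness of 5000 ms, a trigger that fires without purging, and hypothetical names (LateFiringSketch, element, watermark, log) chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window model; it imitates the behaviour described in the text, nothing more.
final class LateFiringSketch {
    static final long WINDOW_MAX_TS = 9_999L;      // window [0, 10000)
    static final long ALLOWED_LATENESS = 5_000L;
    static final long CLEANUP_TIME = WINDOW_MAX_TS + ALLOWED_LATENESS; // 14999

    private long currentWatermark = Long.MIN_VALUE;
    private boolean onTimeFired = false;
    private boolean cleaned = false;
    private final List<Long> contents = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // An element whose timestamp is assumed to fall inside the window.
    void element(long ts) {
        if (CLEANUP_TIME <= currentWatermark) {
            log.add("drop " + ts);                 // window already cleaned up
            return;
        }
        contents.add(ts);
        if (WINDOW_MAX_TS <= currentWatermark) {
            log.add("late fire " + contents);      // late element: fire again immediately
        }
    }

    // Advance the watermark; fire the on-time result first, then clean up if due.
    void watermark(long wm) {
        currentWatermark = wm;
        if (!onTimeFired && WINDOW_MAX_TS <= wm) {
            onTimeFired = true;
            if (!contents.isEmpty()) {
                log.add("fire " + contents);
            }
        }
        if (!cleaned && CLEANUP_TIME <= wm) {
            cleaned = true;
            contents.clear();
            log.add("cleanup");
        }
    }

    public static void main(String[] args) {
        LateFiringSketch w = new LateFiringSketch();
        w.element(1_000L);
        w.element(4_000L);
        w.watermark(9_999L);   // on-time firing
        w.element(7_000L);     // late, within allowed lateness
        w.watermark(12_000L);
        w.element(8_000L);     // late, within allowed lateness
        w.watermark(14_999L);  // cleanup time reached
        w.element(9_000L);     // too late
        w.log.forEach(System.out::println);
    }
}
```

In this model the window fires once when the watermark reaches 9999, once more for each of the two late elements, and the element arriving after the watermark has reached 14999 is dropped. Note that the firing time itself never moves; only the moment at which the window state is discarded does.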