1.Flink窗口
Window Assigner分配器。
窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。
一种经典的窗口分类可以分成:
翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口(Session Window,活动间隙)。
基于时间的窗口
(1)Tumbling Windows
(2)Sliding Windows
(3)Session Windows
(4)Global Windows
基于数据个数的窗口
(5)CountWindows
/**
* Windows this {@code KeyedStream} into tumbling count windows.
*
* @param size The size of the windows in number of elements.
*/
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
/**
* Windows this {@code KeyedStream} into sliding count windows.
*
* @param size The size of the windows in number of elements.
* @param slide The slide interval in number of elements.
*/
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
2.Flink时间类型
流处理程序在时间概念上总共有三个时间概念:
(1) 处理时间
处理时间是指每台机器的系统时间,当流程序采用处理时间时将使用运行各个运算符实例的机器时间。处理时间是最简单的时间概念,不需要流和机器之间的协调,它能提供最好的性能和最低延迟。然而在分布式和异步环境中,处理时间不能提供消息事件的时序性保证,因为它受到消息传输延迟,消息在算子之间流动的速度等方面制约。
(2) 事件时间
事件时间是指事件在其设备上发生的时间,这个时间在事件进入 flink 之前已经嵌入事件,然后 flink 可以提取该时间。基于事件时间进行处理的流程序可以保证事件在处理的时候的顺序性,但是基于事件时间的应用程序必须要结合 watermark 机制。基于事件时间的处理往往有一定的滞后性,因为它需要等待后续事件和处理无序事件,对于时间敏感的应用使用的时候要慎重考虑。
(3) 注入时间
注入时间是事件注入到 flink 的时间。事件在 source 算子处获取 source 的当前时间作为事件注入时间,后续的基于时间的处理算子会使用该时间处理数据。相比于事件时间,注入时间不能够处理无序事件或者滞后事件,但是应用程序无序指定如何生成 watermark。在内部注入时间程序的处理和事件时间类似,但是时间戳分配和 watermark 生成都是自动的。
区别如下图
(4) WaterMark机制
由于Flink的数据可能会有乱序的可能,所有Flink引入WaterMark机制来一定程度上避免乱序,Watermark是代表当前系统对数据的一个处理进度。
StreamExecutionEnvironment.java
watermark更新的时间间隔,从源码中可以看到是默认是200ms
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
(5) Watermark的生成实现
实现 AssignerWithPeriodicWatermarks.java,Flink自带的实现BoundedOutOfOrdernessTimestampExtractor.java和AscendingTimestampExtractor.java
BoundedOutOfOrdernessTimestampExtractor使用了延迟策略,在设置时指定了容忍数据的延迟的时间。
/** * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can * help reduce the number of elements that are ignored due to lateness when computing the final result for a * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp. * */
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
/**
*
* 返回当前水印。该方法定期被调用系统检索当前水印。该方法可能会返回{@code null}表示没有新的水印可用
* 调用此方法的时间间隔和生成的水印依赖于取决于 ExecutionConfig#getAutoWatermarkInterval()
*/
@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
而AscendingTimestampExtractor的实现是默认递增的。
周期性水印生成的最简单的特殊例子是时间戳被给定的源任务按递增顺序产生,在这种情况下,当前的时间戳永远可以作为水印,因为没有更早的时间戳(迟到的数据)到达。
注意:每个并行数据源任务中的timestamp是递增的。
/** * A timestamp assigner and watermark generator for streams where timestamps are monotonously * ascending. In this case, the local watermarks for the streams are easy to generate, because * they strictly follow the timestamps. * * @param <T> The type of the elements that this function can extract timestamps from */
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
@Override
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
Watermark 如何处理乱序数据,通过源码来解释如下
EventTimeTrigger.java
/**
* A {@link Trigger} that fires once the watermark passes the end of the window
* to which a pane belongs.
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// 如果当前的watermark 已经过了window的上限阈值,则开始计算窗口的值.
// 迟来的element如果在窗口计算销毁之前到达则不会丢失数据,否则将会丢失。销毁窗口的关键因素就是watermark和maxTimestamp的大小
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
....
}
3.Flink窗口计算和延迟数据获取Demo
如果当前数据的 EventTime 在 WaterMark 之上,也就是 EventTime> WaterMark。因为我们知道数据所属窗口的 WindowEndTime,一定是大于 EventTime 的。这时我们有 WindowEndTime > EventTime > WaterMark。所以这种情况下数据是一定不会丢失的。
如果当前数据的 EventTime 在 WaterMark 之下,也就是 WaterMark > EventTime。这时候要分两种情况:
3.1 如果该数据所属窗口的 WindowEndTime > WaterMark,则表示窗口还没被触发,即 WindowEndTime > WaterMark > EventTime,这种情况数据也是不会丢失的。
3.1 如果该数据所属窗口的 WaterMark > WindowEndTime,则表示窗口已经无法被触发,即 WaterMark > WindowEndTime > EventTime,这种情况数据也就丢失了。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义了水位线的时间属性是从消息中获取
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//定义数据源
env.addSource(new TruckSource()).name("truckSource")
//定义数据转换函数(可选),可以将元组转换成bean对象
.map(new SourceMapFunction()).name("sourceMapFunction").setParallelism(4)
//定义过滤器(可选),可以将某些无用数据过滤掉
.filter(new MyFilterFunction()).name("myFilterFunction").setParallelism(4)
//指定watermarks的生成方式
//(1)BoundedOutOfOrdernessTimestampExtractor (2)AscendingTimestampExtractor
.assignTimestampsAndWatermarks(new TruckTimestamp())
//时间窗口(可选).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.timeWindowAll(Time.seconds(10))
//允许的最大延迟10秒(可选),超过此延迟数据可丢失
.allowedLateness(Time.seconds(10))
//使用时间窗口函数做处理
.apply(new TrigerAlertWindow()).name("trigerAlertWindow")
//sink数据做持久化操作
.addSink(new MySinkFunction()).name("mySinkFunction");
env.execute("test");
//------------------------------------------------------------------------------------
/**
* 提取消息中的event 时间字段作为水位线字段
*/
public static class TruckTimestamp extends AscendingTimestampExtractor<TruckBean> {
private static final long serialVersionUID = 1L;
@Override
public long extractAscendingTimestamp(TruckBean element) {
return element.time;
}
}
public class TruckBean {
public String imei;
public double lat;
public double lng;
public long time;
public long count;
}
### 如果不指定WaterMark,系统默认使用PROCESS_TIME
4.window窗口数据处理
window窗口数据处理 apply和process.详细信息见 org.apache.flink.streaming.api.datastream.WindowedStream
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType());
return apply(function, resultType);
}
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>Note that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableWindowFunction<>(function), resultType, function);
}
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null);
return process(function, resultType);
}
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableProcessWindowFunction<>(function), resultType, function);
}
其中 apply方法处理windows数据,是通过windowFunction实现的,通过这个算子,可以对window数据进行处理
/**
* Base interface for functions that are evaluated over keyed (grouped) windows.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
process算子是个底层的方法,常见的有ProcessFunction和KeyedProcessFunction两种计算的方式,具体的实现可以看源码。process和apply算子最大的区别在于process可以自己定时触发计算的定时器,在processElement方法定义定时器 context.timerService().registerEventTimeTimer(timestamp); ,当定时器时间到达,会回调onTimer()方法的计算任务,这是和apply最大的区别
/**
* A function that processes elements of a stream.
*
* <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
* is invoked. This can produce zero or more elements as output. Implementations can also
* query the time and set timers through the provided {@link Context}. For firing timers
* {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
* zero or more elements as output and register further timers.
*
* <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
* available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
*
* <p><b>NOTE:</b> A {@code ProcessFunction} is always a
* {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
* teardown methods can be implemented. See
* {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* Process one element from the input stream.
*
* <p>This function can output zero or more elements using the {@link Collector} parameter
* and also update internal state or set timers using the {@link Context} parameter.
*
* @param value The input value.
* @param ctx A {@link Context} that allows querying the timestamp of the element and getting
* a {@link TimerService} for registering timers and querying the time. The
* context is only valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Called when a timer set using {@link TimerService} fires.
*
* @param timestamp The timestamp of the firing timer.
* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
* querying the {@link TimeDomain} of the firing timer and getting a
* {@link TimerService} for registering timers and querying the time.
* The context is only valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* Information available in an invocation of {@link #processElement(Object, Context, Collector)}
* or {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class Context {
/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
*
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
public abstract Long timestamp();
/**
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class OnTimerContext extends Context {
/**
* The {@link TimeDomain} of the firing timer.
*/
public abstract TimeDomain timeDomain();
}
}
window算子
window算子:是可以设置并行度的
windowAll 算子:并行度始终为1
只有KeyedSteam才能window算子,普通的DataStream只能使用WindowAll 算子。
处理window算子的数据: apply算子和process算子
(1) window数据 apply 算子 对应的fuction
WindowFunction(keyed window) 与 AllWindowFunction(no key window)
(2) window数据 process 算子 对应的fuction
process算子可以获取到window窗口的Context信息(包括state,watermark,当前的window对象) , apply算子无法获取到上下文信息,只能处理数据
ProcessWindowFunction(keyed-window) 和 ProcessAllWindowFunction(no keyd window)
keyed-window 和 nokeyed-window https://cloud.tencent.com/developer/article/1481336
(3) 普通流数据,process算子,对应的fuction
KeyedProcessFunction: A keyed function that processes elements of a stream. (可实现定时任务)
ProcessFunction:A function that processes elements of a stream.(可实现定时任务)