文档章节

Flink Window

满小茂
 满小茂
发布于 2018/12/17 00:38
字数 3152
阅读 33
收藏 0

1.Flink窗口

      Window Assigner分配器。

          窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。

      一种经典的窗口分类可以分成:

          翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口(Session Window,活动间隙)。

     基于时间的窗口

(1)Tumbling Windows

        https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#tumbling-windows     

(2)Sliding Windows

        https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows

(3)Session Windows

        https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#session-windows

(4)Global Windows

        https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows

     基于数据个数的窗口

(5)CountWindows

/**
	 * Windows this {@code KeyedStream} into tumbling count windows.
	 *
	 * @param size The size of the windows in number of elements.
	 */
	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	/**
	 * Windows this {@code KeyedStream} into sliding count windows.
	 *
	 * @param size The size of the windows in number of elements.
	 * @param slide The slide interval in number of elements.
	 */
	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
		return window(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

      

2.Flink时间类型

流处理程序在时间概念上总共有三个时间概念:

(1) 处理时间

       处理时间是指每台机器的系统时间,当流程序采用处理时间时将使用运行各个运算符实例的机器时间。处理时间是最简单的时间概念,不需要流和机器之间的协调,它能提供最好的性能和最低延迟。然而在分布式和异步环境中,处理时间不能提供消息事件的时序性保证,因为它受到消息传输延迟,消息在算子之间流动的速度等方面制约。

(2) 事件时间

      事件时间是指事件在其设备上发生的时间,这个时间在事件进入 flink 之前已经嵌入事件,然后 flink 可以提取该时间。基于事件时间进行处理的流程序可以保证事件在处理的时候的顺序性,但是基于事件时间的应用程序必须要结合 watermark 机制。基于事件时间的处理往往有一定的滞后性,因为它需要等待后续事件和处理无序事件,对于时间敏感的应用使用的时候要慎重考虑。

(3) 注入时间

      注入时间是事件注入到 flink 的时间。事件在 source 算子处获取 source 的当前时间作为事件注入时间,后续的基于时间的处理算子会使用该时间处理数据。相比于事件时间,注入时间不能够处理无序事件或者滞后事件,但是应用程序无序指定如何生成 watermark。在内部注入时间程序的处理和事件时间类似,但是时间戳分配和 watermark 生成都是自动的。

区别如下图

      

(4) WaterMark机制

       由于Flink的数据可能会有乱序的可能,所有Flink引入WaterMark机制来一定程度上避免乱序,Watermark是代表当前系统对数据的一个处理进度。

       StreamExecutionEnvironment.java

       watermark更新的时间间隔,从源码中可以看到是默认是200ms

    /**
	 * Sets the time characteristic for all streams create from this environment, e.g., processing
	 * time, event time, or ingestion time.
	 *
	 * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
	 * watermark update interval of 200 ms. If this is not applicable for your application
	 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
	 *
	 * @param characteristic The time characteristic.
	 */
	@PublicEvolving
	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
		this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
		if (characteristic == TimeCharacteristic.ProcessingTime) {
			getConfig().setAutoWatermarkInterval(0);
		} else {
			getConfig().setAutoWatermarkInterval(200);
		}
	}

(5)Watermark的生成实现

 实现 AssignerWithPeriodicWatermarks.java,Flink自带的实现BoundedOutOfOrdernessTimestampExtractor.javaAscendingTimestampExtractor.java

BoundedOutOfOrdernessTimestampExtractor使用了延迟策略,在设置时指定了容忍数据的延迟的时间。

/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
@Override
	public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards.
		long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
		if (potentialWM >= lastEmittedWatermark) {
			lastEmittedWatermark = potentialWM;
		}
		return new Watermark(lastEmittedWatermark);
	}

AscendingTimestampExtractor的实现是默认递增的。

周期性水印生成的最简单的特殊例子是时间戳被给定的源任务按递增顺序产生,在这种情况下,当前的时间戳永远可以作为水印,因为没有更早的时间戳将到达。
注意:每个并行数据源任务中的timestamp是递增的。

/**
 * A timestamp assigner and watermark generator for streams where timestamps are monotonously
 * ascending. In this case, the local watermarks for the streams are easy to generate, because
 * they strictly follow the timestamps.
 *
 * @param <T> The type of the elements that this function can extract timestamps from
 */
@Override
	public final long extractTimestamp(T element, long elementPrevTimestamp) {
		final long newTimestamp = extractAscendingTimestamp(element);
		if (newTimestamp >= this.currentTimestamp) {
			this.currentTimestamp = newTimestamp;
			return newTimestamp;
		} else {
			violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
			return newTimestamp;
		}
	}

 

Watermark 如何处理乱序数据,通过源码来解释如下

EventTimeTrigger.java

/**
 * A {@link Trigger} that fires once the watermark passes the end of the window
 * to which a pane belongs.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private EventTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // 如果当前的watermark 已经过了window的上限阈值,则开始计算窗口的值.
        // 迟来的element如果在窗口计算销毁之前到达则不会丢失数据,否则将会丢失。销毁窗口的关键因素就是watermark和maxTimestamp的大小
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}
    ....
}

3.Flink窗口计算和延迟数据获取Demo

如果当前数据的 EventTime 在 WaterMark 之上,也就是 EventTime> WaterMark。因为我们知道数据所属窗口的 WindowEndTime,一定是大于 EventTime 的。这时我们有 WindowEndTime > EventTime > WaterMark。所以这种情况下数据是一定不会丢失的。

如果当前数据的 EventTime 在 WaterMark 之下,也就是 WaterMark > EventTime。这时候要分两种情况: 

  3.1 如果该数据所属窗口的 WindowEndTime > WaterMark,则表示窗口还没被触发,即 WindowEndTime > WaterMark > EventTime,这种情况数据也是不会丢失的。 

  3.1 如果该数据所属窗口的 WaterMark > WindowEndTime,则表示窗口已经无法被触发,即 WaterMark > WindowEndTime > EventTime,这种情况数据也就丢失了。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //定义了水位线的时间属性是从消息中获取
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //定义数据源
        env.addSource(new TruckSource()).name("truckSource")
                //定义数据转换函数(可选),可以将元组转换成bean对象
                .map(new SourceMapFunction()).name("sourceMapFunction").setParallelism(4)
                //定义过滤器(可选),可以将某些无用数据过滤掉
                .filter(new MyFilterFunction()).name("myFilterFunction").setParallelism(4)
                //指定watermarks的生成方式
                //(1)BoundedOutOfOrdernessTimestampExtractor (2)AscendingTimestampExtractor
                .assignTimestampsAndWatermarks(new TruckTimestamp())
                //时间窗口(可选).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .timeWindowAll(Time.seconds(10))
                //允许的最大延迟10秒(可选),超过此延迟数据可丢失
                .allowedLateness(Time.seconds(10))
                //使用时间窗口函数做处理
                .apply(new TrigerAlertWindow()).name("trigerAlertWindow")
                //sink数据做持久化操作
                .addSink(new MySinkFunction()).name("mySinkFunction");
        env.execute("test");
     //------------------------------------------------------------------------------------
    /**
     * 提取消息中的event 时间字段作为水位线字段
     */
    public static class TruckTimestamp extends AscendingTimestampExtractor<TruckBean> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(TruckBean element) {
            return element.time;
        }
    }

   public  class TruckBean {
       public String imei;
       public double lat;
       public double lng;
       public long time;
       public long count;
   }

4.window窗口数据处理

    window窗口数据处理 apply和process.详细信息见 org.apache.flink.streaming.api.datastream.WindowedStream

    /**
	 * Applies the given window function to each window. The window function is called for each
	 * evaluation of the window for each key individually. The output of the window function is
	 * interpreted as a regular non-windowed stream.
	 *
	 * <p>Not that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of incremental aggregation.
	 *
	 * @param function The window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
		TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType());

		return apply(function, resultType);
	}

	/**
	 * Applies the given window function to each window. The window function is called for each
	 * evaluation of the window for each key individually. The output of the window function is
	 * interpreted as a regular non-windowed stream.
	 *
	 * <p>Note that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of incremental aggregation.
	 *
	 * @param function The window function.
	 * @param resultType Type information for the result type of the window function
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
		function = input.getExecutionEnvironment().clean(function);
		return apply(new InternalIterableWindowFunction<>(function), resultType, function);
	}

	/**
	 * Applies the given window function to each window. The window function is called for each
	 * evaluation of the window for each key individually. The output of the window function is
	 * interpreted as a regular non-windowed stream.
	 *
	 * <p>Not that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of incremental aggregation.
	 *
	 * @param function The window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
		TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null);

		return process(function, resultType);
	}

	/**
	 * Applies the given window function to each window. The window function is called for each
	 * evaluation of the window for each key individually. The output of the window function is
	 * interpreted as a regular non-windowed stream.
	 *
	 * <p>Not that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of incremental aggregation.
	 *
	 * @param function The window function.
	 * @param resultType Type information for the result type of the window function
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	@Internal
	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
		function = input.getExecutionEnvironment().clean(function);
		return apply(new InternalIterableProcessWindowFunction<>(function), resultType, function);
	}

其中 apply方法处理windows数据,是通过windowFunction实现的,通过这个算子,可以对window数据进行处理

/**
 * Base interface for functions that are evaluated over keyed (grouped) windows.
 *
 * @param <IN> The type of the input value.
 * @param <OUT> The type of the output value.
 * @param <KEY> The type of the key.
 * @param <W> The type of {@code Window} that this window function can be applied on.
 */
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

	/**
	 * Evaluates the window and outputs none or several elements.
	 *
	 * @param key The key for which this window is evaluated.
	 * @param window The window that is being evaluated.
	 * @param input The elements in the window being evaluated.
	 * @param out A collector for emitting elements.
	 *
	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
	 */
	void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

process算子是个底层的方法,常见的有ProcessFunction和KeyedProcessFunction两种计算的方式,具体的实现可以看源码。process和apply算子最大的区别在于process可以自己定时触发计算的定时器,在processElement方法定义定时器   context.timerService().registerEventTimeTimer(timestamp); ,当定时器时间到达,会回调onTimer()方法的计算任务,这是和apply最大的区别

/**
 * A function that processes elements of a stream.
 *
 * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
 * is invoked. This can produce zero or more elements as output. Implementations can also
 * query the time and set timers through the provided {@link Context}. For firing timers
 * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
 * zero or more elements as output and register further timers.
 *
 * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
 * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
 *
 * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
 * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
 * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
 * teardown methods can be implemented. See
 * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
 * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
 *
 * @param <I> Type of the input elements.
 * @param <O> Type of the output elements.
 */
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

	private static final long serialVersionUID = 1L;

	/**
	 * Process one element from the input stream.
	 *
	 * <p>This function can output zero or more elements using the {@link Collector} parameter
	 * and also update internal state or set timers using the {@link Context} parameter.
	 *
	 * @param value The input value.
	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
	 *            a {@link TimerService} for registering timers and querying the time. The
	 *            context is only valid during the invocation of this method, do not store it.
	 * @param out The collector for returning result values.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

	/**
	 * Called when a timer set using {@link TimerService} fires.
	 *
	 * @param timestamp The timestamp of the firing timer.
	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
	 *            querying the {@link TimeDomain} of the firing timer and getting a
	 *            {@link TimerService} for registering timers and querying the time.
	 *            The context is only valid during the invocation of this method, do not store it.
	 * @param out The collector for returning result values.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

	/**
	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
	 */
	public abstract class Context {

		/**
		 * Timestamp of the element currently being processed or timestamp of a firing timer.
		 *
		 * <p>This might be {@code null}, for example if the time characteristic of your program
		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
		 */
		public abstract Long timestamp();

		/**
		 * A {@link TimerService} for querying time and registering timers.
		 */
		public abstract TimerService timerService();

		/**
		 * Emits a record to the side output identified by the {@link OutputTag}.
		 *
		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
		 * @param value The record to emit.
		 */
		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}

	/**
	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
	 */
	public abstract class OnTimerContext extends Context {
		/**
		 * The {@link TimeDomain} of the firing timer.
		 */
		public abstract TimeDomain timeDomain();
	}

}

© 著作权归作者所有

共有 人打赏支持
满小茂
粉丝 76
博文 120
码字总数 135036
作品 0
成都
程序员
私信 提问
Apache Flink 1.3.1 发布,通用数据处理平台

Apache Flink 1.3.1 发布了,这是 Apache Flink 1.3 系列的首个 bug 修复版本。该版本包含 50 个修复程序和对 Flink 1.3.0 的小改进。下面的列表包括所有修补程序的详细列表。建议用户升级至...

局长
2017/06/24
749
3
Apache Flink 1.7.1 发布,开源流处理框架

Apache Flink 1.7.1 已发布,此版本包括27项修复及针对 Flink 1.7.0 的小改进。建议所有用户升级。Apache Flink 是一个开源的流处理框架,应用于分布式、高性能、始终可用的、准确的数据流应...

王练
2018/12/23
0
0
Flink 实战 : 统计网站PV,UV

Flink 实战:统计网站PV,UV PV,UV PV(Page View) : 页面点击次数 UV(User View): 独立用户访问次数 假定需求如下,每间隔1分钟,统计过去5分钟的UV,PV。很容易想到,通过数据库的count...

moyiguke
2018/06/27
0
0
Flink技术源码解析(一):Flink概述与源码研读准备

一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星。关于Flink与其它主流实时大数据处理引擎Storm、Spark...

binggozhan
2018/06/06
0
0
Apache Flink 漫谈系列 - Watermark

实际问题(乱序) 在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(...

金竹
2018/11/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

牛津词典 2018 年度词汇 ——「有毒」!

简评:本文并没有「标题党」,牛津词典公布的 2018 年度词汇就是 Toxic. 意为「有毒的」。 2018 was toxic. Toxic 这个词是什么意思呢? 牛津词典(Oxford Dictionaries)在 Word of the Da...

极光推送
16分钟前
1
0
浅谈Service Mesh体系中的Envoy

https://blog.csdn.net/yunqiinsight/article/details/81019255

易野
25分钟前
1
0
嵌入式应用选择合适的微控制器

准备所需硬件接口列表 使用微控制器的基本硬件框图,准备一份微控制器需要支持的所有外设接口的列表。微控制器中有两种常见的接口类型需要列出。第一种是通信接口,这些是外围设备,如USB,S...

linuxCool
33分钟前
3
0
Group by使用

概述 GROUP BY我们可以先从字面上来理解,GROUP表示分组,BY后面写字段名,就表示根据哪个字段进行分组,如果有用Excel比较多的话,GROUP BY比较类似Excel里面的透视表。 GROUP BY必须得配合...

小橙子的曼曼
44分钟前
4
0
机械臂写中文

Make Me a Hanzi https://www.skishore.me/makemeahanzi/ 使用uArm Swift Pro机械臂写中文-毛笔字 https://github.com/makelove/Robot_Arm_Write_Chinese...

itfanr
55分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部