flink join

原创
2022/02/13 23:46
阅读数 20

Window Join #

窗口连接连接两个流的元素,它们共享一个公共键,并且位于同一个窗口中。这些窗口可以通过使用窗口赋值器来定义,并在两个流的元素上求值。

然后将来自双方的元素传递给用户定义的JoinFunction或FlatJoinFunction,用户可以发出满足连接标准的结果。

一般的用法可以总结如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

关于语义学的几点注意事项:两个流的元素成对组合的创建行为就像一个内连接,这意味着如果一个流的元素没有来自另一个流的相应元素被连接,那么它们将不会被触发。那些被连接的元素的时间戳是仍然位于各自窗口中的最大时间戳。例如,一个边界为[5,10)的窗口将导致合并元素的时间戳为9。
在下一节中,我们将使用一些示例场景概述不同类型的窗口连接的行为。

翻窗连接#
当执行翻转窗口连接时,所有具有公共键和公共翻转窗口的元素都以成对组合的方式连接,并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流的元素如果在其翻转窗口中没有来自另一个流的元素,则不会被释放!

如图所示,我们定义了一个大小为2毫秒的翻转窗口,其结果是窗体[0,1],[2,3],....的窗口该图像显示了每个窗口中所有元素的成对组合,这些元素将被传递给JoinFunction。注意,在翻转窗口[6,7]中,什么都没有发射出来,因为绿色流中不存在与橙色元素⑥和⑦相结合的元素。

Java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

Sliding Window Join #

当执行滑动窗口连接时,所有具有公共键和公共滑动窗口的元素都以成对组合的方式连接,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素如果没有来自另一个流的元素,则不会被触发!注意,一些元素可能在一个滑动窗口中被连接,但在另一个滑动窗口中却不能被连接!

在这个示例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,导致滑动窗口[- 1,0]、[0,1]、[1,2]、[2,3],....x轴下方被连接的元素是传递给每个滑动窗口的JoinFunction的元素。在这里你也可以看到,例如,在窗口[2,3]中,橙色②与绿色③连接,但没有与窗口[1,2]中的任何东西连接。

Java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

Session Window Join #

当执行会话窗口连接时,所有具有相同键的元素,当“combined”满足会话条件时,将以成对组合的方式连接,并传递给JoinFunction或FlatJoinFunction。这将再次执行内部连接,因此如果有一个会话窗口只包含来自一个流的元素,则不会发出输出!

在这里,我们定义了一个会话窗口联接,其中每个会话都被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿流中没有元素,因此⑧和⑨没有被加入!

Java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

Interval Join #

间隔连接将两个流的元素(我们暂时称它们为A和B)用一个共同的键连接起来,其中流B中的元素的时间戳位于相对于流A中元素的时间戳的时间间隔内。

当一对元素被传递给ProcessJoinFunction时,它们将被分配两个元素的较大的时间戳(可以通过ProcessJoinFunction. context访问)。

interval join目前只支持事件时间。

In the example above, we join two streams ‘orange’ and ‘green’ with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. Be default, these boundaries are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive can be applied to change the behaviour.

Using the more formal notation again this will translate to

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

as indicated by the triangles.

Java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部