A Look at Flink's Execution Plan Visualization

go4it
Published 2019/02/13 14:13

This post takes a look at Flink's Execution Plan Visualization.

Example

Code

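    @Test
    public void testExecutionPlan(){
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // WORDS and WordCountTest.Tokenizer come from the author's test code and are not shown here
        DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS)
                .flatMap(new WordCountTest.Tokenizer())
                .keyBy(0)   // key by tuple field 0 (the String)
                .sum(1);    // sum tuple field 1 (the Integer)
        dataStream.print();
        // Print the plan as JSON; env.execute() is never called, so the job itself does not run
        System.out.println(env.getExecutionPlan());
    }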

JSON

{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ]
    }
  ]
}
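
The plan can also be read without the visualizer: each entry under "predecessors" is an incoming edge of the node that contains it. The following is a minimal sketch that lists those edges using only the JDK. The class name PlanEdges and the output format are made up for illustration, and the regex relies on the key order shown above ("id" directly followed by "type" or "ship_strategy"); a real JSON parser would be the more robust choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: lists the edges of a plan JSON string.
final class PlanEdges {

    // Matches an "id" that is directly followed by "type" (a node)
    // or by "ship_strategy" (an entry in a node's "predecessors" list).
    private static final Pattern ID_THEN_KEY = Pattern.compile(
            "\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"(type|ship_strategy)\"\\s*:\\s*\"([^\"]*)\"");

    static List<String> edges(String planJson) {
        List<String> edges = new ArrayList<>();
        String currentNode = null; // id of the node whose predecessors are being read
        Matcher m = ID_THEN_KEY.matcher(planJson);
        while (m.find()) {
            if ("type".equals(m.group(2))) {
                currentNode = m.group(1);
            } else if (currentNode != null) {
                // predecessor id -> current node id [ship strategy]
                edges.add(m.group(1) + " -> " + currentNode + " [" + m.group(3) + "]");
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // Shortened two-node plan in the same shape as the JSON above
        String plan = "{\"nodes\":[{\"id\":1,\"type\":\"Source\"},"
                + "{\"id\":2,\"type\":\"Flat Map\",\"predecessors\":"
                + "[{\"id\":1,\"ship_strategy\":\"REBALANCE\",\"side\":\"second\"}]}]}";
        for (String edge : edges(plan)) {
            System.out.println(edge);
        }
    }
}
```

Applied to the full plan above, this lists three edges: 1 -> 2 [REBALANCE], 2 -> 4 [HASH] and 4 -> 5 [FORWARD].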

Visualization

Open the Flink plan visualizer, paste the JSON above into the text box, and click Draw to render the plan as a graph.

StreamExecutionEnvironment.getExecutionPlan

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

@Public
public abstract class StreamExecutionEnvironment {
	//......

	/**
	 * Creates the plan with which the system will execute the program, and
	 * returns it as a String using a JSON representation of the execution data
	 * flow graph. Note that this needs to be called, before the plan is
	 * executed.
	 *
	 * @return The execution plan of the program, as a JSON String.
	 */
	public String getExecutionPlan() {
		return getStreamGraph().getStreamingPlanAsJSON();
	}

	/**
	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
	 *
	 * @return The streamgraph representing the transformations
	 */
	@Internal
	public StreamGraph getStreamGraph() {
		if (transformations.size() <= 0) {
			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
		}
		return StreamGraphGenerator.generate(this, transformations);
	}

	//......
}
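  • StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph, which builds the StreamGraph with StreamGraphGenerator.generate (and throws IllegalStateException if no transformations have been defined); it then calls StreamGraph.getStreamingPlanAsJSON to obtain the execution plan as JSON.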

StreamGraph.getStreamingPlanAsJSON

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java

@Internal
public class StreamGraph extends StreamingPlan {

	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

	private final StreamExecutionEnvironment environment;
	private final ExecutionConfig executionConfig;
	private final CheckpointConfig checkpointConfig;

	private boolean chaining;

	private Map<Integer, StreamNode> streamNodes;
	private Set<Integer> sources;
	private Set<Integer> sinks;
	private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
	private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
	private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

	protected Map<Integer, String> vertexIDtoBrokerID;
	protected Map<Integer, Long> vertexIDtoLoopTimeout;
	private StateBackend stateBackend;
	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

	//......

	public String getStreamingPlanAsJSON() {
		try {
			return new JSONGenerator(this).getJSON();
		}
		catch (Exception e) {
			throw new RuntimeException("JSON plan creation failed", e);
		}
	}

	//......
}
  • StreamGraph.getStreamingPlanAsJSON uses JSONGenerator to serialize the graph itself and returns the execution plan as JSON.
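
To make the serialization step concrete, here is a simplified stand-in for what such a generator does: walk the nodes and emit one JSON object per node, with its incoming edges under "predecessors". This is not Flink's JSONGenerator; MiniPlanJson, Node and Edge are hypothetical names, only some of the fields from the sample plan are written, and names are assumed to contain no characters that need JSON escaping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink's JSONGenerator.
final class MiniPlanJson {

    /** Stand-in for an incoming edge of a node. */
    static final class Edge {
        final int sourceId;
        final String shipStrategy;

        Edge(int sourceId, String shipStrategy) {
            this.sourceId = sourceId;
            this.shipStrategy = shipStrategy;
        }
    }

    /** Stand-in for a node of the stream graph. */
    static final class Node {
        final int id;
        final String name;
        final int parallelism;
        final List<Edge> inputs = new ArrayList<>();

        Node(int id, String name, int parallelism) {
            this.id = id;
            this.name = name;
            this.parallelism = parallelism;
        }

        Node input(int sourceId, String shipStrategy) {
            inputs.add(new Edge(sourceId, shipStrategy));
            return this;
        }
    }

    // Emits {"nodes":[...]}; assumes names need no JSON escaping.
    static String toJson(List<Node> nodes) {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodes.size(); i++) {
            Node n = nodes.get(i);
            if (i > 0) sb.append(',');
            sb.append("{\"id\":").append(n.id)
              .append(",\"type\":\"").append(n.name).append('"')
              .append(",\"parallelism\":").append(n.parallelism);
            if (!n.inputs.isEmpty()) {
                sb.append(",\"predecessors\":[");
                for (int j = 0; j < n.inputs.size(); j++) {
                    Edge e = n.inputs.get(j);
                    if (j > 0) sb.append(',');
                    sb.append("{\"id\":").append(e.sourceId)
                      .append(",\"ship_strategy\":\"").append(e.shipStrategy).append("\"}");
                }
                sb.append(']');
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node(1, "Source", 1),
                new Node(2, "Flat Map", 4).input(1, "REBALANCE"));
        System.out.println(toJson(nodes));
    }
}
```

Source nodes have no inputs, so they get no "predecessors" key, which matches the shape of the sample plan.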

Summary

  • Flink provides an online plan visualizer for visualizing execution plans; it takes the execution plan in JSON form.
  • StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph.
  • StreamGraph.getStreamingPlanAsJSON uses JSONGenerator to serialize the graph itself and returns the execution plan as JSON.
