文档章节

聊聊flink的Table API及SQL Programs

go4it
 go4it
发布于 01/21 12:52
字数 1506
阅读 12
收藏 0

本文主要研究一下flink的Table API及SQL Programs

实例

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...)            // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();
  • 本实例展示了flink的Table API及SQL Programs的基本用法

Table API实例

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
  • 通过tableEnv.scan方法来创建Table,之后使用Table的各种查询api

sqlQuery实例

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
  • sqlQuery内部是使用Apache Calcite来实现的

sqlUpdate实例(TableSink)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query
  • 这里使用TableSink注册output table之后,就可以使用TableEnvironment的sqlUpdate方法sink到output table

insertInto实例(TableSink)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
  • 通过Table.insertInto方法sink到output table

DataStream(或DataSet)与Table转换

注册DataStream为Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
  • 通过StreamTableEnvironment.registerDataStream注册DataStream为Table

DataStream转Table实例

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
  • 这里通过StreamTableEnvironment.fromDataStream将DataStream转为Table

Table转DataStream实例


// get StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer> 
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
  • 这里通过StreamTableEnvironment.toRetractStream将Table转换为DataStream

Table转DataSet实例

// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);
  • 这里通过BatchTableEnvironment.toDataSet将Table转换为DataSet

Data Types与Table Schema映射

Position-based Mapping(Tuple类型)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//---Tuple类型---

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
  • Position-based的映射要求新指定的字段名不能与input data type重名,如果没有指定,则默认从f0开始来命名原始类型;此模式适用于Tuple、Row类型,POJO类型不能使用此模式

Name-based Mapping(POJO类型)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//---Tuple类型---

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

//---POJO类型---

// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
  • Tuple或者POJO类型都可以使用这种模式,也可以使用as进行别名

Atomic类型

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...

// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
  • 原始类型被转换为单个字段

Row类型

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
  • Row类型支持任意数量的字段,并允许字段值为null,它可以使用Position-based Mapping及Name-based Mapping

小结

flink的Table API及SQL Programs的基本用法

  • 首先是创建TableEnvironment(BatchTableEnvironment或者StreamTableEnvironment),之后就是创建Table或者TableSource并注册到catalog(默认使用的catalog是internal的,也可以自己选择注册external catalog),然后就进行table的query,之后就是一些转换操作
  • 关于Table的创建可以从DataSet、DataStream转换过来;关于Table的查询可以使用api query(scan方法),也可以使用sql query(sqlQuery方法),或者是混合使用
  • 也可以将查询的Table转换为DataSet或者DataStream进行其他处理;如果输出也是输出到table的话,可以注册TableSink,然后使用TableEnvironment的sqlUpdate方法或Table的insertInto方法输出到TableSink

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 77
博文 889
码字总数 800584
作品 0
深圳
私信 提问
聊聊flink Table的OrderBy及Limit

序 本文主要研究一下flink Table的OrderBy及Limit 实例 orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch Table flink-table_2.11-1.7.0-source...

go4it
01/31
0
0
聊聊flink的CsvTableSource

序 本文主要研究一下flink的CsvTableSource TableSource flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala TableSource定义了三个方法,分别是getRe......

go4it
02/05
0
0
Apache Flink 1.3.1 发布,通用数据处理平台

Apache Flink 1.3.1 发布了,这是 Apache Flink 1.3 系列的首个 bug 修复版本。该版本包含 50 个修复程序和对 Flink 1.3.0 的小改进。下面的列表包括所有修补程序的详细列表。建议用户升级至...

局长
2017/06/24
749
3
聊聊flink Table的Over Windows

序 本文主要研究一下flink Table的Over Windows 实例 Over Windows类似SQL的over子句,它可以基于event-time、processing-time或者row-count;具体可以通过Over类来构造,其中必须设置order...

go4it
01/27
0
0
聊聊flink的TableFunction

序 本文主要研究一下flink的TableFunction 实例 本实例自定义了Split function,并通过TableEnvironment.registerFunction注册,最后在Table的api或者TableEnvironment.sqlQuery中使用;这里...

go4it
02/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

精品书籍推荐

JavaScript书籍推荐 1、[JavaScript高级程序设计(第3版)] 2、你不知道的JavaScript(中卷) 3、ES6标准入门(第二版)阮一峰

轻轻的往前走
14分钟前
2
0
JVM(六)为什么新生代有两个Survivor分区?

本文会使用排除法的手段,来讲解新生代的区域划分,从而让读者能够更清晰的理解分代回收器的原理,在开始之前我们先来整体认识一下分代收集器。 分代收集器会把内存空间分为:老生代和新生代...

王磊的博客
19分钟前
5
0
程序员最喜欢的15款文本编辑器推荐

程序员最喜欢的15款文本编辑器推荐 2017年09月18日 17:30:50 kangle_zhu 阅读数:59390 转载地址:http://www.cr173.com/html/50553_1.html 很多时候比如编程查看代码或者打开各种文档下我们...

linjin200
21分钟前
6
0
如何在php后端及时推送消息给客户端

walkor大神,目前需求是这样的: 有一群商家在后台网页处理批量导入产品 -》 服务器接受请求 -》 开始foreach一个一个处理导入请求; 我现在想每成功导入一个就推送到前台显示已经导入成功,...

dragon_tech
40分钟前
13
0
Java利用hanlp完成语句相似度分析的案例详解

分享一篇hanlp分词工具使用的小案例,即利用hanlp分词工具分析两个中文语句的相似度的案例。供大家一起学习参考! 在做考试系统需求时,后台题库系统提供录入题目的功能。在录入题目的时候,...

左手的倒影
46分钟前
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部