文档章节

聊聊flink TableEnvironment的scan操作

go4it
 go4it
发布于 01/22 10:29
字数 445
阅读 16
收藏 0

本文主要研究一下flink TableEnvironment的scan操作

实例

//Scanning a directly registered table
val tab: Table = tableEnv.scan("tableName")

//Scanning a table from a registered catalog
val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
  • scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取

TableEnvironment.scan

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

abstract class TableEnvironment(val config: TableConfig) {

  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  private val rootSchema: SchemaPlus = internalSchema.plus()

  //......

  @throws[TableException]
  @varargs
  def scan(tablePath: String*): Table = {
    scanInternal(tablePath.toArray) match {
      case Some(table) => table
      case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
    }
  }

  private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
    val schema = getSchema(schemaPaths)
    if (schema != null) {
      val tableName = tablePath(tablePath.length - 1)
      val table = schema.getTable(tableName)
      if (table != null) {
        return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
      }
    }
    None
  }

  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
    var schema = rootSchema
    for (schemaName <- schemaPath) {
      schema = schema.getSubSchema(schemaName)
      if (schema == null) {
        return schema
      }
    }
    schema
  }

  //......
}
  • scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

小结

  • TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 77
博文 889
码字总数 800584
作品 0
深圳
私信 提问
聊聊flink Table的Set Operations

序 本文主要研究一下flink Table的Set Operations 实例 Union union方法类似sql的union UnionAll unionAll方法类似sql的union all Intersect intersect方法类似sql的intersect IntersectAll ......

go4it
01/30
0
0
聊聊flink Table的select操作

序 本文主要研究一下flink Table的select操作 Table.select flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala Table提供了两个select方法,一个接收String参数,......

go4it
01/23
0
0
聊聊flink Table的OrderBy及Limit

序 本文主要研究一下flink Table的OrderBy及Limit 实例 orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch Table flink-table_2.11-1.7.0-source...

go4it
01/31
0
0
聊聊flink的TableFactory

序 本文主要研究一下flink的TableFactory 实例 本实例定义了MySystemTableSourceFactory,它的requiredContext为update-mode=append及connector.type=my-system,它的supportedProperties为c......

go4it
02/07
0
0
聊聊flink Table的Over Windows

序 本文主要研究一下flink Table的Over Windows 实例 Over Windows类似SQL的over子句,它可以基于event-time、processing-time或者row-count;具体可以通过Over类来构造,其中必须设置order...

go4it
01/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

linux在线安装JDK(1.8版本)

linux在线安装JDK(1.8版本) 2018年07月03日 15:36:27 唯爱酒神 阅读数:806 标签: linux jdk安装 jdk安装 更多 个人分类: linux 在线下载JDK 命令: wget --no-check-certificate --no-c...

rootliu
42分钟前
1
0
移植Modbus到STM32F103(2):移植FreeModbus到usart3并运行示例代码

FreeModbus是Modbus的一个被广泛移植的实现。其源码在github,最新版是1.6。 FreeModbus支持Modbus功能码里的0x01~0x06,0x0F~0x11和0x17,对一些功能比如异常诊断和读事件计数等功能码并没有...

Konstantine
今天
3
0
浅谈神经网络(神经网络篇)

背景 之前写过浅谈神经网络基础篇,简单介绍下机器学习这块内容,用于扫盲。本文正式将神经网络,这部分是深度学习的基础。了解完可以掌握强大的机器学习的方法,也可以更好的了解深度学习。...

Uknowzheng
今天
5
0
移动硬盘变为RAW格式后的修复

在Mac上使用自己的移动硬盘结果文件系统格式变为RAW; 在自己windows笔记本上使用chkdsk H: /F进行修复,修复日志如下: C:\Users\mengzhang6>chkdsk H: /F文件系统的类型是 NTFS。卷标是 do...

晨猫
今天
7
0
10 Git —— 标签管理

10 Git —— 标签管理 本节内容: 命令git tag <tagname>用于新建一个标签,默认为HEAD,也可以指定一个commit id;命令git tag -a <tagname> -m "blablabla..."可以指定标签信息;命令git......

lwenhao
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部