
Talking about Flink's Table Formats

go4it

This article mainly looks into Flink's Table Formats.

Examples

CSV Format

.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix('#')               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)
  • Flink supports the CSV format out of the box, so no extra dependency is required; a minimal end-to-end sketch follows below
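
For context, here is a minimal sketch (Flink 1.7 style; the file path and table name are hypothetical) of how such a format descriptor is typically wired into a table registration via connect/withFormat/withSchema:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvFormatExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    tableEnv
      .connect(new FileSystem().path("/tmp/input.csv"))   // hypothetical path
      .withFormat(
        new Csv()
          .field("field1", Types.STRING)
          .field("field2", Types.SQL_TIMESTAMP)
          .fieldDelimiter(",")
          .ignoreFirstLine())
      .withSchema(
        new Schema()
          .field("field1", Types.STRING)
          .field("field2", Types.SQL_TIMESTAMP))
      .inAppendMode()
      .registerTableSource("csvTable");                   // hypothetical table name
  }
}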

JSON Format

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
  • The JSON format can be defined via schema, jsonSchema, or deriveSchema, and requires the additional flink-json dependency; see the sketch below for the type-information variant
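
As a sketch of the type-information variant, the two fields from the jsonSchema example above can be declared with ROW_NAMED; note that choosing DOUBLE for lon is an assumption here, since the JSON-schema route would map 'number' to DECIMAL instead:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;

public class JsonSchemaExample {
  public static void main(String[] args) {
    Json json = new Json()
      .failOnMissingField(false)
      // equivalent of the jsonSchema example above, expressed as type information;
      // DOUBLE for lon is a choice, the JSON-schema route would yield DECIMAL
      .schema(Types.ROW_NAMED(
        new String[]{"lon", "rideTime"},
        Types.DOUBLE,
        Types.SQL_TIMESTAMP));
    System.out.println(json.toProperties());
  }
}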

Apache Avro Format

.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)
  • The Avro schema can be defined via recordClass or avroSchema; the flink-avro dependency must be added

ConnectTableDescriptor

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  //......

  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  //......
}
  • StreamTableEnvironment's connect method creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides the withFormat method, which stores the given FormatDescriptor and returns the descriptor itself, typed as the concrete subclass D, so that chaining keeps the subtype (see the sketch below)
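
The Scala self-type annotation `this: D =>` is what lets withFormat return the concrete subtype rather than the abstract base. A minimal Java sketch of the same fluent-builder trick (all names here are hypothetical stand-ins, not Flink classes):

// hypothetical stand-ins, not Flink classes
abstract class MyConnectDescriptor<D extends MyConnectDescriptor<D>> {
  private Object format;  // stands in for Option[FormatDescriptor]

  @SuppressWarnings("unchecked")
  public D withFormat(Object format) {
    this.format = format;
    return (D) this;  // safe as long as each subclass passes itself as D
  }
}

class MyStreamDescriptor extends MyConnectDescriptor<MyStreamDescriptor> {
  public MyStreamDescriptor inAppendMode() { return this; }
}

// usage: withFormat already returns MyStreamDescriptor, so subtype-specific
// methods can be chained after it:
//   new MyStreamDescriptor().withFormat(new Object()).inAppendMode();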

FormatDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java

@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

	private String type;

	private int version;

	/**
	 * Constructs a {@link FormatDescriptor}.
	 *
	 * @param type string that identifies this format
	 * @param version property version for backwards compatibility
	 */
	public FormatDescriptor(String type, int version) {
		this.type = type;
		this.version = version;
	}

	@Override
	public final Map<String, String> toProperties() {
		final DescriptorProperties properties = new DescriptorProperties();
		properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
		properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
		properties.putProperties(toFormatProperties());
		return properties.asMap();
	}

	/**
	 * Converts this descriptor into a set of format properties. Usually prefixed with
	 * {@link FormatDescriptorValidator#FORMAT}.
	 */
	protected abstract Map<String, String> toFormatProperties();
}
  • FormatDescriptor is an abstract class; Csv, Json, and Avro are its subclasses; the sketch below shows what its final toProperties method produces
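
To see what the toProperties/toFormatProperties pair produces, here is a small sketch; the expected key names follow the validator constants referenced above, though the exact output is an assumption worth verifying:

import java.util.Map;
import org.apache.flink.table.descriptors.Json;

public class FormatPropertiesDemo {
  public static void main(String[] args) {
    // toProperties() merges the common keys written by FormatDescriptor
    // with the subclass's toFormatProperties()
    Map<String, String> props = new Json().deriveSchema().toProperties();
    // expected contents:
    //   format.type             -> json
    //   format.property-version -> 1
    //   format.derive-schema    -> true
    props.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}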

Csv

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala

class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  def schema(schema: TableSchema): Csv = {
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)
    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}
  • Csv provides the field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors methods; its toFormatProperties flattens the ordered schema into indexed keys, as sketched below
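
As the toFormatProperties above shows, putIndexedFixedProperties flattens the ordered schema into indexed keys. A quick sketch of the resulting map; the exact type strings written by TypeStringUtils are an assumption:

import java.util.Map;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;

public class CsvPropertiesDemo {
  public static void main(String[] args) {
    Map<String, String> props = new Csv()
      .field("field1", Types.STRING)
      .field("field2", Types.LONG)
      .fieldDelimiter("|")
      .toProperties();
    // roughly expected (type strings may differ):
    //   format.type             -> csv
    //   format.property-version -> 1
    //   format.fields.0.name    -> field1
    //   format.fields.0.type    -> VARCHAR
    //   format.fields.1.name    -> field2
    //   format.fields.1.type    -> BIGINT
    //   format.field-delimiter  -> |
    props.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}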

Json

flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java

public class Json extends FormatDescriptor {

	private Boolean failOnMissingField;
	private Boolean deriveSchema;
	private String jsonSchema;
	private String schema;

	public Json() {
		super(FORMAT_TYPE_VALUE, 1);
	}

	public Json failOnMissingField(boolean failOnMissingField) {
		this.failOnMissingField = failOnMissingField;
		return this;
	}

	public Json jsonSchema(String jsonSchema) {
		Preconditions.checkNotNull(jsonSchema);
		this.jsonSchema = jsonSchema;
		this.schema = null;
		this.deriveSchema = null;
		return this;
	}

	public Json schema(TypeInformation<Row> schemaType) {
		Preconditions.checkNotNull(schemaType);
		this.schema = TypeStringUtils.writeTypeInfo(schemaType);
		this.jsonSchema = null;
		this.deriveSchema = null;
		return this;
	}

	public Json deriveSchema() {
		this.deriveSchema = true;
		this.schema = null;
		this.jsonSchema = null;
		return this;
	}

	@Override
	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if (deriveSchema != null) {
			properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
		}

		if (jsonSchema != null) {
			properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
		}

		if (schema != null) {
			properties.putString(FORMAT_SCHEMA, schema);
		}

		if (failOnMissingField != null) {
			properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
		}

		return properties.asMap();
	}
}
  • Json provides three ways to define the JSON format: schema, jsonSchema, and deriveSchema; the three setters are mutually exclusive, each nulling out the other two, as the sketch below demonstrates
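
Since each of the three setters nulls out the other two, only the last call takes effect; a small sketch:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;

public class JsonExclusiveDemo {
  public static void main(String[] args) {
    Json json = new Json()
      .schema(Types.ROW_NAMED(new String[]{"lon"}, Types.DOUBLE))
      .deriveSchema();  // resets the schema set on the previous line
    // only format.type, format.property-version and format.derive-schema remain
    json.toProperties().forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}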

Avro

flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java

public class Avro extends FormatDescriptor {

	private Class<? extends SpecificRecord> recordClass;
	private String avroSchema;

	public Avro() {
		super(AvroValidator.FORMAT_TYPE_VALUE, 1);
	}

	public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
		Preconditions.checkNotNull(recordClass);
		this.recordClass = recordClass;
		return this;
	}

	public Avro avroSchema(String avroSchema) {
		Preconditions.checkNotNull(avroSchema);
		this.avroSchema = avroSchema;
		return this;
	}

	@Override
	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if (null != recordClass) {
			properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
		}
		if (null != avroSchema) {
			properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
		}

		return properties.asMap();
	}
}
  • Avro provides two ways to define the Avro format: recordClass and avroSchema; unlike Json's setters, they do not reset each other (see the sketch below)
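
Because recordClass and avroSchema do not reset each other, callers should set only one of them. A sketch of the property output for the avroSchema route; the expected key names follow AvroValidator:

import java.util.Map;
import org.apache.flink.table.descriptors.Avro;

public class AvroPropertiesDemo {
  public static void main(String[] args) {
    Map<String, String> props = new Avro()
      .avroSchema(
        "{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
          + "{\"name\": \"a\", \"type\": \"long\"}]}")
      .toProperties();
    // expected:
    //   format.type             -> avro
    //   format.property-version -> 1
    //   format.avro-schema      -> {...the schema string...}
    props.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}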

Summary

  • StreamTableEnvironment's connect method creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor
  • ConnectTableDescriptor provides the withFormat method, which accepts a FormatDescriptor and returns the descriptor itself for further chaining; FormatDescriptor is an abstract class, and Csv, Json, and Avro are its subclasses
  • Csv provides the field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors methods; Json provides three ways (schema, jsonSchema, deriveSchema) to define the JSON format; Avro provides two ways (recordClass, avroSchema) to define the Avro format

doc

  • Table Formats, in the Flink 1.7 documentation, "Connect to External Systems"
