如何基于Apache SeaTunnel 读取Oracle的数据

原创
10/31 15:57
阅读数 96
AI总结

引言

在大数据时代,企业面临着数据的快速增长和多样化需求,如何高效地处理和整合来自不同数据源的数据成为了关键问题。

Apache SeaTunnel作为一款开源数据集成工具,提供了灵活的数据处理和实时数据同步能力,广泛应用于数据仓库、数据湖及实时分析场景中。与此同时,Oracle 数据库以其高性能和可靠性,成为许多企业数据存储的首选。结合 Apache SeaTunnel 与 Oracle 数据库,可以实现高效的数据迁移与转换。

Apache SeaTunnel

Apache SeaTunnel 是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取、转换和加载(ETL)。SeaTunnel 提供了多种连接器,能够轻松集成不同的数据源和目标,包括关系型数据库、NoSQL 数据库、文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。

Oracle 数据库

Oracle 数据库是一款广泛使用的关系型数据库管理系统,以其强大的事务处理能力和安全性而著称。它适用于处理大型企业级应用程序的数据存储需求。Oracle 支持多种数据类型和复杂查询,能够高效地管理和分析大量数据,适合用于金融、电信、医疗等行业。

Oracle JDBC 连接器概述

连接器描述

Oracle JDBC 连接器通过 JDBC 方式读取外部数据源的数据,支持 Apache SeaTunnel 的多种引擎,包括 Spark、Flink 和 SeaTunnel Zeta。

使用依赖

Spark/Flink 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/ 目录中。
  2. 为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/plugins/ 目录。

SeaTunnel Zeta 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/lib/ 目录中。
  2. 为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/lib/ 目录。

主要特性

  • 批处理支持
  • 精确一次(exactly-once)语义
  • 列投影(column projection)
  • 并行处理(parallelism)
  • 支持用户定义的分割(user-defined split)

支持的数据源信息

| 数据源 | 支持版本 | 驱动 | URL | Maven | |---------|------------------------------|--------------------------|-------------------------------------|----------------------------------------------------------| | Oracle | 依赖版本不同有不同驱动类 | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@datasource01:1523:xe | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |

数据库依赖

请下载对应于 Maven 的支持列表,并将其复制到 ${SEATUNNEL_HOME}/plugins/jdbc/lib/ 工作目录中。

例如 Oracle 数据源:

cp ojdbc8-xxxxxx.jar $SEATUNNEL_HOME/lib/

为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/lib/ 目录中。

数据类型映射

| Oracle 数据类型 | SeaTunnel 数据类型 | |------------------------------------------------------|---------------------| | INTEGER | DECIMAL(38,0) | | FLOAT | DECIMAL(38,18) | | NUMBER(precision <= 9, scale == 0) | INT | | NUMBER(9 < precision <= 18, scale == 0) | BIGINT | | NUMBER(18 < precision, scale == 0) | DECIMAL(38,0) | | NUMBER(scale != 0) | DECIMAL(38,18) | | BINARY_DOUBLE | DOUBLE | | BINARY_FLOAT
REAL | FLOAT | | CHAR
NCHAR
VARCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
XML | STRING | | DATE | TIMESTAMP | | TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | | BLOB
RAW
LONG RAW
BFILE | BYTES |

源选项

| 名称 | 类型 | 是否必需 | 默认值 | 描述 | |------------------------------------------|------------|----------|-----------------|--------------------------------------------------------| | url | String | 是 | - | JDBC 连接的 URL。例如:jdbc:oracle:thin:@datasource01:1523:xe | | driver | String | 是 | - | 用于连接远程数据源的 JDBC 类名,例如 oracle.jdbc.OracleDriver | | user | String | 否 | - | 连接实例的用户名 | | password | String | 否 | - | 连接实例的密码 | | query | String | 是 | - | 查询语句 | | connection_check_timeout_sec | Int | 否 | 30 | 验证连接时的超时时间(秒) | | partition_column | String | 否 | - | 用于并行处理的分区列名,仅支持数值类型主键 | | partition_lower_bound | BigDecimal | 否 | - | 扫描的 partition_column 最小值 | | partition_upper_bound | BigDecimal | 否 | - | 扫描的 partition_column 最大值 | | partition_num | Int | 否 | job parallelism | 分区数量,仅支持正整数,默认值为作业并行性 | | fetch_size | Int | 否 | 0 | 查询时的行抓取大小,0 表示使用 JDBC 默认值 | | properties | Map | 否 | - | 其他连接配置参数 |

| 名称 | 类型 | 是否必需 | 默认值 | 描述 | |--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------| | url | String | 是 | - | JDBC 连接的 URL,例如:jdbc:mysql://localhost:3306/test | | driver | String | 是 | - | 用于连接远程数据源的 JDBC 类名,例如 MySQL 为 com.mysql.cj.jdbc.Driver。 | | user | String | 否 | - | 连接实例的用户名 | | password | String | 否 | - | 连接实例的密码 | | query | String | 是 | - | 查询语句 | | connection_check_timeout_sec | Int | 否 | 30 | 验证连接时的超时时间(秒) | | partition_column | String | 否 | - | 用于并行处理的分区列名,仅支持数值类型主键,且只能配置一列。 | | partition_lower_bound | BigDecimal | 否 | - | 扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。 | | partition_upper_bound | BigDecimal | 否 | - | 扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。 | | partition_num | Int | 否 | job parallelism | 分区数量,仅支持正整数,默认值为作业并行性。 | | fetch_size | Int | 否 | 0 | 查询时的行抓取大小,0 表示使用 JDBC 默认值。 | | properties | Map | 否 | - | 其他连接配置参数,当 properties 和 URL 有相同参数时,优先级由具体驱动的实现决定。 | | table_path | String | 否 | - | 表的完整路径,可以使用该配置替代 query。例如: MySQL 为 testdb.table1,Oracle 为 test_schema.table1。 | | table_list | Array | 否 | - | 要读取的表列表,可以使用该配置替代 table_path。例如: [{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name from testdb.table2"}] | | where_condition | String | 否 | - | 所有表/查询的通用行过滤条件,必须以 where 开头,例如 where id > 100。 | | split.size | Int | 否 | 8096 | 表的拆分大小(行数),捕获的表在读取时被拆分为多个部分。 | | split.even-distribution.factor.lower-bound | Double | 否 | 0.05 | 块键分布因子的下限,用于判断表数据是否均匀分布。 | | split.even-distribution.factor.upper-bound | Double | 否 | 100 | 块键分布因子的上限,用于判断表数据是否均匀分布。 | | split.sample-sharding.threshold | Int | 否 | 10000 | 触发样本分片策略的估计分片数量阈值。 | | split.inverse-sampling.rate | Int | 否 | 1000 | 样本分片策略中使用的采样率的反值。 | | common-options | | 否 | - | 源插件通用参数,详细信息请参考 Source Common Options。 |

并行读取

JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用一定规则拆分表中的数据,并将其交给读取器进行读取。读取器的数量由 parallelism 选项决定。

拆分键规则

  1. 如果 partition_column 不为空,则使用该列进行计算。该列必须为 支持的拆分数据类型
  2. 如果 partition_column 为空,SeaTunnel 将读取表的模式并获取主键和唯一索引。如果主键和唯一索引中有多个列,则使用第一个支持的拆分数据类型列进行拆分。例如,表的主键为 (guid, name),因为 guid 不在 支持的拆分数据类型 中,因此使用 name 列进行拆分。

支持的拆分数据类型

  • String
  • Number (int, bigint, decimal, ...)
  • Date

拆分相关选项

split.size

每个拆分中包含的行数,捕获的表在读取时被拆分为多个部分。

split.even-distribution.factor.lower-bound

不推荐使用

块键分布因子的下限。用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于该下限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子小于下限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 0.05。

split.even-distribution.factor.upper-bound

不推荐使用

块键分布因子的上限。用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于该上限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子大于上限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 100.0。

split.sample-sharding.threshold

此配置指定估计分片数量的阈值,以触发样本分片策略。当分布因子超出指定的上下限时,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用样本分片策略。这可以帮助更高效地处理大型数据集。默认值为 1000 个分片。

split.inverse-sampling.rate

样本分片策略中使用的采样率的反值。例如,如果该值设置为 1000,则在采样过程中应用 1/1000 的采样率。此选项提供灵活性,以控制采样的粒度,从而影响最终的分片数量。尤其在处理非常大的数据集时,较低的采样率更为合适。默认值为 1000。

partition_column [string]

拆分数据的列名。

partition_upper_bound [BigDecimal]

扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。

partition_lower_bound [BigDecimal]

扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。

partition_num [int]

不推荐使用,正确的方法是通过 split.size 控制拆分的数量。

我们需要拆分成多少个块,仅支持正整数,默认值为作业并行性。

小贴士

如果表无法拆分(例如,表没有主键或唯一索引,且未设置 partition_column),则将以单一并发执行。

使用 table_path 替代 query 进行单表读取。如果需要读取多个表,使用 table_list

示例任务

以下是一些使用 JDBC 源连接器的任务示例:

简单示例

该示例在 TEST_TABLE 表中查询所有字段,您还可以指定要查询哪些字段以最终输出到控制台。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        query = "SELECT * FROM TEST_TABLE"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

partition_column 并行读取

使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_num = 10
    }
}
sink {
  Console {}
}

按主键或唯一索引并行读取

通过配置 table_path 开启自动分片,可以配置 split.* 来调整分片策略。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        table_path = "DA.SCHEMA1.TABLE1"
        query = "select * from SCHEMA1.TABLE1"
        split.size = 10000
    }
}
sink {
  Console {}
}

并行读取

根据配置的上下限更高效地读取数据。

source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_lower_bound = 1
        partition_upper_bound = 500
        partition_num = 10
    }
}

多表读取

配置 table_list 将开启自动拆分,以配置split.来调整分割策略*。

env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    "table_list"=[
        {
            "table_path"="XE.TEST.USER_INFO"
        },
        {
            "table_path"="XE.TEST.YOURTABLENAME"
        }
    ]
    #where_condition= "where id > 100"
    split.size = 10000
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}

sink {
  Console {}
}

总结

Apache SeaTunnel为连接和集成Oracle数据库提供了灵活的解决方案。通过简单的配置,用户可以高效地从Oracle数据库读取和处理数据,满足不同的业务需求。

无论是在实时数据处理还是批量数据集成场景中,SeaTunnel都能为用户带来显著的便利和高效。

本文由 白鲸开源科技 提供发布支持!

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
AI总结
返回顶部
顶部