引言
在大数据时代,企业面临着数据的快速增长和多样化需求,如何高效地处理和整合来自不同数据源的数据成为了关键问题。
Apache SeaTunnel作为一款开源数据集成工具,提供了灵活的数据处理和实时数据同步能力,广泛应用于数据仓库、数据湖及实时分析场景中。与此同时,Oracle 数据库以其高性能和可靠性,成为许多企业数据存储的首选。结合 Apache SeaTunnel 与 Oracle 数据库,可以实现高效的数据迁移与转换。
Apache SeaTunnel
Apache SeaTunnel 是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取、转换和加载(ETL)。SeaTunnel 提供了多种连接器,能够轻松集成不同的数据源和目标,包括关系型数据库、NoSQL 数据库、文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。
Oracle 数据库
Oracle 数据库是一款广泛使用的关系型数据库管理系统,以其强大的事务处理能力和安全性而著称。它适用于处理大型企业级应用程序的数据存储需求。Oracle 支持多种数据类型和复杂查询,能够高效地管理和分析大量数据,适合用于金融、电信、医疗等行业。
Oracle JDBC 连接器概述
连接器描述
Oracle JDBC 连接器通过 JDBC 方式读取外部数据源的数据,支持 Apache SeaTunnel 的多种引擎,包括 Spark、Flink 和 SeaTunnel Zeta。
使用依赖
Spark/Flink 引擎
- 确保将 JDBC 驱动 jar 包 放置在
${SEATUNNEL_HOME}/plugins/
目录中。 - 为支持国际化字符集,复制
orai18n.jar
到${SEATUNNEL_HOME}/plugins/
目录。
SeaTunnel Zeta 引擎
- 确保将 JDBC 驱动 jar 包 放置在
${SEATUNNEL_HOME}/lib/
目录中。 - 为支持国际化字符集,复制
orai18n.jar
到${SEATUNNEL_HOME}/lib/
目录。
主要特性
- 批处理支持
- 精确一次(exactly-once)语义
- 列投影(column projection)
- 并行处理(parallelism)
- 支持用户定义的分割(user-defined split)
支持的数据源信息
| 数据源 | 支持版本 | 驱动 | URL | Maven | |---------|------------------------------|--------------------------|-------------------------------------|----------------------------------------------------------| | Oracle | 依赖版本不同有不同驱动类 | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@datasource01:1523:xe | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
数据库依赖
请下载对应于 Maven 的支持列表,并将其复制到 ${SEATUNNEL_HOME}/plugins/jdbc/lib/
工作目录中。
例如 Oracle 数据源:
cp ojdbc8-xxxxxx.jar $SEATUNNEL_HOME/lib/
为支持国际化字符集,复制 orai18n.jar
到 ${SEATUNNEL_HOME}/lib/
目录中。
数据类型映射
| Oracle 数据类型 | SeaTunnel 数据类型 | |------------------------------------------------------|---------------------| | INTEGER | DECIMAL(38,0) | | FLOAT | DECIMAL(38,18) | | NUMBER(precision <= 9, scale == 0) | INT | | NUMBER(9 < precision <= 18, scale == 0) | BIGINT | | NUMBER(18 < precision, scale == 0) | DECIMAL(38,0) | | NUMBER(scale != 0) | DECIMAL(38,18) | | BINARY_DOUBLE | DOUBLE | | BINARY_FLOAT
REAL | FLOAT | | CHAR
NCHAR
VARCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
XML | STRING | | DATE | TIMESTAMP | | TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | | BLOB
RAW
LONG RAW
BFILE | BYTES |
源选项
| 名称 | 类型 | 是否必需 | 默认值 | 描述 | |------------------------------------------|------------|----------|-----------------|--------------------------------------------------------| | url | String | 是 | - | JDBC 连接的 URL。例如:jdbc:oracle:thin:@datasource01:1523:xe
| | driver | String | 是 | - | 用于连接远程数据源的 JDBC 类名,例如 oracle.jdbc.OracleDriver
| | user | String | 否 | - | 连接实例的用户名 | | password | String | 否 | - | 连接实例的密码 | | query | String | 是 | - | 查询语句 | | connection_check_timeout_sec | Int | 否 | 30 | 验证连接时的超时时间(秒) | | partition_column | String | 否 | - | 用于并行处理的分区列名,仅支持数值类型主键 | | partition_lower_bound | BigDecimal | 否 | - | 扫描的 partition_column 最小值 | | partition_upper_bound | BigDecimal | 否 | - | 扫描的 partition_column 最大值 | | partition_num | Int | 否 | job parallelism | 分区数量,仅支持正整数,默认值为作业并行性 | | fetch_size | Int | 否 | 0 | 查询时的行抓取大小,0 表示使用 JDBC 默认值 | | properties | Map | 否 | - | 其他连接配置参数 |
| 名称 | 类型 | 是否必需 | 默认值 | 描述 | |--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------| | url | String | 是 | - | JDBC 连接的 URL,例如:jdbc:mysql://localhost:3306/test
| | driver | String | 是 | - | 用于连接远程数据源的 JDBC 类名,例如 MySQL 为 com.mysql.cj.jdbc.Driver
。 | | user | String | 否 | - | 连接实例的用户名 | | password | String | 否 | - | 连接实例的密码 | | query | String | 是 | - | 查询语句 | | connection_check_timeout_sec | Int | 否 | 30 | 验证连接时的超时时间(秒) | | partition_column | String | 否 | - | 用于并行处理的分区列名,仅支持数值类型主键,且只能配置一列。 | | partition_lower_bound | BigDecimal | 否 | - | 扫描的 partition_column
最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。 | | partition_upper_bound | BigDecimal | 否 | - | 扫描的 partition_column
最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。 | | partition_num | Int | 否 | job parallelism | 分区数量,仅支持正整数,默认值为作业并行性。 | | fetch_size | Int | 否 | 0 | 查询时的行抓取大小,0 表示使用 JDBC 默认值。 | | properties | Map | 否 | - | 其他连接配置参数,当 properties
和 URL 有相同参数时,优先级由具体驱动的实现决定。 | | table_path | String | 否 | - | 表的完整路径,可以使用该配置替代 query
。例如: MySQL 为 testdb.table1
,Oracle 为 test_schema.table1
。 | | table_list | Array | 否 | - | 要读取的表列表,可以使用该配置替代 table_path
。例如: [{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name from testdb.table2"}]
| | where_condition | String | 否 | - | 所有表/查询的通用行过滤条件,必须以 where
开头,例如 where id > 100
。 | | split.size | Int | 否 | 8096 | 表的拆分大小(行数),捕获的表在读取时被拆分为多个部分。 | | split.even-distribution.factor.lower-bound | Double | 否 | 0.05 | 块键分布因子的下限,用于判断表数据是否均匀分布。 | | split.even-distribution.factor.upper-bound | Double | 否 | 100 | 块键分布因子的上限,用于判断表数据是否均匀分布。 | | split.sample-sharding.threshold | Int | 否 | 10000 | 触发样本分片策略的估计分片数量阈值。 | | split.inverse-sampling.rate | Int | 否 | 1000 | 样本分片策略中使用的采样率的反值。 | | common-options | | 否 | - | 源插件通用参数,详细信息请参考 Source Common Options。 |
并行读取
JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用一定规则拆分表中的数据,并将其交给读取器进行读取。读取器的数量由 parallelism
选项决定。
拆分键规则
- 如果
partition_column
不为空,则使用该列进行计算。该列必须为 支持的拆分数据类型。 - 如果
partition_column
为空,SeaTunnel 将读取表的模式并获取主键和唯一索引。如果主键和唯一索引中有多个列,则使用第一个支持的拆分数据类型列进行拆分。例如,表的主键为(guid, name)
,因为guid
不在 支持的拆分数据类型 中,因此使用name
列进行拆分。
支持的拆分数据类型
- String
- Number (int, bigint, decimal, ...)
- Date
拆分相关选项
split.size
每个拆分中包含的行数,捕获的表在读取时被拆分为多个部分。
split.even-distribution.factor.lower-bound
不推荐使用
块键分布因子的下限。用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于该下限(即 (MAX(id) - MIN(id) + 1) / row count
),则表块将被优化为均匀分布。否则,如果分布因子小于下限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold
指定的值,将使用基于采样的分片策略。默认值为 0.05。
split.even-distribution.factor.upper-bound
不推荐使用
块键分布因子的上限。用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于该上限(即 (MAX(id) - MIN(id) + 1) / row count
),则表块将被优化为均匀分布。否则,如果分布因子大于上限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold
指定的值,将使用基于采样的分片策略。默认值为 100.0。
split.sample-sharding.threshold
此配置指定估计分片数量的阈值,以触发样本分片策略。当分布因子超出指定的上下限时,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用样本分片策略。这可以帮助更高效地处理大型数据集。默认值为 1000 个分片。
split.inverse-sampling.rate
样本分片策略中使用的采样率的反值。例如,如果该值设置为 1000,则在采样过程中应用 1/1000 的采样率。此选项提供灵活性,以控制采样的粒度,从而影响最终的分片数量。尤其在处理非常大的数据集时,较低的采样率更为合适。默认值为 1000。
partition_column [string]
拆分数据的列名。
partition_upper_bound [BigDecimal]
扫描的 partition_column
最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_lower_bound [BigDecimal]
扫描的 partition_column
最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_num [int]
不推荐使用,正确的方法是通过
split.size
控制拆分的数量。
我们需要拆分成多少个块,仅支持正整数,默认值为作业并行性。
小贴士
如果表无法拆分(例如,表没有主键或唯一索引,且未设置
partition_column
),则将以单一并发执行。使用
table_path
替代query
进行单表读取。如果需要读取多个表,使用table_list
。
示例任务
以下是一些使用 JDBC 源连接器的任务示例:
简单示例
该示例在 TEST_TABLE
表中查询所有字段,您还可以指定要查询哪些字段以最终输出到控制台。
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
query = "SELECT * FROM TEST_TABLE"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
按 partition_column
并行读取
使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法。
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
partition_column = "ID"
partition_num = 10
}
}
sink {
Console {}
}
按主键或唯一索引并行读取
通过配置 table_path
开启自动分片,可以配置 split.* 来调整分片策略。
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
table_path = "DA.SCHEMA1.TABLE1"
query = "select * from SCHEMA1.TABLE1"
split.size = 10000
}
}
sink {
Console {}
}
并行读取
根据配置的上下限更高效地读取数据。
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
partition_column = "ID"
partition_lower_bound = 1
partition_upper_bound = 500
partition_num = 10
}
}
多表读取
配置 table_list
将开启自动拆分,以配置split.
来调整分割策略*。
env {
job.mode = "BATCH"
parallelism = 4
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
"table_list"=[
{
"table_path"="XE.TEST.USER_INFO"
},
{
"table_path"="XE.TEST.YOURTABLENAME"
}
]
#where_condition= "where id > 100"
split.size = 10000
#split.even-distribution.factor.upper-bound = 100
#split.even-distribution.factor.lower-bound = 0.05
#split.sample-sharding.threshold = 1000
#split.inverse-sampling.rate = 1000
}
}
sink {
Console {}
}
总结
Apache SeaTunnel为连接和集成Oracle数据库提供了灵活的解决方案。通过简单的配置,用户可以高效地从Oracle数据库读取和处理数据,满足不同的业务需求。
无论是在实时数据处理还是批量数据集成场景中,SeaTunnel都能为用户带来显著的便利和高效。
本文由 白鲸开源科技 提供发布支持!