The previous post, Apache SeaTunnel syncing MySQL data to Apache Kafka (SeaTunnel Engine), used SeaTunnel to sync MySQL into Kafka. This post uses SeaTunnel again to try MySQL to Elasticsearch, with real-time CDC change-data sync.
For environment setup and installation, see the previous post; the only change needed here is swapping the Kafka connector for the Elasticsearch connector.
Download the connector plugin
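The Elasticsearch connector jar has to be present under connectors/seatunnel before the job starts. One way to fetch it (a sketch, assuming the 2.3.1 release layout; verify the file format and connector ids against your distribution) is to list the connectors you need in config/plugin_config:

```
--seatunnel-connectors--
connector-cdc-mysql
connector-elasticsearch
--end--
```

Then run sh bin/install-plugin.sh 2.3.1 from the SeaTunnel home directory to download the listed connector jars.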
Edit the job configuration file
This file defines how SeaTunnel reads, transforms, and writes data once started. Below is an example configuration for MySQL-CDC to Elasticsearch.
Config file path: apache-seatunnel-incubating-2.3.1/config/mysql2es.conf
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}
source {
  MySQL-CDC {
    result_table_name = "table1"
    username = "mysqlcdc"
    password = "mysqlcdc"
    hostname = "127.0.0.1"
    base-url = "jdbc:mysql://127.0.0.1:3306/test01"
    startup.mode = INITIAL
    catalog {
      factory = MySQL
    }
    table-names = [
      "test01.xxl_job_group"
    ]
    # compatible_debezium_json options
    # format = compatible_debezium_json
    debezium = {
      # include schema in the message
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # include ddl
      include.schema.changes = true
    }
    # compatible_debezium_json fixed schema
    schema = {
      fields = {
        id = "int32"
        app_name = "string"
        title = "string"
        address_type = "INT16"
        address_list = "string"
        update_time = "string"
      }
    }
  }
}
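MySQL-CDC reads the binlog, so the source MySQL instance must have row-based binary logging enabled, and the mysqlcdc user needs replication privileges. A minimal my.cnf sketch (the values are illustrative, not prescriptive):

```ini
[mysqld]
server-id        = 1          # any unique id in the replication topology
log-bin          = mysql-bin  # enable the binary log
binlog_format    = ROW        # CDC requires row-based logging
binlog_row_image = FULL       # log full row images for updates/deletes
```

Restart MySQL after changing these, and grant the CDC user REPLICATION SLAVE and REPLICATION CLIENT (plus SELECT on the synced tables).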
transform {
  Sql {
    source_table_name = "table1"
    result_table_name = "table2"
    query = "select id, app_name, title, address_type, address_list, update_time from table1"
  }
}
sink {
  Elasticsearch {
    source_table_name = "table2"
    hosts = ["192.168.122.135:9200"]
    index = "seatunnel-test01"
    ## primary key field used to generate the document _id; required for CDC
    primary_keys = ["id"]
  }
}
Checkpoint storage configuration (switched to local files for easier testing)
apache-seatunnel-incubating-2.3.1/config/seatunnel.yaml
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          storage.type: localfile
          fs.defaultFS: file:///tmp/ # ensure the directory has write permission
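Local-file storage is only convenient for single-node testing; in production the checkpoint state would normally live on shared storage so a restarted node can recover it. A hedged HDFS variant of the same storage block (the namenode address is a placeholder):

```yaml
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          storage.type: hdfs
          fs.defaultFS: hdfs://namenode:8020   # placeholder namenode address
```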
2. Run and check the results
sh bin/seatunnel.sh --config config/mysql2es.conf -e local &
That completes this simple walkthrough.