Syncing MySQL Data to Elasticsearch with Apache SeaTunnel (SeaTunnel Engine)

Original
05/19 11:37
Views: 396

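The previous post, "Syncing MySQL Data to Apache Kafka with Apache SeaTunnel (SeaTunnel Engine)", used SeaTunnel to sync MySQL into Kafka. This post uses SeaTunnel to sync MySQL into Elasticsearch. It covers both the initial data load and real-time synchronization of changes captured through CDC.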

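For environment setup and software installation, refer to the previous post. The only change needed here is to replace the Kafka connector with the Elasticsearch (ES) connector.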

Download the connector plugin

https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-elasticsearch/2.3.1/connector-elasticsearch-2.3.1.jar
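You can also fetch the jar from the command line. The sketch below is minimal and rests on two assumptions. First, it assumes the SeaTunnel 2.3.1 binary layout, where connector jars live under connectors/seatunnel. Second, it assumes you run it from the apache-seatunnel-incubating-2.3.1 directory. The functions connector_url and install_connector are hypothetical helpers and are not part of SeaTunnel.

```shell
#!/usr/bin/env bash
# Hypothetical helpers; the URL pattern mirrors the Maven Central link above.
SEATUNNEL_VERSION="2.3.1"

# Prints the Maven Central URL of a SeaTunnel connector jar.
connector_url() {
  local name="$1" version="${2:-$SEATUNNEL_VERSION}"
  echo "https://repo.maven.apache.org/maven2/org/apache/seatunnel/${name}/${version}/${name}-${version}.jar"
}

# Downloads a connector jar into the connectors directory.
# Assumption: the 2.3.1 binary keeps connector jars in connectors/seatunnel,
# and this is run from the apache-seatunnel-incubating-2.3.1 directory.
install_connector() {
  local name="$1" version="${2:-$SEATUNNEL_VERSION}"
  local dest="${3:-connectors/seatunnel}"
  mkdir -p "$dest"
  # -f: fail on HTTP errors, -S: show errors, -L: follow redirects
  curl -fSL -o "${dest}/${name}-${version}.jar" "$(connector_url "$name" "$version")"
}
```

For example, running install_connector connector-elasticsearch downloads connector-elasticsearch-2.3.1.jar from the URL above into connectors/seatunnel.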

Edit the job configuration file

This file defines how SeaTunnel reads, transforms, and writes data once the job starts. Below is an example configuration that syncs MySQL-CDC to ES.

Configuration file path: apache-seatunnel-incubating-2.3.1/config/mysql2es.conf

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}

source {
  MySQL-CDC {
    result_table_name = "table1"
    username = "mysqlcdc"
    password = "mysqlcdc"
    hostname = 127.0.0.1
    base-url="jdbc:mysql://127.0.0.1:3306/test01"
    startup.mode=INITIAL
    catalog {
        factory=MySQL
    }
    table-names=[
        "test01.xxl_job_group"
    ]
    # compatible_debezium_json options
    # format = compatible_debezium_json
    debezium = {
        # include schema into kafka message
        key.converter.schemas.enable = false
        value.converter.schemas.enable = false
        # include ddl
        include.schema.changes = true
    }
    # compatible_debezium_json fixed schema
    schema = {
        fields = {
            id = "int32"
            app_name = "string"
            title = "string"
            address_type = "INT16"
            address_list = "string"
            update_time = "string"
        }
    }
  }
}

transform {
  Sql {
    source_table_name = "table1"
    result_table_name = "table2"
    query = "select id, app_name, title,address_type,address_list,update_time from table1"
  }
}

sink {
    Elasticsearch {
        source_table_name = "table2"
        hosts = ["192.168.122.135:9200"]
        index = "seatunnel-test01"
        ## Primary key field(s) used to generate the document _id; this option is required for CDC.
        primary_keys = ["id"]
    }
}

Change the checkpoint storage configuration (switched to local storage to simplify testing)

apache-seatunnel-incubating-2.3.1/config/seatunnel.yaml

seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          storage.type: localfile
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission
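The local checkpoint storage writes under /tmp/, so the user that launches SeaTunnel needs write access to that directory. A quick pre-flight check could look like the sketch below. check_checkpoint_dir is a hypothetical helper, and the default path /tmp is assumed from the fs.defaultFS value above.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check for the local checkpoint directory.
# Assumption: the directory matches the path in fs.defaultFS (file:///tmp/).
check_checkpoint_dir() {
  local dir="${1:-/tmp}"
  if [ -d "$dir" ] && [ -w "$dir" ]; then
    echo "OK: $dir is writable by $(id -un)"
    return 0
  fi
  echo "ERROR: $dir does not exist or is not writable by $(id -un)" >&2
  return 1
}
```

Run it as the same user that will run bin/seatunnel.sh. A non-zero exit status means checkpoints cannot be written to that path.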

Run the job and check the results

sh bin/seatunnel.sh --config config/mysql2es.conf -e local &

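Here -e local runs the job in local mode, so no separate SeaTunnel Engine cluster is needed. The trailing & sends the job to the background. Because startup.mode=INITIAL, the job first copies the existing rows of test01.xxl_job_group into the seatunnel-test01 index. After that it keeps applying changes read from the MySQL binlog. That completes this basic walkthrough.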
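To confirm that rows from test01.xxl_job_group are reaching Elasticsearch, you can query the index with curl while the job is running. The following is a minimal sketch:

- ES_HOST, ES_INDEX, es_url, es_count and es_get_doc are hypothetical names.
- The host and index defaults are taken from the sink block of mysql2es.conf.
- It assumes that, with primary_keys = ["id"], each document's _id is simply the row's id value.

```shell
#!/usr/bin/env bash
# Defaults taken from the sink block of mysql2es.conf; override via environment variables.
ES_HOST="${ES_HOST:-192.168.122.135:9200}"
ES_INDEX="${ES_INDEX:-seatunnel-test01}"

# Builds the URL of an index-level Elasticsearch endpoint such as _count or _search.
es_url() {
  echo "http://${ES_HOST}/${ES_INDEX}/$1"
}

# Prints the JSON response of the _count API for the index.
es_count() {
  curl -s "$(es_url _count)"
}

# Fetches one document by _id, pretty-printed.
# Assumption: with primary_keys = ["id"], the _id equals the row's id value.
es_get_doc() {
  curl -s "$(es_url "_doc/$1")?pretty"
}

# Example session (run while the SeaTunnel job is up; assumes a row with id = 1 exists):
#   es_count
#   mysql -h 127.0.0.1 -u mysqlcdc -p test01 -e "UPDATE xxl_job_group SET title = 'demo' WHERE id = 1"
#   es_get_doc 1    # once the change is synced, _source.title should read "demo"
```

Querying a single _id before and after an UPDATE is a simple way to see the CDC path at work. The document is overwritten in place rather than duplicated, because its _id is derived from the primary key.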
