Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (Table API方式)

原创
05/02 19:17
阅读数 427

上一篇《Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)》采用 DataStream 方式实现,本篇改用 Table API 方式。

一、创建项目

基于 JDK 17 + Spring Boot 3.0.2 + Elasticsearch 7.17.9 + Flink 1.16.0 + Flink CDC 2.3.0

1、pom 主要依赖

<!-- Versions shared by the build: Java 17, Spring Boot 3.0.2 and Flink 1.16.0. -->
<properties>
    <java.version>17</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>3.0.2</spring-boot.version>
    <!-- Single source of truth for every org.apache.flink artifact that uses ${flink.version} below. -->
    <flink.version>1.16.0</flink.version>
</properties>
<dependencies>
    <!-- Spring Boot web starter. No version is declared here; presumably it is managed by a
         Spring Boot parent/BOM that is not shown in this excerpt (confirm). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- MySQL JDBC driver, runtime scope only (not needed at compile time). -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Elasticsearch 7 connector. It is versioned independently of Flink core
         (3.0.0 built for Flink 1.16), so it does not use ${flink.version}. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
        <version>3.0.0-1.16</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <!-- Flink JSON format module; presumably needed by the Elasticsearch sink to serialize rows (confirm). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <!-- Bridge between the Table API and the DataStream API; home of the
         org.apache.flink.table.api.bridge.java package imported by the listener class. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <!-- Table/SQL planner; the _2.12 suffix is the Scala binary version of the artifact. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <!-- Presumably required so the job can be submitted from inside the Spring Boot process (confirm). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <!-- DataStream API; home of the org.apache.flink.streaming.api.environment package
         imported by the listener class. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2、实现MySQL变更监听

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Starts a Flink CDC pipeline when the Spring Boot application has started.
 *
 * <p>The pipeline is defined with Flink SQL (Table API): a {@code mysql-cdc} source table
 * reading {@code db01.system_dept} and an {@code elasticsearch-7} sink table writing to the
 * {@code system_dept_search} index, connected by a plain {@code INSERT INTO ... SELECT *}.
 */
@Slf4j
@Component
public class MysqlEventListener implements ApplicationRunner {

    /** Name given to the submitted Flink job (applied through the {@code pipeline.name} option). */
    private static final String JOB_NAME = "mysql-cdc-es";

    // NOTE(review): Flink documents TIMESTAMP precision as 0-9. TIMESTAMP(19) below looks like
    // it was copied from MySQL's display width; the DDL is kept unchanged here - confirm the
    // intended precision before altering it, since it may affect how values are written to ES.

    /** DDL of the CDC source table backed by MySQL table {@code db01.system_dept}. */
    private static final String SOURCE_DDL =
            """
                    CREATE TABLE IF NOT EXISTS system_dept (
                        id BIGINT,
                        name VARCHAR(30),
                        sort INT,
                        leader_user_id BIGINT,
                        phone VARCHAR(11),
                        email VARCHAR(50),
                        status TINYINT,
                        creator VARCHAR(64),
                        create_time TIMESTAMP(19),
                        PRIMARY KEY(id) NOT ENFORCED
                    ) WITH (
                        'connector' = 'mysql-cdc',
                        'hostname' = 'localhost',
                        'port' = '3306',
                        'username' = 'flinkcdc',
                        'password' = 'flinkcdc',
                        'database-name' = 'db01',
                        'table-name' = 'system_dept'
                    )
                    """;

    /** DDL of the sink table backed by the Elasticsearch index {@code system_dept_search}. */
    private static final String SINK_DDL =
            """
                    CREATE TABLE IF NOT EXISTS system_dept_es (
                        id BIGINT,
                        name VARCHAR(30),
                        sort INT,
                        leader_user_id BIGINT,
                        phone VARCHAR(11),
                        email VARCHAR(50),
                        status TINYINT,
                        creator VARCHAR(64),
                        create_time TIMESTAMP(19),
                        PRIMARY KEY(id) NOT ENFORCED
                    ) WITH (
                        'connector' = 'elasticsearch-7',
                        'hosts' = 'http://localhost:9200',
                        'index' = 'system_dept_search',
                        'sink.bulk-flush.max-actions' = '1'
                    )
                    """;

    /** Straight copy of every source row into the sink table (no aggregation or filtering). */
    private static final String TRANSFORM_SQL = "INSERT INTO system_dept_es SELECT * FROM system_dept";

    /**
     * Spring Boot startup hook; launches the MySQL to Elasticsearch synchronization job.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        mysql2es();
    }

    /**
     * Registers the source and sink tables and submits the MySQL to Elasticsearch job.
     *
     * <p>Any failure is logged and swallowed so that a pipeline problem does not abort
     * application startup.
     */
    private void mysql2es() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Run the job with a parallelism of 1.
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // The job is submitted by executeSql(INSERT ...) below, not by env.execute*(),
            // so the job name has to be supplied through the pipeline configuration.
            tableEnv.getConfig().set("pipeline.name", JOB_NAME);

            tableEnv.executeSql(SOURCE_DDL);
            tableEnv.executeSql(SINK_DDL);

            // executeSql() on an INSERT statement submits the streaming job by itself and
            // returns right away. Two things are deliberately NOT done afterwards:
            //  - result.print(): it waits for the job's result, and this CDC job is unbounded,
            //    so the startup thread would never return from this runner;
            //  - env.execute()/executeAsync(): the environment holds no DataStream operators,
            //    so Flink would reject it ("No operators defined in streaming topology").
            TableResult result = tableEnv.executeSql(TRANSFORM_SQL);
            result.getJobClient().ifPresentOrElse(
                    jobClient -> log.info("mysql --> es, job submitted, name={}, jobId={}",
                            JOB_NAME, jobClient.getJobID()),
                    () -> log.warn("mysql --> es, job submitted but no JobClient is available"));
        } catch (Exception e) {
            log.error("mysql --> es, Exception=", e);
        }
    }
}

到此就大功告成啦! 代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/master/flink-cdc-mysql2es-2

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部