Mysql binlog输入插件使用指南

原创
2023/08/23 09:44
阅读数 20

1.概述

bboss-datatran采用标准的输入输出异步管道来处理数据,输入插件和输出插件可以自由组合,输入插件从数据源采集数据,经过数据异步并行流批一体化处理后,输出插件将处理后的数据、指标数据输出到目标地。

图片 图片

bboss插件参考文档:

https://esdoc.bbossgroups.com/#/datatran-plugins

bboss mysql binlog数据采集插件原理图如下:

图片

Mysql binlog插件通过配置对应的mysql master ip和端口、数据库账号和口令、监听的数据库表以及binlog文件路径等信息,实时采集mysql增删改数据,支持以下三种数据采集模式:

模式1 直接读取binlog文件,采集文件中的增删改数据

模式2 监听mysql master slave ip和端口,作业重启从binlog最新位置采集数据

模式3 监听mysql master slave ip和端口,启用故障容灾配置,每次重启作业从上次采集结束的位置开始采集数据

模式1适用一次性离线数据采集场景,模式2和模式3适用于实时采集场景。源表本来就有数据需要同步+实时同步,原来的数据可以基于模式1采集binlog文件,如果没有binlog文件,可以直接用数据库输入插件,直接一次性采集全表数据,然后再用模式3实现增量采集。

视频教程

实时采集Mysql binlog增删改数据教程(db-db单表多表)

实战:基于bboss cdc实时同步mysql增删改数据到Elasticsearch

本文介绍mysql binlog插件的使用方法,以实时同步Mysql Binlog增删改数据到Elasticsearch作为案例来讲解。

2.采集作业实现

2.1 binlog输入插件配置

2.1.1 模式1案例

模式1 直接读取binlog文件,采集文件中的增删改数据


 ImportBuilder importBuilder = new ImportBuilder();
        importBuilder.setBatchSize(1000);//设置批量入Elasticsearch库的记录数
        //binlog插件配置开始
        MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
        mySQLBinlogConfig.setHost("192.168.137.1");
        mySQLBinlogConfig.setPort(3306);
        mySQLBinlogConfig.setDbUser("root");
        mySQLBinlogConfig.setDbPassword("123456");
        //如果直接监听文件则设置binlog文件路径,否则不需要配置文件路径
        mySQLBinlogConfig.setFileNames("F:\\6_environment\\mysql\\binlog.000107,F:\\6_environment\\mysql\\binlog.000127");
        mySQLBinlogConfig.setTables("cityperson");//监控增量表名称
        mySQLBinlogConfig.setDatabase("bboss");//监控增量表名称

        //binlog插件配置结束
        importBuilder.setInputConfig(mySQLBinlogConfig);

2.1.2 模式2案例

模式2 监听mysql master slave ip和端口,作业重启从binlog最新位置采集删改数据

//binlog插件配置开始
MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
mySQLBinlogConfig.setHost("192.168.137.1");
mySQLBinlogConfig.setPort(3306);
mySQLBinlogConfig.setDbUser("root");
mySQLBinlogConfig.setDbPassword("123456");

mySQLBinlogConfig.setTables("cityperson");//监控增量表名称
mySQLBinlogConfig.setDatabase("bboss");//监控增量表名称
mySQLBinlogConfig.setServerId(65536L);//模拟slave节点ID
//binlog插件配置结束
importBuilder.setInputConfig(mySQLBinlogConfig);

2.1.3 模式3案例

监听mysql master slave ip和端口,启用故障容灾配置,每次重启作业从上次采集结束的位置开始采集数据

        MySQLBinlogConfig mySQLBinlogConfig = new MySQLBinlogConfig();
        mySQLBinlogConfig.setHost("192.168.137.1");
        mySQLBinlogConfig.setPort(3306);
        mySQLBinlogConfig.setDbUser("root");
        mySQLBinlogConfig.setDbPassword("123456");
        mySQLBinlogConfig.setServerId(100000L);
        mySQLBinlogConfig.setTables("cityperson,batchtest");//
        mySQLBinlogConfig.setDatabase("bboss");
        mySQLBinlogConfig.setEnableIncrement(true);//启用模式3
       // mysql binlog插件增加异步启动机制,JoinToConnectTimeOut大于0生效,否则是同步启动,启用方法:
        mySQLBinlogConfig.setJoinToConnectTimeOut(20000L);
        importBuilder.setInputConfig(mySQLBinlogConfig);
        importBuilder.setPrintTaskLog(true);
        int batchSize = 500;//批量入库记录数

        importBuilder.setBatchSize(batchSize);//设置批量入库的记录数
        importBuilder.setFlushInterval(10000L);//如果10秒内没有达到500条数据,但是有数据,则强制输出数据
        //启用模式3 故障容灾机制配置       
//        importBuilder.setStatusDbname("testStatus");//指定增量状态数据源名称
      importBuilder.setLastValueStorePath("binlog2db_import");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样
        importBuilder.setLastValueStoreTableName("binlog");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab

通过setEnableIncrement方法启用模式3:

mySQLBinlogConfig.setEnableIncrement(true);//启用模式3

2.2 Elasticsearch输出插件配置

通过ElasticsearchOutputConfig 配置Elasticsearch服务器地址及连接参数、索引表、文档Id字段等信息


ElasticsearchOutputConfig elasticsearchOutputConfig = new ElasticsearchOutputConfig();
        elasticsearchOutputConfig
                .addTargetElasticsearch("elasticsearch.serverNames","default")
                .addElasticsearchProperty("default.elasticsearch.rest.hostNames","192.168.137.1:9200")
                .addElasticsearchProperty("default.elasticsearch.showTemplate","true")
                .addElasticsearchProperty("default.elasticUser","elastic")
                .addElasticsearchProperty("default.elasticPassword","changeme")
                .addElasticsearchProperty("default.elasticsearch.failAllContinue","true")
                .addElasticsearchProperty("default.http.timeoutSocket","60000")
                .addElasticsearchProperty("default.http.timeoutConnection","40000")
                .addElasticsearchProperty("default.http.connectionRequestTimeout","70000")
                .addElasticsearchProperty("default.http.maxTotal","200")
                .addElasticsearchProperty("default.http.defaultMaxPerRoute","100")
                .setIndex("binlogdemo") //设置全局索引表
                .setEsIdField("rowNo");//设置文档主键,不设置,则自动产生文档id,设置好id后,binlog采集的增删改数据,会自动同步到Elasticsearch
        importBuilder.setOutputConfig(elasticsearchOutputConfig);

设置索引表时,可以直接指定索引名称,也可以指定按天分表的动态索引名称:直接指定索引名称

2.2.1 全局索引表配置

elasticsearchOutputConfig.setIndex("binlogdemo") //设置索引表

按天动态分表索引名称

elasticsearchOutputConfig.setIndex("binlogdemo-{dateformat=yyyy.MM.dd}") //设置索引表,当前日期按天分表

elasticsearchOutputConfig.setIndex("binlogdemo-{field=agentStarttime,dateformat=yyyy.MM.dd}") //设置索引表,根据日期字段agentStarttime对应的日期按天分表

2.2.2 记录级别索引名称设置

如果通过mysql binlog插件采集了多张表的数据,并且需要给每张表指定定义的索引名称,则通过以下方式进行配置:


 importBuilder.setDataRefactor(new DataRefactor() {
            @Override
            public void refactor(Context context) throws Exception {
                //根据表名称指定不同的Elasticsearch索引表
                String table = (String)context.getMetaValue("table");
                if(table.equals("cityperson"))
                    context.setIndex("cityperson-{dateformat=yyyy.MM.dd}");
                else
                    context.setIndex("batchtest-{dateformat=yyyy.MM.dd}");

            }
        });

2.2.3 Elasticsearch文档Id设置

设置Elasticsearch文档主键,不设置,则自动产生文档id,设置好id后,binlog采集的删除和修改数据,才会自动同步到Elasticsearch,设置方法如下:

elasticsearchOutputConfig.setEsIdField("rowNo");//设置文档主键,不设置,则自动产生文档id,设置好id后,binlog采集的增删改数据,会自动同步到Elasticsearch

2.3 文档数据加工和处理

通过setDataRefactor接口来处理同步的数据记录


 importBuilder.setDataRefactor(new DataRefactor() {
            @Override
            public void refactor(Context context) throws Exception {
                //根据表名称指定不同的Elasticsearch索引表
                String table = (String)context.getMetaValue("table");
                if(table.equals("cityperson"))
                    context.setIndex("cityperson-{dateformat=yyyy.MM.dd}");
                else
                    context.setIndex("batchtest-{dateformat=yyyy.MM.dd}");

            }
        });

可以直接参考以下文档章节:【2.8.10 灵活控制文档数据结构】

https://esdoc.bbossgroups.com/#/db-es-tool

2.4 执行作业

配置好输入输出插件后,通过importBuilder构建DataStream 对象,然后执行execute方法即可启动运行binlog数据采集作业

DataStream dataStream = importBuilder.builder();       
dataStream.execute();

2.5 完整的作业源码

源码工程地址:https://gitee.com/bboss/bboss-datatran-demo

案例作业代码文件:

https://gitee.com/bboss/bboss-datatran-demo/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/binlog/Binlog2EleasticsearchOutput.java

更多案例,可以参考文档:

mysql binlog数据采集案例

2.6 视频教程

mysql binlog数据采集作业开发调测发布部署视频教程:

https://www.bilibili.com/video/BV1ko4y1M7My/

3 开发交流

 

支持我们

如果您正在使用bboss,或是想支持我们继续开发,您可以通过如下方式支持我们:

1.Star并向您的朋友推荐或分享

bboss elasticsearch client🚀

数据采集&流批一体化处理🚀

2.通过爱发电 直接捐赠,或者扫描下面二维码进行一次性捐款赞助,请作者喝一杯咖啡☕️

 

非常感谢您对开源精神的支持!❤您的捐赠将用于bboss社区建设、QQ群年费、网站云服务器租赁费用。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部