文档章节

基于Netty模拟解析Binlog

ksfzhaohui
 ksfzhaohui
发布于 01/10 16:43
字数 1748
阅读 423
收藏 18
点赞 0
评论 0

系列文章

MySql Binlog初识
MySql Binlog事件介绍篇
MySql Binlog事件数据篇
Mysql通讯协议分析
基于Netty模拟解析Binlog

前言
最近一段时间一直再看mysql binlog相关的内容,也整理了几篇相关的文章,对mysql的事件以及通讯协议在理论上有了一个大概的了解,但是缺少实战;本文的目的就是从实战出发,了解binlog解析的整个过程。

解析思路
把binlog的解析过程大致分为以下几个步骤:
1.服务器启动首先获取上一次解析成功的位置(实例中存储在本地文件中);
2.和mysql服务器建立连接;
3.接受mysql发送来的binlog事件;
4.对不同的binlog事件进行解析;
5.将数据进行存储(实例中仅在日志中打印);
6.存储成功后,定时记录Binaly Log位置。

关于binlog相关的配置可以参考系列文章,里面有详解的介绍,下面对步骤进行详细的介绍;

1.服务器启动首先获取上一次解析成功的位置(实例中存储在本地文件中)
binlog的位置信息存储在文件namePosition,有更新也同样更新到namePosition中,部分代码如下:

public class NamePositionStore {
 
    private static Logger log = LoggerFactory.getLogger(NamePositionStore.class);
 
    public static final String BINLOG_NAME = "binlogName";
    public static final String BINLOG_POSITIION = "binlogPosition";
 
    private static Map<String, String> binlogMap = new HashMap<String, String>();
 
    private static String lineSeparator = (String) System.getProperties().get("line.separator");
    private static String localStoreUrl = "namePosition";
 
    static {
        loadNamePosition();
    }
 
    public static synchronized Map<String, String> loadNamePosition() {
        binlogMap = load();
        return binlogMap;
    }
 
    public static synchronized Map<String, String> getNamePosition() {
        return binlogMap;
    }
 
    public static synchronized void putNamePosition(String binlogName, long binlogPosition) {
        binlogMap.put(BINLOG_NAME, binlogName);
        binlogMap.put(BINLOG_POSITIION, binlogPosition + "");
 
        store(binlogMap);
    }
 
    public static synchronized void putNamePosition(long binlogPosition) {
        binlogMap.put(BINLOG_POSITIION, binlogPosition + "");
        store(binlogMap);
    }
         
    ...以下代码省略,可参考码云完整代码...
}

namePosition中存储了两个字段分别是:binlogName和binlogPosition,这两个字段会在客户端请求mysql binlog的时候需要的参数;

2.和mysql服务器建立连接
在文章Mysql通讯协议分析中可以看到和mysql服务器建立连接的步骤:mysql发送握手包,客户端发送认证包,mysql发送认证的结果;

public class HandshakeHandler extends SimpleChannelInboundHandler<DataPackage> {
 
    private Logger logger = LoggerFactory.getLogger(HandshakeHandler.class);
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPackage pk) throws Exception {
        logger.info("Handshake start");
        if (null == pk) {
            return;
        }
        ByteBuf msg = (ByteBuf) pk.getContent();
        int protocolVersion = msg.readByte();
        String serverVersion = ByteUtil.NullTerminatedString(msg);
        int threadId = ByteUtil.readInt(msg, 4);
        logger.info("protocolVersion = " + protocolVersion + ",serverVersion = " + serverVersion + ",threadId = "
                + threadId);
        String randomNumber1 = ByteUtil.NullTerminatedString(msg);
        msg.readBytes(2);
        byte encode = msg.readByte();
        msg.readBytes(2);
        msg.readBytes(13);
        String randomNumber2 = ByteUtil.NullTerminatedString(msg);
        logger.info("Handshake end");
        AuthenticateDataBean dataBean = new AuthenticateDataBean(encode, randomNumber1 + randomNumber2,
                Constants.userName, Constants.password);
        ctx.channel().writeAndFlush(new DataPackage(1, dataBean));
        ctx.pipeline().remove(this);
    }
}

接受mysql发送的握手包,进行相关的解析工作,其中比较重要的是两个挑战随机数,客户端在认证的时候需要使用随机数对密码加密;解析完之后客户端发送认证数据包(封装在AuthenticateDataBean),具体类信息如下:

public class AuthenticateDataBean implements IDataBean {
    /** 认证需要的用户名密码 **/
    private String userName;
    private String password;
    /** 编码和挑战随机数 **/
    private byte encode;
    private String randomNumber;
 
    ...以下代码省略,可参考码云完整代码...
 
    @Override
    public byte[] toByteArray() throws Exception {
        int clientPower = PowerType.CLIENT_LONG_FLAG | PowerType.CLIENT_PROTOCOL_41
                | PowerType.CLIENT_SECURE_CONNECTION;
        byte clientPowerBytes[] = ByteUtil.writeInt(clientPower, 4);
        int maxLen = 0;
        byte maxLenBytes[] = ByteUtil.writeInt(maxLen, 4);
        byte encodeBytes[] = ByteUtil.writeInt(encode, 1);
        byte zeroBytes[] = ByteUtil.writeInt(0, 23);
 
        byte[] userNameBytes = (userName + "\0").getBytes();
        byte[] passwordBytes = "".equals(password) ? new byte[0]
                : ByteUtil.passwordCompatibleWithMySQL411(password, randomNumber);
        ByteBuf byteBuf = Unpooled.buffer();
        byteBuf.writeBytes(clientPowerBytes);
        byteBuf.writeBytes(maxLenBytes);
        byteBuf.writeBytes(encodeBytes);
        byteBuf.writeBytes(zeroBytes);
        byteBuf.writeBytes(userNameBytes);
        byteBuf.writeByte((byte) passwordBytes.length);
        byteBuf.writeBytes(passwordBytes);
        return byteBuf.array();
    }
 
}

发送的认证包到服务器之后,客户端会收到认证的结果,具体处理在AuthenticateResultHandler中:

public class AuthenticateResultHandler extends SimpleChannelInboundHandler<DataPackage> {
 
    private Logger logger = LoggerFactory.getLogger(AuthenticateResultHandler.class);
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPackage dataPackage) throws Exception {
        ByteBuf msg = (ByteBuf) dataPackage.getContent();
        int mark = msg.readByte();
        if (mark == 0) {
            Map<String, String> binlongMap = NamePositionStore.getNamePosition();
            RequestBinlogDumpDataBean dataBean = new RequestBinlogDumpDataBean(Constants.serverId,
                    binlongMap.get(NamePositionStore.BINLOG_NAME),
                    Long.valueOf(binlongMap.get(NamePositionStore.BINLOG_POSITIION)));
            ctx.channel().writeAndFlush(new DataPackage(0, dataBean));
            logger.info("Authenticate success:" + ByteUtil.bytesToHexString(msg.array()));
        } else {
            logger.info("Authenticate fail:" + ByteUtil.bytesToHexString(msg.array()));
        }
        ctx.pipeline().remove(this);
    }
}

如果认证成功,这时候客户端需要发送请求接受binlog的请求,这里面包含两个重要的参数就是binlogName和binlogPosition,具体信息在RequestBinlogDumpDataBean类中,结构类似AuthenticateDataBean,此处省略。

3.接受mysql发送来的binlog事件
服务器收到客户端的binlog请求,这时服务器如果产生了binlog日志,会发送给客户端,客户端需要一个接受binlog事件的类:

public class BinlogEventParseHandler extends SimpleChannelInboundHandler<DataPackage> {
 
    private Logger logger = LoggerFactory.getLogger(BinlogEventParseHandler.class);
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPackage datePackage) throws Exception {
        ByteBuf contentBuf = (ByteBuf) datePackage.getContent();
        contentBuf.skipBytes(1);
        EventHeader header = new EventHeader();
        header.setTimestamp(ByteUtil.readInt(contentBuf, 4));
        header.setTypeCode((byte) ByteUtil.readInt(contentBuf, 1));
        header.setServerId(ByteUtil.readInt(contentBuf, 4));
        header.setEventLen(ByteUtil.readInt(contentBuf, 4));
        header.setNextPosition(ByteUtil.readInt(contentBuf, 4));
        header.setFlags(ByteUtil.readInt(contentBuf, 2));
        logger.info(header.toString());
 
        IEventParser parser = EventParserFactory.getEventParser(header.getTypeCode());
        if (parser == null) {
            logger.error("不支持的binlog事件类型解析;typeCode = " + header.getTypeCode());
        }
        parser.parse(contentBuf, header);
        if (header.getTypeCode() != EventType.ROTATE_EVENT
                && header.getTypeCode() != EventType.FORMAT_DESCRIPTION_EVENT) {
            NamePositionStore.putNamePosition(header.getNextPosition());
        }
    }
}

首先解析事件头包括:eventType,eventLen,nextPosition等信息,然后根据事件类型,调用不同的解析器进行解析;

4.对不同的binlog事件进行解析
步骤3中通过不同的事件类型,获取对应的解析器,这些解析器都在EventParserFactory中,下面以FormatDescriptionEventParser为例

public class FormatDescriptionEventParser implements IEventParser {
 
    private Logger logger = LoggerFactory.getLogger(FormatDescriptionEventParser.class);
 
    @Override
    public void parse(ByteBuf msg, EventHeader eventHeader) {
        long binlogVersion = ByteUtil.readInt(msg, 2);
        String serverVersion = ByteUtil.readFixedLenString(msg, 50);
        long timestamp = ByteUtil.readInt(msg, 4);
        byte headerLength = msg.readByte();
        StringBuffer eventTypeFixDataLen = new StringBuffer();
        for (int i = 0; i < 27; i++) {
            eventTypeFixDataLen.append(msg.readByte() + ",");
        }
        logger.info("binlogVersion = " + binlogVersion + ",serverVersion = " + serverVersion + ",timestamp = "
                + timestamp + ",headerLength = " + headerLength + ",eventTypeStr = " + eventTypeFixDataLen);
    }
}

根据FormatDescriptionEvent的格式读取ByteBuf里面的数据包括:binlog版本,服务器版本,时间戳,事件头长度以及每个Event的fixed part lengths,本次实战中仅仅将解析后的数据打印到日志中,没有做其他处理。

5.将数据进行存储(实例中仅在日志中打印)
本次使用的binlog模式是:STATEMENT,所有所有的sql语句都会发送给客户端,对应的事件是QueryEvent,包括创建表,增删改等操作:

public class QueryEventParser implements IEventParser {
 
    private Logger logger = LoggerFactory.getLogger(QueryEventParser.class);
 
    private static final int QUERY_EVENT_FIX_LEN = 13;
 
    @Override
    @SuppressWarnings("unused")
    public void parse(ByteBuf msg, EventHeader eventHeader) {
        long threadId = ByteUtil.readInt(msg, 4);
        long time = ByteUtil.readInt(msg, 4);
        int dbNameLen = msg.readByte();
        int errorCode = ByteUtil.readInt(msg, 2);
        int variableLen = ByteUtil.readInt(msg, 2);
 
        msg.skipBytes(variableLen);
 
        String dbName = ByteUtil.NullTerminatedString(msg);
        String sql = ByteUtil.readFixedLenString(msg, (int) (eventHeader.getEventLen() - variableLen
                - EventHeader.EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN - dbName.getBytes().length - 1));
        logger.info("dbName = " + dbName + ",sql = " + sql);
    }
}

以上的QueryEventParser解析执行的更新语句,记录了数据库名称和相关的更新sql语句。

6.存储成功后,定时记录Binaly Log位置
在步骤三中的BinlogEventParseHandler类中,我们在解析玩之后,存储了nextPosition信息到文件中,方便下次启动读取,同时binlog还有一个切换binlog文件的事件,同样也需要记录;

public class RotateEventParser implements IEventParser {
 
    private Logger logger = LoggerFactory.getLogger(RotateEventParser.class);
 
    @Override
    public void parse(ByteBuf msg, EventHeader eventHeader) {
        long binlogPosition = ByteUtil.readLong(msg, 8);
        int variablePartLen = (int) (eventHeader.getEventLen() - EventHeader.EVENT_HEADER_LEN - 8);
        byte variablePart[] = new byte[variablePartLen];
        msg.readBytes(variablePart);
        String binlogName = new String(variablePart);
 
        logger.info("binlogPosition = " + binlogPosition + ",binlogName = " + binlogName);
 
        NamePositionStore.putNamePosition(binlogName, binlogPosition);
    }
}

对应的事件是RotateEvent,因为切换成新的binlongName,所有需要同时记录binlongName和binlogPosition。

以上具体代码可以参考:
码云:https://gitee.com/OutOfMemory/easy-binlog
github:https://github.com/ksfzhaohui/easy-binlog

总结
本文旨在让大家更加了解binlog同步的大致过程,所以本文提供的项目没有经过大量的测试,仅供大家学习使用;本项目中参考了一些优秀的开源软件:mysql-binlog-connector-java和MySQL-Binlog

个人博客:codingo.xyz

© 著作权归作者所有

共有 人打赏支持
ksfzhaohui

ksfzhaohui

粉丝 303
博文 128
码字总数 158547
作品 3
南京
高级程序员
MySQL-Binlog V0.5发布

V0.5发布: 软件下载地址: http://git.oschina.net/qiangzigege/MySQL-Binlog 本次更新实现了自动探测表主键来决定数据按表并行加速还是按行并行加速, 主要原理是在解析binlog的数据时生成...

强子哥哥
2016/03/10
3.1K
3
MySQL-Binlog 1.2.0 发布

MySQL-Binlog V1.2.0 发布了。 什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事件:...

强子哥哥
2016/05/24
2.7K
13
强子哥哥/MyBinlog

#MySQL-Binlog 为什么要自己写binlog? 提出问题: 当初选型Canal,看了半天源码发现未能进入binlog解析核心。 直接用,碰到问题去群里问,还是自己造轮子,有问题秒定位?我选择了后者. 分析问题:...

强子哥哥
2016/01/22
0
0
MySQL-Binlog V0.7 发布

什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事件:INSERT,UPDATE,DELETE,供后续分析...

强子哥哥
2016/04/17
2.1K
0
MySQL-Binlog V1.0.0 正式发布

MySQL-Binlog V1.0.0 正式发布了。 什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事...

强子哥哥
2016/04/28
3.8K
7
MySQL 日志解析工具--MySQL-Binlog

MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事件:INSERT,UPDATE,DELETE,供后续分析如事件抛到大数据系...

强子哥哥
2016/01/18
5.5K
2
MySQL-Binlog V1.1.0 发布

MySQL-Binlog V1.1.0 发布了。 什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事件:...

强子哥哥
2016/04/29
3.5K
13
MySQL-Binlog V0.2发布,支持集群和界面操作

MySQL-Binlog V0.2 发布了,该版本已经实现web界面和ZK集群: 连接到MySQL数据库,基于Netty框架,实时解析MySQL的二进制日志binlog而非定时查询数据库,提取出用户级感兴趣的事件:INSERT,U...

强子哥哥
2016/02/18
1K
2
MySQL-Binlog V0.3 发布

MySQL-Binlog V0.3 发布,本次更新了若干软件bug,主要增加了: 任务在集群机器间的负载均衡,让任务均匀的分布到各机器内。 使用过程中有问题或者bug,请进入官方群: 398822659 -------------...

强子哥哥
2016/02/26
3.6K
8
MySQL-Binlog V0.4发布

本次更新: 添加了按表进行并行复制的策略。 优势:对于多库多表的更新比较有效。 劣势:对于热点表无效。 TODO: 0.5版本增加自动识别主键,实现按行并行复制的策略。 --------------------...

强子哥哥
2016/03/07
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

机器学习管理平台 MLFlow

最近工作很忙,博客一直都没有更新。抽时间给大家介绍一下Databrick开源的机器学习管理平台-MLFlow。 谈起Databrick,相信即使是不熟悉机器学习和大数据的工程湿们也都有所了解,它由Spark的...

naughty
48分钟前
0
0
idea tomcat 远程调试

tomcat 配置 编辑文件${tomcat_home}/bin/catalina.sh,在文件开头添加如下代码。    CATALINA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=7829" Idea端配......

qwfys
今天
1
0
遍历目录下的文件每250M打包一个文件

#!/usr/bin/env python # -*- utf-8 -*- # @Time : 2018/7/20 0020 下午 10:16 # @Author : 陈元 # @Email : abcmeabc@163.com # @file : tarFile.py import os import tarfile import thr......

寻爱的小草
今天
1
0
expect同步文件&expect指定host和要同步的文件&构建文件分发系统&批量远程执行命令

20.31 expect脚本同步文件 expect通过与rsync结合,可以在一台机器上把文件自动同步到多台机器上 编写脚本 [root@linux-5 ~]# cd /usr/local/sbin[root@linux-5 sbin]# vim 4.expect#!/...

影夜Linux
今天
1
0
SpringBoot | 第九章:Mybatis-plus的集成和使用

前言 本章节开始介绍数据访问方面的相关知识点。对于后端开发者而言,和数据库打交道是每天都在进行的,所以一个好用的ORM框架是很有必要的。目前,绝大部分公司都选择MyBatis框架作为底层数...

oKong
今天
13
0
win10 上安装解压版mysql

1.效果 2. 下载MySQL 压缩版 下载地址: https://downloads.mysql.com/archives/community/ 3. 配置 3.1 将下载的文件解压到合适的位置 我最终将myql文件 放在:D:\develop\mysql 最终放的位...

Lucky_Me
今天
2
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

问题终结者
今天
2
0
expect脚本同步文件expect脚本指定host和要同步的文件 构建文件分发系统批量远程执行命令

expect脚本同步文件 在一台机器上把文件同步到多台机器上 自动同步文件 vim 4.expect [root@yong-01 sbin]# vim 4.expect#!/usr/bin/expectset passwd "20655739"spawn rsync -av ro...

lyy549745
今天
1
0
36.rsync下 日志 screen

10.32/10.33 rsync通过服务同步 10.34 linux系统日志 10.35 screen工具 10.32/10.33 rsync通过服务同步: rsync还可以通过服务的方式同步。那需要开启一个服务,他的架构是cs架构,客户端服务...

王鑫linux
今天
1
0
matplotlib 保存图片时的参数

简单绘图 import matplotlib.pyplot as pltplt.plot(range(10)) 保存为csv格式,放大后依然很清晰 plt.savefig('t1.svg') 普通保存放大后会有点模糊文件大小20多k plt.savefig('t5.p...

阿豪boy
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部