文档章节

MySql Binlog事件数据篇

ksfzhaohui
 ksfzhaohui
发布于 2017/11/24 16:20
字数 2612
阅读 219
收藏 15
点赞 0
评论 0

前言
前两篇文章MySql Binlog初识MySql Binlog事件介绍篇分别从Binlog入门和Binlog事件如何产生的两个角度来介绍Binlog,本文将从Binlog事件的数据来更深入的了解Binlog。

Binlog事件数据
1.QUERY_EVENT
执行更新语句时会生成此事件,包括:create,insert,update,delete;
Fixed data part,总长度13字节:
4字节:执行sql的线程id;
4字节:执行sql的时间;
1字节:数据库名称的长度;
2字节:执行sql产生的错误码;
2字节:状态变量的长度,具体内容在Variable part;

Variable part:
可变字节:状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义;
可变字节:数据库名称
可变字节:sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;

2.STOP_EVENT
当mysqld停止时生成此事件;
Fixed data part:空的

Variable part:空的

3.ROTATE_EVENT
当mysqld切换到新的binlog文件生成此事件;
Fixed data part,总长度8字节:
8字节:下一个binlog文件的第一个事件的position,这个值一直是4,因为魔数占用了4字节;

Variable data part:
可变字节:下一个binlog的名称,它的长度=事件总长度-header长度-Fixed data

4.INTVAR_EVENT
当sql语句中使用了AUTO_INCREMENT的字段或者LAST_INSERT_ID()函数;
Fixed data part:空的

Variable part:
1字节:一个变量类型的值:LAST_INSERT_ID_EVENT = 1 或者 INSERT_ID_EVENT = 2;
8字节:LAST_INSERT_ID()函数调用,或者AUTO_INCREMENT字段生成的一个无符号的整型;

5.RAND_EVENT

| bin-log.000003 |  438 | RAND               |         1 |         473 | rand_seed1=223769196,rand_seed2=1013907192

执行包含RAND()函数的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;
Fixed data part:空的

Variable part:
8字节:第一个种子值(ex:rand_seed1=223769196)
8字节:第二个种子值(ex:rand_seed2=1013907192)

6.USER_VAR_EVENT

| bin-log.000003 |  711 | User var           |         1 |         756 | @`age`=50

执行包含了用户变量的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;
Fixed data part:空的

Variable part:
4字节:用户变量名的大小;
可变字节:用户变量名,具体长度上一个4字节的数据指定了;
1字节:如果是变量值是NULL,那么此值是非0的;如果是此值是0,那么才有接下来的其他数据;应该是对有空值情况的一种优化;
1字节:用户变量类型,包括:(STRING_RESULT=0, REAL_RESULT=1, INT_RESULT=2, ROW_RESULT=3, DECIMAL_RESULT=4);
4字节:用户变量字符的数量;
4字节:用户变量值的长度;
可变字节:变量的值,通过变量类型和变量值的长度,可以解析出具体的变量值;

7.FORMAT_DESCRIPTION_EVENT

| bin-log.000003 |    4 | Format_desc        |         1 |         107 | Server ver: 5.5.29-log, Binlog ver: 4

描述事件,被写在每个binlog文件的开始位置,用在MySQL5.0以后的版本中,代替了START_EVENT_V3;
Fixed data part:
2字节:binlog版本,Mysql5.0以及以上的版本值为:4
50字节:Mysql Server版本;
4字节:事件创建的时间戳;
1字节:header的长度,binlog版本为4的情况下header长度是19;
可变字节:从START_EVENT_V3开始到第27个Event,每个Event的fixed part lengths,每个事件一个字节,总共27个字节;

Variable part:空的

8.XID_EVENT

| bin-log.000003 |  315 | Xid                |         1 |         342 | COMMIT /* xid=32 */

事务提交产生的XID_EVENT事件;
Fixed data part:空的

Variable part:
8字节:事务编号;

9.BEGIN_LOAD_QUERY_EVENT

| bin-log.000003 |  964 | Begin_load_query   |         1 |        1008 | ;file_id=3;block_len=21

执行LOAD DATA INFILE 语句时产生此事件
Fixed data part:
4字节:加载Data File的ID,防止加载的Data File内容是相同的;

Variable part:
加载数据的第一个块,如果文件大小超过某个阀值,后面会有多个APPEND_BLOCK_EVENT事件,每一个包含一个数据块;可变字节长度 = 事件的总长度 – header长度 – Fixed data;因为测试数据量比较少(999, 101, ‘zhaohui’)总共就21个字节,所以一个块足够了;

10.EXECUTE_LOAD_QUERY_EVENT

| bin-log.000003 | 1008 | Execute_load_query |         1 |        1237 | use `test`; LOAD DATA INFILE 'D:/btest.sql' INTO TABLE `btest` FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\' LINES TERMINATED BY '\n' (`id`, `age`, `name`) ;file_id=3 |

执行LOAD DATA INFILE产生的事件,类似QUERY_EVENT事件,Fixed data的前13个字节和QUERY_EVENT类似;
Fixed data part:
4字节:执行sql的线程id;
4字节:执行sql的时间;
1字节:数据库名称的长度;
2字节:执行sql产生的错误码;
2字节:状态变量的长度,具体内容在Variable part;
4字节:加载Data File的ID;
4字节:文件名替换语句中的起始位置;
4字节:文件名替换语句中的结束位置;
1字节:如何处理重复数据,三个选项:LOAD_DUP_ERROR = 0, LOAD_DUP_IGNORE = 1, LOAD_DUP_REPLACE = 2

Variable part:
1.状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义;
2.sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;

11.TABLE_MAP_EVENT

| bin-log.000004 |  844 | Table_map   |         1 |         892 | table_id: 33 (test.btest)

将表的定义映射到一个数字,在行操作事件之前记录(包括:WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT);
Fixed data part:
6字节:表Id;
2字节:保留字段为将来使用;

Variable part:
1字节:数据库名字的长度;
可变字节:数据库名字,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
1字节:表名的长度;
可变字节:表名,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
Packed integer:用来记录表中字段的数量;
注:Packed integer是一个可变字节的类型,根据数据大小字节大小不一样,
更多详细:https://dev.mysql.com/doc/internals/en/event-content-writing-conventions.html
可变字节:表字段类型数组,每个字段一个字节;
Packed integer:用来记录表元数据的长度;
可变字节:元数据块,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
可变字节:用位域表示每一个字段是否为null,一个字节有8位,所以N个字段需要(N+7)/8个字节;

12.WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT和DELETE_ROWS_EVENT
binlog_format为ROW模式下,执行insert,update和delete操作产生的事件;
Fixed data part:
6字节:表Id;
2字节:保留字段为将来使用;

Variable part:
Packed integer:记录表中字段的数量;
可变字节:用位域表示每个字段是否被使用(比如只有更新、插入的字段才是被使用的),N个字段需要(N+7)/8个字节;
可变字节:仅用在UPDATE_ROWS_EVENT事件中,用位域表示每个字段更新之后是否被使用(值只有真正被更新了才是被使用的),N个字段需要(N+7)/8个字节;
接下来是记录的每一行的数据:
可变字节:当前行中的字段值是否为NULL,只有这个字段被标识为被使用,才会出现在这;
可变字节:当前行所有字段的值,只有这个字段被标识为被使用,并且值不为NULL才会有值;

13.INCIDENT_EVENT
主服务器发生了不正常的事件,通知从服务器并告知可能会导致数据处于不一致的状态;
Fixed data part:
1字节:不正常事件的编号;
1字节:消息的长度;

Variable part:
消息的内容,根据Fixed data part中指定的消息长度读取消息的内容;

14.HEARTBEAT_LOG_EVENT
主服务器告诉从服务器,主服务器还活着,不写入到日志文件中;
Fixed data part:空的

Variable part:空的

更多参考:https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html

Java读取简单实例
1.创建表,并插入数据,产生binlog日志文件;

2.查看binlog中的事件;

mysql> show binlog events in 'bin-log.000001';
+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Log_name       | Pos | Event_type  | Server_id | End_log_pos | Info                                                                                                                                                                                                           |
+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| bin-log.000001 |   4 | Format_desc |         1 |         107 | Server ver: 5.5.29-log, Binlog ver: 4                                                                                                                                                                          |
| bin-log.000001 | 107 | Query       |         1 |         364 | use `test`; CREATE TABLE `btest` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
| bin-log.000001 | 364 | Query       |         1 |         432 | BEGIN                                                                                                                                                                                                          |
| bin-log.000001 | 432 | Query       |         1 |         536 | use `test`; insert into btest values(1,100,'zhaohui')                                                                                                                                                          |
| bin-log.000001 | 536 | Xid         |         1 |         563 | COMMIT /* xid=30 */                                                                                                                                                                                            |
| bin-log.000001 | 563 | Stop        |         1 |         582 |                                                                                                                                                                                                                |
+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

3.通过java代码来读取binlog日志,具体代码如下:

public class BinlogRead {
 
    private static RandomAccessFile file;
    /** 魔数的字节长度 **/
    private static final int MAGIN_LEN = 4;
    /** 事件header长度 **/
    private static final int EVENT_HEADER_LEN = 19;
    /** Query_Event fix data长度 **/
    private static final int QUERY_EVENT_FIX_LEN = 13;
 
    public static void main(String[] args) throws Exception {
        file = new RandomAccessFile(new File("D://bin-log.000001"), "rw");
        FileChannel channel = file.getChannel();
 
        /** 1.魔数4字节 **/
        ByteBuffer magic = ByteBuffer.allocate(MAGIN_LEN);
        channel.read(magic);
 
        /** 2.Format_desc_Event事件 **/
        EventHeader header = getEventHeader(channel);
        channel.position(header.getEventLen() + MAGIN_LEN);
 
        /** 3.Query_Event事件 **/
        header = getEventHeader(channel);
        System.out.println(getQueryEventSql(header.getEventLen(), channel));
 
        /** 4.Query_Event事件 **/
        header = getEventHeader(channel);
        System.out.println(getQueryEventSql(header.getEventLen(), channel));
 
        /** 5.Query_Event事件 **/
        header = getEventHeader(channel);
        System.out.println(getQueryEventSql(header.getEventLen(), channel));
 
        /** 6.Xid_Event事件 **/
        header = getEventHeader(channel);
        ByteBuffer xidNumber = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        channel.read(xidNumber);
        xidNumber.flip();
        System.out.println("xidNumber = " + xidNumber.getLong());
 
        /** 7.Stop_Event事件 **/
        header = getEventHeader(channel);
 
    }
 
    /**
     * 获取事件Header信息
     * 
     * @param channel
     * @return
     * @throws IOException
     */
    private static EventHeader getEventHeader(FileChannel channel) throws IOException {
        ByteBuffer formatDescEventHeader = ByteBuffer.allocate(EVENT_HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN);
        channel.read(formatDescEventHeader);
        formatDescEventHeader.flip();
        EventHeader header = new EventHeader();
        header.setTimestamp(formatDescEventHeader.getInt());
        header.setTypeCode(formatDescEventHeader.get());
        header.setServerId(formatDescEventHeader.getInt());
        header.setEventLen(formatDescEventHeader.getInt());
        header.setNextPosition(formatDescEventHeader.getInt());
        header.setFlags(formatDescEventHeader.getShort());
        System.out.println(header.toString());
        return header;
    }
 
    /**
     * 获取Query Event sql语句
     * 
     * @param queryEventLen
     * @param channel
     * @return
     * @throws IOException
     */
    private static String getQueryEventSql(int queryEventLen, FileChannel channel) throws IOException {
        /** Query_Event fix data **/
        ByteBuffer queryEventFix = ByteBuffer.allocate(QUERY_EVENT_FIX_LEN).order(ByteOrder.LITTLE_ENDIAN);
        channel.read(queryEventFix);
        queryEventFix.flip();
        queryEventFix.position(11);
 
        /** 状态变量的长度 **/
        int statusLen = queryEventFix.getShort();
 
        int queryEventValLen = queryEventLen - EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN;
        ByteBuffer queryEventVal = ByteBuffer.allocate(queryEventValLen).order(ByteOrder.LITTLE_ENDIAN);
        channel.read(queryEventVal);
        queryEventVal.flip();
        queryEventVal.position(statusLen);
 
        /** 数据库名称 **/
        queryEventVal.mark();
        int length = 0;
        while ('\0' != queryEventVal.get()) {
            length++;
        }
        queryEventVal.reset();
        byte dbName[] = new byte[length];
        queryEventVal.get(dbName);
        System.out.println("db name : " + new String(dbName, "utf-8"));
 
        /** sql语句 **/
        byte sql[] = new byte[queryEventValLen - statusLen - length - 1];
        queryEventVal.get(sql);
        return new String(sql, "utf-8");
    }
}
public class EventHeader {
 
    private int timestamp;
    private byte typeCode;
    private int serverId;
    private int eventLen;
    private int nextPosition;
    private int flags;
 
    @Override
    public String toString() {
        return "EventHeader [timestamp=" + timestamp + ", typeCode=" + typeCode + ", serverId=" + serverId
                + ", eventLen=" + eventLen + ", nextPosition=" + nextPosition + ", flags=" + flags + "]";
    }
         
        //...get/set方法省略...
}

总结
本文对事件的数据格式做了详细的介绍,因为所有事件的event header部分都是一样的,所以文中主要介绍的event data部分,event data主要包括两个部分:Fixed data part和Variable part;最后通过一个简单实例来大致了解事件数据的读取方式,后续会提供更详细的binlog事件数据读取。

© 著作权归作者所有

共有 人打赏支持
ksfzhaohui

ksfzhaohui

粉丝 303
博文 128
码字总数 158547
作品 3
南京
高级程序员
Mysql数据库之Binlog日志使用总结(必看篇)

Mysql数据库之Binlog日志使用总结(必看篇) 下面小编就为大家带来一篇Mysql数据库之Binlog日志使用总结(必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看...

xiaocao13140
06/07
0
0
MySQL-Binlog V0.5发布

V0.5发布: 软件下载地址: http://git.oschina.net/qiangzigege/MySQL-Binlog 本次更新实现了自动探测表主键来决定数据按表并行加速还是按行并行加速, 主要原理是在解析binlog的数据时生成...

强子哥哥
2016/03/10
3.1K
3
如何通过Mysql的二进制日志恢复数据库数据

经常有网站管理员因为各种原因和操作,导致网站数据误删,而且又没有做网站备份,结果不知所措,甚至给网站运营和盈利带来负面影响。所以本文我们将和大家一起分享学习下如何通过Mysql的二机...

it1000001001
2017/04/17
267
1
MySQL高可用浅析

对于多数应用来说,MySQL都是作为最关键的数据存储中心的,所以,如何让MySQL提供HA服务,是我们不得不面对的一个问题。当master当机的时候,我们如何保证数据尽可能的不丢失,如何保证快速的...

59090939
2016/05/09
0
0
MySQL-Binlog V1.0.0 正式发布

MySQL-Binlog V1.0.0 正式发布了。 什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事...

强子哥哥
2016/04/28
3.8K
7
详解MySQL数据库设置主从同步的方法

简介 MySQL主从同步是目前使用比较广泛的数据库架构,技术比较成熟,配置也不复杂,特别是对于负载比较大的网站,主从同步能够有效缓解数据库读写的压力。 MySQL主从同步的机制: MySQL同步的...

xiaocao13140
05/23
0
0
MySQL-Binlog 1.2.0 发布

MySQL-Binlog V1.2.0 发布了。 什么是MySQL-Binlog MySQL-Binlog 连接到 MySQL 数据库,基于 Netty 框架,实时解析MySQL的二进制日志binlog而非定时查询数据库, 提取出用户级感兴趣的事件:...

强子哥哥
2016/05/24
2.7K
13
MySQL——binlog

一、binlog简介: 1、什么是binlog: binlog日志用于记录所有更新了数据或者已经潜在更新了数据(例如,没有匹配任何行的一个DELETE)的所有语句。语句以“事件”的形式保存,它描述数据更改...

一个笨小孩
2017/07/26
0
0
MySQL-Binlog解析篇:4 大体框架出炉

这几天快速写了一个粗糙的版本, 具体原理: 技术选型 mysql-binlog-connector-java-0.2.4 + open-replicator-1.0.7 + Netty-4.0.33 原理及流程 1)创建到主数据库的socket连接 2)定制Net...

强子哥哥
2015/12/19
331
0
Mysql主从基本原理,以及读写分离导致主库从库数据不一致问题

Mysql主从基本原理,以及读写分离导致主库从库数据不一致问题   1、Mysql的主从同步就是当master(主库)发生数据变化的时候,会实时同步到slave(从库)。   2、主从复制可以水平扩展数...

xiaocao13140
06/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

机器学习管理平台 MLFlow

最近工作很忙,博客一直都没有更新。抽时间给大家介绍一下Databrick开源的机器学习管理平台-MLFlow。 谈起Databrick,相信即使是不熟悉机器学习和大数据的工程湿们也都有所了解,它由Spark的...

naughty
48分钟前
0
0
idea tomcat 远程调试

tomcat 配置 编辑文件${tomcat_home}/bin/catalina.sh,在文件开头添加如下代码。    CATALINA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=7829" Idea端配......

qwfys
今天
1
0
遍历目录下的文件每250M打包一个文件

#!/usr/bin/env python # -*- utf-8 -*- # @Time : 2018/7/20 0020 下午 10:16 # @Author : 陈元 # @Email : abcmeabc@163.com # @file : tarFile.py import os import tarfile import thr......

寻爱的小草
今天
1
0
expect同步文件&expect指定host和要同步的文件&构建文件分发系统&批量远程执行命令

20.31 expect脚本同步文件 expect通过与rsync结合,可以在一台机器上把文件自动同步到多台机器上 编写脚本 [root@linux-5 ~]# cd /usr/local/sbin[root@linux-5 sbin]# vim 4.expect#!/...

影夜Linux
今天
1
0
SpringBoot | 第九章:Mybatis-plus的集成和使用

前言 本章节开始介绍数据访问方面的相关知识点。对于后端开发者而言,和数据库打交道是每天都在进行的,所以一个好用的ORM框架是很有必要的。目前,绝大部分公司都选择MyBatis框架作为底层数...

oKong
今天
13
0
win10 上安装解压版mysql

1.效果 2. 下载MySQL 压缩版 下载地址: https://downloads.mysql.com/archives/community/ 3. 配置 3.1 将下载的文件解压到合适的位置 我最终将myql文件 放在:D:\develop\mysql 最终放的位...

Lucky_Me
今天
2
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

问题终结者
今天
2
0
expect脚本同步文件expect脚本指定host和要同步的文件 构建文件分发系统批量远程执行命令

expect脚本同步文件 在一台机器上把文件同步到多台机器上 自动同步文件 vim 4.expect [root@yong-01 sbin]# vim 4.expect#!/usr/bin/expectset passwd "20655739"spawn rsync -av ro...

lyy549745
今天
1
0
36.rsync下 日志 screen

10.32/10.33 rsync通过服务同步 10.34 linux系统日志 10.35 screen工具 10.32/10.33 rsync通过服务同步: rsync还可以通过服务的方式同步。那需要开启一个服务,他的架构是cs架构,客户端服务...

王鑫linux
今天
1
0
matplotlib 保存图片时的参数

简单绘图 import matplotlib.pyplot as pltplt.plot(range(10)) 保存为csv格式,放大后依然很清晰 plt.savefig('t1.svg') 普通保存放大后会有点模糊文件大小20多k plt.savefig('t5.p...

阿豪boy
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部