文档章节

canal源码分析系列——ErosaConnection分析

杨武兵
 杨武兵
发布于 2016/03/24 09:36
字数 1132
阅读 518
收藏 3

类结构

ErosaConnection

|

|--------------------------------------

|                                                    |

MysqlConnection                            LocalBinLogConnection

ErosaConnection是一个连接的接口,定义了一些通用的方法。目前它有两个实现类,MysqlConnection是与MySQL服务器连接的实现类,LocalBinLogConnection是与本地的binlog文件进行连接的实现类。从类中可以看出,目前canal还不支持oracle的实现。

接口定义

package com.alibaba.otter.canal.parse.inbound;

import java.io.IOException;

/**
 * 通用的Erosa的链接接口, 用于一般化处理mysql/oracle的解析过程
 * 
 * @author: yuanzu Date: 12-9-20 Time: 下午2:47
 */
public interface ErosaConnection {

	/**
	 * 建立连接
	 * @throws IOException
	 */
    public void connect() throws IOException;

    /**
     * 重新建立连接,会断开已有连接
     * @throws IOException
     */
    public void reconnect() throws IOException;

    /**
     * 断开连接。
     * @throws IOException
     */
    public void disconnect() throws IOException;

    /**
     * 是否建立连接。
     * @return
     */
    public boolean isConnected();

    /**
     * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析处理器。
     */
    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 获取binlog事件,如果没有数据会阻塞,等待数据的到达。
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析处理器。
     */
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 获取binlog事件,如果没有数据会阻塞,等待数据的到达。
     * @param timestamp 起始时间,只同步该时间之后的产生的新事件。
     * @param func 事件解析处理器。
     */
    public void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * 产生一个新的连接。
     */
    ErosaConnection fork();
}

其中实现类MysqlConnection是我们经常会使用到的一个类,先看看这个类是如何实现的。

MysqlConnection

属性和构造函数

private MysqlConnector      connector;
    private long                slaveId;
    private Charset             charset = Charset.forName("UTF-8");
    private BinlogFormat        binlogFormat;
    private BinlogImage         binlogImage;

    public MysqlConnection(){
    }

    public MysqlConnection(InetSocketAddress address, String username, String password){

        connector = new MysqlConnector(address, username, password);
    }

    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
                           String defaultSchema){
        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
    }

从代码可以看出,它大部分依赖一个MysqlConnector组件来实现与MySQL的连接。我们稍后看看该代码的实现。

构造函数需要的是MySql服务器的地址,用户名和密码,该用户必须具备了replication slave权限才可以。slaveId是当前解析器的slaveId,它不能与其它的slaveId冲突。

连接方法实现

public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }


基本上都是调用了MysqlConnection的方法实现的,还需要进入该类查看实现。

dump方法


public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        while (fetcher.fetch()) {
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    }  
/**
     * the settings that will need to be checked or set:<br>
     * <ol>
     * <li>wait_timeout</li>
     * <li>net_write_timeout</li>
     * <li>net_read_timeout</li>
     * </ol>
     * 
     * @param channel
     * @throws IOException
     */
    private void updateSettings() throws IOException {
        try {
            update("set wait_timeout=9999999");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
        try {
            update("set net_write_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            update("set net_read_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // 设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化
            update("set names 'binary'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mysql5.6针对checksum支持需要设置session变量
            // 如果不设置会出现错误: Slave can not handle replication events with the
            // checksum that master is configured to log
            // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mariadb针对特殊的类型,需要设置session变量
            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
    }


dump方法的实现流程是这样的。

1. 更新MySQL配置信息。调用方法updateSettings();主要包括设置超时时间、设置数据库直接发送二进制数据,设置master_binlog_checksum和mariadb_slave_capability等变量值。

2.发送binlogdump命令。发送COM_BINLOG_DUMP命令,携带binlogFileName、binlogPosition和slaveServerId等关键信息。

3.构建一个binlog获取器组件DirectLogFetcher。使用它获得binlog数据。

4.循环从DirectLogFetcher获取内容,将获取到的数据转化为event。

5.调用SinkFunction处理获取到的event,若处理失败则会中断循环,否则继续。

接下来要看懂他们就需要了解MySQL的binlog协议及数据格式定义了。






© 著作权归作者所有

杨武兵

杨武兵

粉丝 276
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
加载中

评论(1)

风过疏竹
风过疏竹
额,这篇分析的有点水了
【Canal源码分析】parser工作过程

本文主要分析的部分是instance启动时,parser的一个启动和工作过程。主要关注的是AbstractEventParser的start()方法中的parseThread。 一、序列图 二、源码分析 parseThread中包含的内容比较...

端木轩
2018/05/24
66
0
canal源码分析系列

canal的源码分析的分享请看过来 http://my.oschina.net/ywbrj042/blog/637489

杨武兵
2016/03/17
1K
1
Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密...

小致Daddy
2018/08/03
21.2K
1
canal源码分析——项目组成结构

背景 canal项目的介绍及源码请到项目主页:https://github.com/alibaba/canal 去查看。 项目结构初探 下载源码后,我们打开目录看到的是如下一个目录结构。 可以看出canal是一个基于maven构建...

杨武兵
2016/03/15
2.2K
3
C语言自学完备手册(21)——递归

版权声明: https://blog.csdn.net/lfdfhl/article/details/82897146 自定义View系列教程00–推翻自己和过往,重学自定义View 自定义View系列教程01–常用工具介绍 自定义View系列教程02–o...

谷哥的小弟
2018/09/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 人生,还真是到处是意外

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @这次装个文艺青年吧 :#今日歌曲推荐# 分享lil peep的单曲《High School》 《High School》- lil peep 手机党少年们想听歌,请使劲儿戳(这里...

小小编辑
23分钟前
3
0
Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

多线程一直是工作或面试过程中的高频知识点,今天给大家分享一下使用 ThreadPoolTaskExecutor 来自定义线程池和实现异步调用多线程。 一、ThreadPoolTaskExecutor 本文采用 Executors 的工厂...

CREATE_17
今天
6
0
CSS盒子模型

CSS盒子模型 组成: content --> padding --> border --> margin 像现实生活中的快递: 物品 --> 填充物 --> 包装盒 --> 盒子与盒子之间的间距 content :width、height组成的 内容区域 padd......

studywin
今天
7
0
修复Win10下开始菜单、设置等系统软件无法打开的问题

因为各种各样的原因导致系统文件丢失、损坏、被修改,而造成win10的开始菜单、设置等系统软件无法打开的情况,可以尝试如下方法解决 此方法只在部分情况下有效,但值得一试 用Windows键+R打开...

locbytes
昨天
8
0
jquery 添加和删除节点

本文转载于:专业的前端网站➺jquery 添加和删除节点 // 增加一个三和一节点function addPanel() { // var newPanel = $('.my-panel').clone(true) var newPanel = $(".triple-panel-con......

前端老手
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部