文档章节

canal源码分析——DirectLogFetcher源码分析

杨武兵
 杨武兵
发布于 2016/03/25 10:10
字数 777
阅读 482
收藏 7

类结构

DirectLogFetcher的类结构图如下。

LogBuffer

|

LogFetcher

|

DirectLogFetcher

LogBuffer是一个数据库复制日志的缓存区,可将日志缓冲存储起来。

LogFetcher是一个日志提取器的抽象类,它定义了一些提取日志的抽象方法,供子类实现。

DirectLogFetcher是一个既有socket的日志提取器实现类,它实现了LogFetcher类。我们重点研究这个类中代码的实现。

DirectLogFetcher源码分析

package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.taobao.tddl.dbsync.binlog.LogFetcher;

/**
 * 基于socket的logEvent实现
 * 
 * @author jianghang 2013-1-14 下午07:39:30
 * @version 1.0.0
 */
public class DirectLogFetcher extends LogFetcher {

    protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);

    /** Command to dump binlog */
    public static final byte      COM_BINLOG_DUMP   = 18;

    /** Packet header sizes */
    public static final int       NET_HEADER_SIZE   = 4;
    public static final int       SQLSTATE_LENGTH   = 5;

    /** Packet offsets */
    public static final int       PACKET_LEN_OFFSET = 0;
    public static final int       PACKET_SEQ_OFFSET = 3;

    /** Maximum packet length */
    public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);

    private SocketChannel         channel;

    // private BufferedInputStream input;

    public DirectLogFetcher(){
        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public DirectLogFetcher(final int initialCapacity){
        super(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public DirectLogFetcher(final int initialCapacity, final float growthFactor){
        super(initialCapacity, growthFactor);
    }

    public void start(SocketChannel channel) throws IOException {
        this.channel = channel;
        // 和mysql driver一样,提供buffer机制,提升读取binlog速度
        // this.input = new
        // BufferedInputStream(channel.socket().getInputStream(), 16384);
    }

    /**
     * {@inheritDoc}
     * 
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#fetch()
     */
    public boolean fetch() throws IOException {
        try {
            // Fetching packet header from input.
            if (!fetch0(0, NET_HEADER_SIZE)) {
                logger.warn("Reached end of input stream while fetching header");
                return false;
            }

            // Fetching the first packet(may a multi-packet).
            int netlen = getUint24(PACKET_LEN_OFFSET);
            int netnum = getUint8(PACKET_SEQ_OFFSET);
            if (!fetch0(NET_HEADER_SIZE, netlen)) {
                logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                return false;
            }

            // Detecting error code.
            final int mark = getUint8(NET_HEADER_SIZE);
            if (mark != 0) {
                if (mark == 255) // error from master
                {
                    // Indicates an error, for example trying to fetch from
                    // wrong
                    // binlog position.
                    position = NET_HEADER_SIZE + 1;
                    final int errno = getInt16();
                    String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
                    String errmsg = getFixString(limit - position);
                    throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
                                          + " errmsg = " + errmsg);
                } else if (mark == 254) {
                    // Indicates end of stream. It's not clear when this would
                    // be sent.
                    logger.warn("Received EOF packet from server, apparent"
                                + " master disconnected. It's may be duplicate slaveId , check instance config");
                    return false;
                } else {
                    // Should not happen.
                    throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum
                                          + ", len = " + netlen);
                }
            }

            // The first packet is a multi-packet, concatenate the packets.
            while (netlen == MAX_PACKET_LENGTH) {
                if (!fetch0(0, NET_HEADER_SIZE)) {
                    logger.warn("Reached end of input stream while fetching header");
                    return false;
                }

                netlen = getUint24(PACKET_LEN_OFFSET);
                netnum = getUint8(PACKET_SEQ_OFFSET);
                if (!fetch0(limit, netlen)) {
                    logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                    return false;
                }
            }

            // Preparing buffer variables to decoding.
            origin = NET_HEADER_SIZE + 1;
            position = origin;
            limit -= origin;
            return true;
        } catch (SocketTimeoutException e) {
            close(); /* Do cleanup */
            logger.error("Socket timeout expired, closing connection", e);
            throw e;
        } catch (InterruptedIOException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (ClosedByInterruptException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("I/O error while reading from client socket", e);
            throw e;
        }
    }

    private final boolean fetch0(final int off, final int len) throws IOException {
        ensureCapacity(off + len);

        ByteBuffer buffer = ByteBuffer.wrap(this.buffer, off, len);
        while (buffer.hasRemaining()) {
            int readNum = channel.read(buffer);
            if (readNum == -1) {
                throw new IOException("Unexpected End Stream");
            }
        }

        // for (int count, n = 0; n < len; n += count) {
        // if (0 > (count = input.read(buffer, off + n, len - n))) {
        // // Reached end of input stream
        // return false;
        // }
        // }

        if (limit < off + len) limit = off + len;
        return true;
    }

    /**
     * {@inheritDoc}
     * 
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#close()
     */
    public void close() throws IOException {
        // do nothing
    }

}



上线是该类的源码,我们逐个方法来分析一下。


start()方法特别简单,几乎什么都没有干,直接给内部的channel赋值而已,这个没有什么看的。

重点是fetch()方法的实现特别复杂,

© 著作权归作者所有

杨武兵

杨武兵

粉丝 270
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
加载中

评论(3)

d
dubbo_boot
代码复制一遍嘛😓
juesequ
juesequ
😨
风过疏竹
风过疏竹
canal 系列第一篇质量最高,后面是一篇不如一篇
canal源码分析系列——ErosaConnection分析

类结构 ErosaConnection | |-------------------------------------- | | MysqlConnection LocalBinLogConnection ErosaConnection是一个连接的接口,定义了一些通用的方法。目前它有两个实现...

杨武兵
2016/03/24
512
1
【Canal源码分析】parser工作过程

本文主要分析的部分是instance启动时,parser的一个启动和工作过程。主要关注的是AbstractEventParser的start()方法中的parseThread。 一、序列图 二、源码分析 parseThread中包含的内容比较...

端木轩
2018/05/24
60
0
canal源码分析——项目组成结构

背景 canal项目的介绍及源码请到项目主页:https://github.com/alibaba/canal 去查看。 项目结构初探 下载源码后,我们打开目录看到的是如下一个目录结构。 可以看出canal是一个基于maven构建...

杨武兵
2016/03/15
2.2K
3
开源数据同步神器——canal

前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将m...

IT米粉
01/10
0
0
canal源码分析系列

canal的源码分析的分享请看过来 http://my.oschina.net/ywbrj042/blog/637489

杨武兵
2016/03/17
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部