文档章节

zk源码阅读3: 持久化FileTxnLog

beyondflyh
 beyondflyh
发布于 2017/09/07 12:29
字数 2602
阅读 3
收藏 0
点赞 0
评论 0

摘要

本节讲解

zk的持久化框架
事务日志FileTxnLog日志结构
FileTxnLog源码
LogFormatter完成事务日志的反序列化
分析事务日志demo

持久化总体框架

持久化的类主要在包org.apache.zookeeper.server.persistence下,结构如下图

zk持久化框架

TxnLog,接口类型,读取事务性日志的接口。
FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
Snapshot,接口类型,持久层快照接口。
FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
FileTxnSnapLog,封装了TxnLog和SnapShot。
Util,工具类,提供持久化所需的API。

两种日志

zk主要存放了两类文件

snapshot(内存快照)
log(事务日志,类似MySQL的binlog,将所有与修改数据相关的操作记录在log中)

关于事务性日志的定义,可以参照refer,简而言之就是 zk事务日志文件用来记录事物操作,每一个事务操作如添加,删除节点等等,都会在事务日志中记录一条记录,用来在zookeeper异常情况下恢复数据

下面介绍事务日志

事务日志

正常运行过程中,针对所有更新操作,在返回客户端“更新成功”的响应前,ZK会确保已经将本次更新操作的事务日志写到磁盘上,只有这样,整个更新操作才会生效。

接口TxnLog

public interface TxnLog {
    
    /**
     * roll the current
     * log being appended to
     * @throws IOException 
     */
    // 滚动日志,从当前日志滚到下一个日志,不是回滚
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    // 添加一个请求至事务性日志
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    // 读取事务性日志
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事务性操作的最新zxid
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    // 清空zxid以后的日志
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 获取数据库的id
    long getDbId() throws IOException;
    
    /**
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事务并进行确认
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    // 关闭事务性日志
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    // 读取事务日志的迭代器接口
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // 获取事务头部
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // 获取事务
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // 下个事务
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        // 关闭文件释放资源
        void close() throws IOException;
    }
}

实现类 FileTxnLog

文件结构

/**
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 * 
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 * 
 * TxnList:
 *     Txn || Txn TxnList
 *     
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 * 
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 * 
 * Txnlen:
 *     len 4bytes
 * 
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *     
 * Record:
 *     See Jute definition file for details on the various record types
 *      
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote> 
 */

主要接口

append
//添加一条事务性日志
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事务头部不为空
            if (hdr.getZxid() <= lastZxidSeen) {
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { //日志流为空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }
               //生成一个新的log文件
               logFileWrite = new File(logDir, ("log." + 
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               //用TXNLOG_MAGIC VERSION dbId来生成文件头
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               fhdr.serialize(oa, "fileheader");//序列化
               // Make sure that the magic number is written before padding.
               logStream.flush();
               currentSize = fos.getChannel().position();
               streamsToFlush.add(fos);
            }
            padFile(fos);//剩余空间不够4k时则填充文件64M
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) {
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            Checksum crc = makeChecksumAlgorithm();//生成验证算法
            crc.update(buf, 0, buf.length);
            oa.writeLong(crc.getValue(), "txnEntryCRC");//将验证算法的值写入long
            Util.writeTxnBytes(oa, buf);//将序列化事务记录写入OutputArchive,以0x42('B')结束
            return true;
        }
        return false;
    }

添加事务性日志 )

getLogFiles

//找出<=snapshot的中最大的zxid的logfile以及后续的logfile
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        List<File> files = Util.sortDataDir(logDirList, "log", true);//按照后缀抽取zxid,按zxid升序排序
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid > snapshotZxid) {
                continue;
            }
            // the files
            // are sorted with zxid's
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid < logZxid) {
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);
    }

getLastLoggedZxid

//获取记录在log中的最后一个zxid
    public long getLastLoggedZxid() {
        File[] files = getLogFiles(logDir.listFiles(), 0);
        //找到最大的zxid所在的文件
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();//遍历这个文件,找到最后一条事务日志记录
                zxid = hdr.getZxid();//取出zxid
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }

commit

//提交事务日志至磁盘
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();// 强制刷到磁盘
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();// 强制刷到磁盘
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                log.getChannel().force(false);

                long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();// 移除流并关闭
        }
    }

truncate

//清空大于指定zxid的事务日志
    public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            itr = new FileTxnIterator(this.logDir, zxid);//根据zxid找到迭代器
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            // now, truncate at the current position
            RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
            raf.setLength(pos);//把当前log后面的部分(zxid更大的)截断
            raf.close();
            while (itr.goToNextLog()) {
                if (!itr.logFile.delete()) {//把后面的log文件都删除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }

rollLog

这个一定要看注释,意思不是回滚日志,是从当前日志滚到下一个

    /**
     * rollover the current log file to a new one.
     * @throws IOException
     */
    public synchronized void rollLog() throws IOException {
        if (logStream != null) {
            this.logStream.flush();
            this.logStream = null;
            oa = null;
        }
    }

事务日志可视化 LogFormatter

可以结合org.apache.zookeeper.server.persistence.FileTxnLog#append进行理解 传入参数为对应的事务日志路径即可

public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("USAGE: LogFormatter log_file");
            System.exit(2);
        }
        FileInputStream fis = new FileInputStream(args[0]);
        BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
        FileHeader fhdr = new FileHeader();
        fhdr.deserialize(logStream, "fileheader");
        //反序列化header完成验证
        if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
            System.err.println("Invalid magic number for " + args[0]);
            System.exit(2);
        }
        System.out.println("ZooKeeper Transactional Log File with dbid "
                + fhdr.getDbid() + " txnlog format version "
                + fhdr.getVersion());

        int count = 0;
        while (true) {
            long crcValue;
            byte[] bytes;
            try {
                crcValue = logStream.readLong("crcvalue");//获取反序列化的checksum

                bytes = logStream.readBuffer("txnEntry");
            } catch (EOFException e) {
                System.out.println("EOF reached after " + count + " txns.");
                return;
            }
            if (bytes.length == 0) {
                // Since we preallocate, we define EOF to be an
                // empty transaction
                System.out.println("EOF reached after " + count + " txns.");
                return;
            }
            Checksum crc = new Adler32();
            crc.update(bytes, 0, bytes.length);
            if (crcValue != crc.getValue()) {//比较自己生成的checksum与传递过来的checksum
                throw new IOException("CRC doesn't match " + crcValue +
                        " vs " + crc.getValue());
            }
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(bytes, hdr);//反序列化事务
            System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,
                    DateFormat.LONG).format(new Date(hdr.getTime()))
                    + " session 0x"
                    + Long.toHexString(hdr.getClientId())
                    + " cxid 0x"
                    + Long.toHexString(hdr.getCxid())
                    + " zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);
            if (logStream.readByte("EOR") != 'B') {
                LOG.error("Last transaction was partial.");
                throw new EOFException("Last transaction was partial.");
            }
            count++;
        }
    }

反序列化事务记录

事务日志可视化效果

针对http://www.jianshu.com/p/d1f8b9d6ad57贴出的demo 利用LogFormatter进行解析,事先把事务日志目录清空 输出为

ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
17-5-24 下午04时15分41秒 session 0x15c398687180000 cxid 0x0 zxid 0x1 createSession 20000

17-5-24 下午04时15分41秒 session 0x15c398687180000 cxid 0x2 zxid 0x2 create '/test1,#7a6e6f646531,v{s{31,s{'world,'anyone}}},T,1

17-5-24 下午04时15分41秒 session 0x15c398687180000 cxid 0x3 zxid 0x3 create '/test2,#7a6e6f646532,v{s{31,s{'world,'anyone}}},T,2

17-5-24 下午04时15分41秒 session 0x15c398687180000 cxid 0x4 zxid 0x4 create '/test3,#7a6e6f646533,v{s{31,s{'world,'anyone}}},T,3

17-5-24 下午04时15分43秒 session 0x15c398687180000 cxid 0x9 zxid 0x5 setData '/test2,#7a4e6f64653232,1

17-5-24 下午04时15分43秒 session 0x15c398687180000 cxid 0xb zxid 0x6 delete '/test2

17-5-24 下午04时15分43秒 session 0x15c398687180000 cxid 0xc zxid 0x7 delete '/test1

17-5-24 下午04时16分04秒 session 0x15c398687180000 cxid 0x0 zxid 0x8 closeSession null
EOF reached after 8 txns.

结合FileTxnLog#append很好理解

吐槽

tag不匹配

序列化时
org.apache.zookeeper.server.persistence.FileTxnLog#append里面是
oa.writeLong(crc.getValue(), "txnEntryCRC");//将验证算法的值写入long
反序列化,解析的时候是
org.apache.zookeeper.server.LogFormatter#main
crcValue = logStream.readLong("crcvalue");
这俩tag都不一样,虽然并不影响运行!!!

FileTxnLog#getLogFiles效率低

都已经按zxid升序排序了,一次循环就该搞定了

思考

文件后缀是按照zxid来生成的

logFileWrite = new File(logDir, ("log." +  Long.toHexString(hdr.getZxid())));
这对于定位文件,zxid都提供了一些便利
比如在getLastLoggedZxid中的调用

rollLog函数的意义

函数没有参数 一定要注意,是从当前日志,滚到下一个日志(比如日志量太大了之类的场景) 不是回滚日志里面的记录,试想回滚怎么能不告诉回滚的zxid呢

可以比较一下,rollLog函数造成logstream为null,append函数便会生成新的文件logFileWrite,新的流logStream

commit和rollLog两个函数都调用了flush,区别是什么

涉及到FileChannel,nio相关,

写入FileChannel调用链如下 org.apache.zookeeper.server.persistence.FileTxnLog#append org.apache.zookeeper.server.persistence.FileTxnLog#padFile org.apache.zookeeper.server.persistence.Util#padLogFile java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)

用了FileChannel的write方法

在commit函数中调用了 log.getChannel().force(false); 即java.nio.channels.FileChannel#force

查阅相关资料如 https://java-nio.avenwu.net/java-nio-filechannel.html 说明了

force方法会把所有未写磁盘的数据都强制写入磁盘。
这是因为在操作系统中出于性能考虑回把数据放入缓冲区,所以不能保证数据在调用write写入文件通道后就及时写到磁盘上了,除非手动调用force方法。 
force方法需要一个布尔参数,代表是否把meta data也一并强制写入。

也就是只有commit方法会进行真正的写入磁盘,rollLog并没有

事务日志什么时候会调用truncate 清空部分日志

集群版learner向leader同步的时候,leader告诉learner需要回滚同步 调用方Learner#syncWithLeader,后面40节会讲

问题

rollLog函数调用flush的作用

上面讲了commit和rollLog两个函数的区别 rollLog调用flush,最后的效果是什么呢?又没有写入磁盘(否则不会再调用commit) 写入了内存吗?又没有调用FileChannel的相关方法。

refer

http://www.cnblogs.com/leesf456/p/6279956.html 如何查看事务日志 FileTxnLog 什么是事务性日志 ZooKeeper运维之数据文件和事务日志

© 著作权归作者所有

共有 人打赏支持
beyondflyh
粉丝 0
博文 2
码字总数 3332
作品 0
深圳
程序员
elastic-job 源码解读之job配置数据存储

elastic-job作业相关的数据都是配置在zk上的,包括分片参数,作业失效转移,运行实例等等都是保存在ZK上的,那具体的zk节点的树形结构会是什么样子?每一个节点又是什么时候注册到zk上的? ...

一滴水的坚持
2017/11/26
0
0
ActiveMQ从入门到精通(三)

这是ActiveMQ系列的最后一篇文章,主要是关于ActiveMQ集群,这里采用的方式是:Zookeeper+LevelDB+ActiveMQ。前面2篇博客地址如下:《ActiveMQ从入门到精通(一)》、《ActiveMQ从入门到精通...

zfz_linux_boy
2017/03/26
0
0
elastic-job 源码解读之从源码看zookeeper的API的使用

在众多的zookeeper客户端工具中,elastic-job拥抱了Curator,而不是ZKClient,或者原生的zookeeper原生的客户端 。 先看一下Elastic-Job引入的依赖: curator.png 从图中看,引入了curator-c...

一滴水的坚持
2017/11/19
0
0
zookeeper数据迁移失败求解答

原创:转载请:http://my.oschina.net/u/1760714/blog/655124 通过查看从PAXOS到ZOOKEEPER分布式一致性原理与实践以及阅读相关源码,今天终于知道为什么会失败。 科普zookeeper领导者选举的过...

杭电任宇翔
2016/04/08
257
0
dubbo源码解析-zookeeper创建节点

前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道在上周的dubbo源码解析-zookeeper连接中已经讲到,这周解析的是另一道,即 上周就有朋友问到我,为什么我的源码...

肥朝
2017/11/19
0
0
使用zookeeper序列节点实现不可重入分布式锁

一、前言 在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源时候,juc包的锁就...

加多
01/12
0
0
Spark1.6.0功能扩展——为HiveThriftServer2增加HA

前言 HiveThriftServer2是Spark基于HiveServer2实现的多Session管理的Thrift服务,提供对Hive的集中式管理服务。HiveThriftServer2作为Yarn上的Application,目前只支持yarn-client模式——即...

beliefer
04/16
0
0
【Kafka源码】Kafka代码模块

Kafka源码依赖于Scala环境,首先需要安装scala,这块请自行百度进行安装。 传送门 当然,我们要分析源码,需要下载源码,请自行从github上面下载。 说明:本文使用的kafka版本为0.10.0.1,这...

端木轩
2017/10/23
0
0
elastic-job 源码解读之从源码看null值

在elastic-Job作业启动的时候,首先会去持久化LiteJob的配置信息,持久化保存之前先去会检查job存不存在,读取zk上的原有配置的job信息,如果节点不存在,或者需要重写的时候,去注册中心写节...

一滴水的坚持
2017/11/22
0
0
RocketMQ 源码分析 定时消息与消息重试

概述 建议前置阅读内容: 《RocketMQ 源码分析 —— Message 发送与接收》 《RocketMQ 源码分析 —— Message 拉取与消费(下)》 为什么把定时消息与消息重试放在一起?你猜。 你猜我猜不猜...

wangchen1999
04/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Git 基础 - 远程仓库的使用

远程仓库的使用 要参与任何一个 Git 项目的协作,必须要了解该如何管理远程仓库。远程仓库是指托管在网络上的项目仓库,可能会有好多个,其中有些你只能读,另外有些可以写。同他人协作开发某...

谢思华
12分钟前
0
0
面试宝典-悲观锁和乐观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。 乐观锁(Optimistic...

suyain
13分钟前
0
0
崛起于Springboot2.X之集成MongoDb使用mongoTemplate CRUD(27)

1、pom依赖 <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version></dependency><dependency> <groupId>log4j</......

木九天
24分钟前
0
0
切分log日志

新建logback.xml放到resource里面 <?xml version="1.0" encoding="utf-8"?><configuration> <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender"> <......

talen
29分钟前
0
0
spring @Resource 和 @Autowired 的使用区别

这两个 注解 @Resource 和 @Autowired , 常识都知道 @Resource 是 JAVAEE 自带的,@Autowired 是 spring 的自定义注解。 一般情况下, 使用 bean的时候, 这两个注解 随便使用一个即可。 但...

之渊
34分钟前
0
0
springboot集成elasticsearch客户端问题记录

1背景说明 服务端ES版本为5.5.2,springboot版本为1.5.6。 工程中添加如下依赖 2问题记录 2.1 NetworkPlugin类找不到 报错java.lang.ClassNotFoundException: org.elasticsearch.plugins.Net...

zjg23
36分钟前
1
0
快速构建ceph可视化监控系统

前言 ceph的可视化方案很多,本篇介绍的是比较简单的一种方式,并且对包都进行了二次封装,所以能够在极短的时间内构建出一个可视化的监控系统 本系统组件如下: ceph-jewel版本 ceph_expor...

万建宁
36分钟前
0
0
Java构造器使用注意

public class 父类A {int age = 10;protected void say() {System.out.println("父类A");}public 父类A() {override();}public void override() {Syst...

咸鱼AI
36分钟前
0
0
TensorFlow 线性分类

构造直线 z = 2 * x - 3 * y + 4 x0*w0+x1*w1+b=0 x1=-x0* w0/w1-b/w1 斜率 k= -w0/w1 截距 -b/w1 随机生成数据,加入一定的偏差,用直线将二维平面分为两部分 使用线性模型拟合参数 损失函数...

阿豪boy
39分钟前
0
0
翻译冒泡排序测试

翻译一个冒泡排序: var a = [1,3,2,4,6,5];var f = 0;var n = a.length ;for( var i =1; i<= n; i++) { for( var j = n-1 ; j >= i; j --) { if(a[j] < a[j+1]) { ......

钟元OSS
41分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部