【Zookeeper源码三】Zookeeper 单机版服务器介绍

原创
2015/12/01 16:16
阅读数 1.7K

#0 系列目录#

#1 单机版服务器启动方式# 单机版的服务器启动,使用ZooKeeperServerMain的main函数来启动,参数分为两种:

  • 只有一个参数:表示为一个配置文件地址;
  • 有2~4个参数:分别表示端口、dataDir、tickTime、maxClientCnxns;

详细介绍见开篇的介绍运行ZooKeeper

接下来看下启动的整个过程:

输入图片说明

第一步:创建一个ZooKeeperServer,代表着一个服务器对象;

第二步:根据配置参数dataLogDir和dataDir创建出用于管理事务日志和快照的对象FileTxnSnapLog;

第三步:对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout;

第四步:创建ServerCnxnFactory,用于创建ServerSocket,等待客户端的socket连接;

第五步:启动ZooKeeperServer服务;

#2 ZooKeeperServer服务器对象# ZooKeeperServer是单机版才使用的服务器对象,集群版都是使用的是它的子类,来看下继承类图:

输入图片说明

可以看到,集群版分别用的是LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer后两者都属于LearnerZooKeeperServer

##2.1 ZooKeeperServer的重要属性## 输入图片说明

  • tickTime:默认3000ms,用于计算默认的minSessionTimeout、maxSessionTimeout。计算方式如下:
public int getMinSessionTimeout() {
    return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
}

public int getMaxSessionTimeout() {
    return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
}

同时还用于指定SessionTrackerImpl的执行过期检查的周期时间,详细见说明使用sessionTracker的session过期检查

  • minSessionTimeout、maxSessionTimeout:用于限制客户端给出的sessionTimeout时间

  • SessionTracker sessionTracker:负责创建和管理session,同时负责定时进行过期检查

  • ZKDatabase zkDb:用于存储ZooKeeper树形数据的模型。

  • FileTxnSnapLog txnLogFactory:负责管理事务日志和快照日志文件,能根据它加载出数据到ZKDatabase中,同时能将ZKDatabase中的数据以及session保存到快照日志文件中。后面会详细说明FileTxnSnapLog。

操作如下:

new ZKDatabase(txnLogFactory)
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
  • RequestProcessor firstProcessor:ZooKeeperServer请求处理器链中的第一个处理器

  • long hzxid:ZooKeeperServer最大的事务编号,每来一个事务请求,都会分配一个事务编号

  • ServerCnxnFactory serverCnxnFactory:负责创建ServerSocket,接受客户端的socket连接

  • ServerStats serverStats:负责统计server的运行状态

##2.2 ZKDatabase 介绍## 先来看下ZKDatabase的注释和属性:

输入图片说明

从注释中可以看到ZKDatabase中所包含的信息有:

  • sessions信息,即ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,也就是说仅仅会保存sessionId对应的timeout时间

  • DataTree:即ZooKeeper的内存节点信息;

  • LinkedList<Proposal> committedLog:用于保存最近提交的一些事务

来重点看下DataTree的实现:DataTree.java

public class DataTree {

    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    public static final long CONTAINER_EPHEMERAL_OWNER = Long.MIN_VALUE;

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /** the root of zookeeper tree */
    private static final String rootZookeeper = "/";

    /** the zookeeper nodes that acts as the management and status node **/
    private static final String procZookeeper = Quotas.procZookeeper;

    /** this will be the string thats stored as a child of root */
    private static final String procChildZookeeper = procZookeeper.substring(1);

    /**
     * the zookeeper quota node that acts as the quota management node for
     * zookeeper
     */
    private static final String quotaZookeeper = Quotas.quotaZookeeper;

    /** this will be the string thats stored as a child of /zookeeper */
    private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1);

    /**
     * the zookeeper config node that acts as the config management node for
     * zookeeper
     */
    private static final String configZookeeper = ZooDefs.CONFIG_NODE;

    /** this will be the string thats stored as a child of /zookeeper */
    private static final String configChildZookeeper = configZookeeper.substring(procZookeeper.length() + 1);
    
    /**
     * the path trie that keeps track fo the quota nodes in this datatree
     */
    private final PathTrie pTrie = new PathTrie();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

    /**
     * This set contains the paths of all container nodes
     */
    private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    /**
     * this is map from longs to acl's. It saves acl's being stored for each
     * datanode.
     */
    private final Map<Long, List<ACL>> longKeyMap = new HashMap<Long, List<ACL>>();

    /**
     * this a map from acls to long.
     */
    private final Map<List<ACL>, Long> aclKeyMap = new HashMap<List<ACL>, Long>();

    /**
     * these are the number of acls that we have in the datatree
     */
    private long aclIndex = 0;

    ......
}
  • ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>(); 维护了path对应的DataNode。每个DataNode内容如下:

输入图片说明

有DataNode parent和Set<String> children,同时byte data[]存储本节点的数据。StatPersisted stat存储本节点的状态信息;

  • Map<Long, HashSet<String>> ephemerals =new ConcurrentHashMap<Long, HashSet<String>>();维护了每个session对应的临时节点的集合。

  • WatchManager dataWatches、WatchManager childWatches分别用于管理节点自身数据更新的事件触发和该节点的所有子节点变动的事件触发。每个WatchManager的结构如下:

输入图片说明

watchTable维护着每个path对应的Watcher。watch2Paths维护着每个Watcher监控的所有path,即每个Watcher是可以监控多个path的。在服务器端Watcher的实现其实是ServerCnxn,如下:

public abstract class ServerCnxn implements Stats, Watcher

而每个ServerCnxn则代表服务器端为每个客户端分配的handler,负责与客户端进行通信。客户端每次对某个path注册的Watcher,在传输给服务器端的时候仅仅是传输一个boolean值,即是否监听某个path,并没有把我们自定义注册的Watcher传输到服务器端(况且Watcher也不能序列化),而是在本地客户端进行存储,存储着对某个path注册的Watcher。服务器端接收到该boolean值之后,如果为true,则把该客户端对应的ServerCnxn作为Watcher存储到上述WatchManager中,即上述WatchManager中存储的是一个个ServerCnxn实例一旦服务器端数据变化,触发对应的ServerCnxn,ServerCnxn然后把该事件又传递客户端,客户端这时才会真正引发我们自定义注册的Watcher

上面只是简单描述了一下,之后的文章会详细源码分析整个过程。

DataTree就负责进行node的增删改查。我们知道node的类型分为四种类型:

PERSISTENT:持久型节点;

PERSISTENT_SEQUENTIAL:持久型顺序型节点;

EPHEMERAL:临时型节点;

EPHEMERAL_SEQUENTIAL:临时型顺序型节点;

前两者持久型节点和后两者临时型节点的不同之处就在于,一旦当客户端session过期,则会清除临时型节点,不会清除持久型节点,除非去执行删除操作。

而顺序型节点,则是每次创建一个节点,会在一个节点路径的后面加上父节点的cversion版本号(即该父节点的所有子节点一旦发生变化,就会自增该版本号)的格式化形式,如下:

输入图片说明

可以看到是将父节点的cversion版本号以10进制形式输出,宽度是10位,不足的话前面补0。所以是在执行DataTree创建node方法之前就已经定好了path路径的。

再来看下是如何区分持久型和临时型节点的呢?

在DataTree创建node方法会传递一个ephemeralOwner参数,当客户端选择的是持久型节点,给出的sessionId为0,当为临时型节点时,给出客户端的sessionId,如下:

输入图片说明

先来看下DataTree创建node方法的方法:

输入图片说明

  1. 先判断父节点存不存在,不存在的话,报错。
  2. 然后检查父节点的所有子节点是否已存在要创建的节点,如果存在报错。
  3. 创建出节点,并存放到DataTree的ConcurrentHashMap<String, DataNode> nodes属性中,见上文描述。
  4. 判断该节点是否是临时节点,如果是临时节点,则ephemeralOwner参数即为客户端的sessionId。然后以sessionId为key,存储该客户端所创建的所有临时节点到DataTree的Map<Long, HashSet<String>> ephemerals属性中,见上文描述。

##2.3 ZooKeeperServer请求处理器链介绍## ZooKeeper使用请求处理器链的方式来处理请求,先看下请求处理器的定义RequestProcessor:

输入图片说明

从注释上可以看到几个要点:

  1. RequestProcessor是以责任链的形式来处理事务的。
  2. 请求是被顺序的进行处理的,单机版、集群版的Leader、Follower略有不同。
  3. 对于请求的处理,是通过processRequest(Request request)方法来处理的。有些处理器是一个线程,即请求被扔到该线程中进行处理。
  4. 当调用shutdown时,也会关闭它所关联的RequestProcessor。

来看下ZooKeeperServer请求处理器链的具体情况:

输入图片说明

即PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor,来一个一个具体看看:

###2.3.1 PrepRequestProcessor处理器### 主要内容:对请求进行区分是否是事务请求,如果是事务请求则创建出事务请求头,同时执行一些检查操作对于增删改等影响数据状态的操作都被认为是事务,需要创建出事务请求头。大体属性如下:

输入图片说明

LinkedBlockingQueue<Request> submittedRequests:提交的用户请求

RequestProcessor nextProcessor:下一个请求处理器

ZooKeeperServer zks:服务器对象

PrepRequestProcessor所实现的processRequest接口方法即为:将该请求放入submittedRequests请求队列中

    public void processRequest(Request request) {
        submittedRequests.add(request);
    }

同时PrepRequestProcessor又是一个线程,在run方法中又会不断的取出上述用户提交的请求,进行处理,整个处理过程如下:

    @Override
    public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }
                pRequest(request);
            }
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            handleException(this.getName(), e);
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }

    protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;
            case OpCode.deleteContainer:
            case OpCode.delete:
                DeleteRequest deleteRequest = new DeleteRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData:
                SetDataRequest setDataRequest = new SetDataRequest();                
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.reconfig:
                ReconfigRequest reconfigRequest = new ReconfigRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                break;
            case OpCode.setACL:
                SetACLRequest setAclRequest = new SetACLRequest();                
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check:
                CheckVersionRequest checkRequest = new CheckVersionRequest();              
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            LOG.info("Got user-level KeeperException when processing "
                                    + request.toString() + " aborting remaining multi ops."
                                    + " Error Path:" + e.getPath()
                                    + " Error:" + e.getMessage());

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

            //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    pRequest2Txn(request.type, zks.getNextZxid(), request,
                                 null, true);
                }
                break;

            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
            case OpCode.checkWatches:
            case OpCode.removeWatches:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;
            default:
                LOG.warn("unknown type " + request.type);
                break;
            }
        } catch (KeeperException e) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(e.code().intValue()));
            }
            LOG.info("Got user-level KeeperException when processing "
                    + request.toString()
                    + " Error Path:" + e.getPath()
                    + " Error:" + e.getMessage());
            request.setException(e);
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process " + request, e);

            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            if(bb != null){
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
            } else {
                sb.append("request buffer is null");
            }

            LOG.error("Dumping request buffer: 0x" + sb.toString());
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
            }
        }
        request.zxid = zks.getZxid();
        nextProcessor.processRequest(request);
    }

输入图片说明

createSession、closeSession也属于事务操作,而那些获取数据的操作则不属于事务操作,只需要验证下sessionId是否合法等处理完成之后就交给了下一个处理器继续处理该请求。

我们以创建session和创建节点为例,来具体看下代码,先看下创建session,即如下代码:

输入图片说明

首先会为该request获取一个事务id即zxid,该zxid的值来自于ZooKeeper服务器的一个hzxid变量,默认是0,每来一个请求就会执行自增操作。

输入图片说明

输入图片说明

首先获取客户端传递过来的sessionTimeout时间,然后使用ZooKeeperServer的sessionTracker来创建一个session,同时为该session的owner属性赋值,但是对于创建session的request请求,并没有为owner赋值。而是在创建其他请求的时候才会为请求的owner赋值为本机器

创建session的request如下:

输入图片说明

创建其他的request的如下:

输入图片说明

接下来看看创建一个node的处理:

  1. 首先进行的是session检查

输入图片说明

先检查服务器端该sessionId是否存在,如果不存在则表示已经过期,抛出SessionExpiredException异常。如果session存在,owner为空,则会对owner进行赋值。如果owner存在则进行owner核对,如果不一致抛出SessionMovedException异常。

第一次创建session后,该session的owner是为空的,之后的请求操作owner都是有值的,此时则会为该session赋值

我们想象下这样的场景:客户端连接一台服务器server1,该客户端拥有的session的owner是server1,客户端发送操作请求,由于网络原因造成请求阻塞,客户端认为server1不稳定,则会拿着刚才的session去连接另一台服务器server2,连接成功后,该session对应的owner被设置为null了(根据上文知道创建session的时候,owner会被清空),然后继续在server2上执行同样的操作,此时会为该session的owner属性赋值为server2,如果之前对server1的请求此时终于到达server1了,此request的owner是server1,则在检查的时候,发现该owner不一致,server1则会抛出SessionMovedException异常。即session的owner已经变化了的异常。则会阻止该请求的执行,防止了重复执行相同的操作。这里再留个疑问:为什么当创建session的时候要清空owner呢?

  1. 反序列化出CreateRequest对象,获取要创建的路径,同时获取父路径,检查该父路径是否存在,如果不存在抛出异常NoNodeException。如果存在获取父路径的修改记录,验证对父路径是否有修改权限

  2. 从CreateRequest中获取用户创建的node的类型,如果是临时节点的话,则根据父路径的子节点的版本cversion,来生成该临时节点的路径后缀部分。然后验证该路径是否存在,如果存在则抛出NodeExistsException异常。

  3. 判断父节点是否是临时节点,如果是临时节点则不应该有子节点,抛出NoChildrenForEphemeralsException异常,这部分代码该判断应该是提前应该做的,而不是留到现在才来判断。

  4. 如果该节点是临时节点,则为该节点ephemeralOwner属性设置为对应的sessionId,如果是永久节点则设置为0。而DataTree则是依据ephemeralOwner是否为0,来判断是否是临时节点还是持久节点,如果是临时节点,则会另外存储一份数据,以sessionId为key,即列出了每个sessionId所包含的所有临时节点,一旦该sessionId失效,则直接拿出该列表进行清除操作即可,不用再去遍历所有的节点了。

  5. 产生两条变化记录,分别是父节点的子节点列表变化的记录,和要创建的节点的创建记录

至此便完成预处理操作。该交给下一个RequestProcessor处理器来处理了。

###2.3.2 SyncRequestProcessor处理器### 主要对事务请求进行日志记录,同时事务请求达到一定次数后,就会执行一次快照。主要属性如下:

输入图片说明

ZooKeeperServer zks:ZooKeeper服务器对象

LinkedBlockingQueue<Request> queuedRequests:提交的请求(包括事务请求和非事务请求)

RequestProcessor nextProcessor:下一个请求处理器

Thread snapInProcess:执行一次快照任务的线程

LinkedList<Request> toFlush:那些已经被记录到日志文件中但还未被flush到磁盘上的事务请求

int snapCount:发生了snapCount次的事务日志记录,就会执行一次快照

int randRoll:上述是一个对所有服务器都统一的配置数据,为了避免所有的服务器在同一时刻执行快照任务,实际情况为发生了(snapCount / 2 + randRoll)次的事务日志记录,就会执行一次快照。randRoll的计算方式如下:r.nextInt(snapCount/2)

接下来就详细看下SyncRequestProcessor(也是一个线程)的详细实现:

对于RequestProcessor定义的接口:processRequest(Request request),SyncRequestProcessor和PrepRequestProcessor一样,都是讲请求放入阻塞式队列中,然后在线程run方法中执行相应的逻辑操作

首先还是从LinkedBlockingQueue<Request> queuedRequests队列中取出一个Request,处理如下:

    @Override
    public void run() {
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

输入图片说明

  1. 第一步:将该请求添加到事务日志中,这一部分会区分Request是事务请求还是非事务请求,依据就是前一个处理器PrepRequestProcessor为Request加上的事务请求头。如果是事务请求,则添加成功后返回true,添加成功即为将该请求序列化到一个指定的文件中。如果是非事务请求,直接返回false。
  2. 第二步:如果是事务请求,添加到事务日志中后,logCount++,该logCount就是用于记录已经执行多少次事务请求序列化到日志中了。
  3. 第三步:一旦logCount超过(snapCount / 2 + randRoll)次后,就需要执行一次快照了。
  4. 第四步:先将当前的事务日志记录flush到磁盘中,然后设置当前流为null,以便下一次事务日志记录重新开启一个新的文件来记录
  5. 第五步:创建一个ZooKeeperThread线程,用于执行一次快照任务,则会把当前的dataTree和sessionsWithTimeouts信息序列化到一个文件中。
  6. 第六七步:如果是非事务请求的话,则会直接交给下一个RequestProcessor处理器来处理。我们看到这里还加上了一个toFlush.isEmpty()的判断,即之前没有请求遗留,只有在这样的条件下才会直接交给下一个RequestProcessor处理器来处理,主要是为了保证请求的顺序性。如果之前还有遗留的请求,则后来的请求不能被先处理。

上述的请求除了直接被下一个处理器处理的情况,其余大部分都会被保存到LinkedList<Request> toFlush中,什么时候才会被执行呢?

输入图片说明

输入图片说明

两种情况下会被执行flush:

  1. 当request数量超过1000
  2. 当没有请求到来的时候

来看下具体的flush过程:

输入图片说明

  1. 第一步:执行事务日志文件执行commit操作。上述rollLog操作仅仅是先flush,然后设置当前日志记录流为null,以便下一次重新开启一个新的事务日志文件,同时这些流都会被保存到LinkedList<FileOutputStream> streamsToFlush属性中,commit操作则是先flush这些所有的流,然后执行这些流的close操作

  2. 第二步:便是将请求交给下一个处理器来处理

至此SyncRequestProcessor的内容也完成了,接下来就是下一个请求处理器即FinalRequestProcessor。

###2.3.3 FinalRequestProcessor处理器### 作为处理器链上的最后一个处理器,负责执行请求的具体任务,前面几个处理器都是辅助操作,如PrepRequestProcessor为请求添加事务请求头和执行一些检查工作SyncRequestProcessor也仅仅是把该请求记录下来保存到事务日志中。该请求的具体内容,如获取所有的子节点,创建node的这些具体的操作就是由FinalRequestProcessor来完成的。

下面就来详细看看FinalRequestProcessor处理request的过程:

输入图片说明

  1. 对于request是顺序执行,要删除那些zxid小于当前request的zxid的outstandingChanges、以及outstandingChangesForPath。这里就有一个疑问:outstandingChanges数据是由PrepRequestProcessor在预处理事务请求头的时候产生的,他们又被谁来消费呢?他们主要作用是什么?

  2. 接着就是落实具体的事务操作了,如创建节点、删除节点、设置数据等

来具体看下这个过程:

输入图片说明

这些事务操作分成两种情况,一部分就是针对dataTree的增删改节点,另一种就是创建session,关闭session。创建和关闭session都是使用sessionTracker来完成,这一部分之前已经详细描述过了。下面具体看下针对dataTree的增删改节点:

输入图片说明

根据事务请求头的不同类型,分别执行增删改操作。对于增加节点上面也已经详细描述过了。接下来就是开始准备返回值,然后响应给客户端。以创建session为例:

输入图片说明

使用sessionTimeout(客户端传递的sessionTimeout和服务器端协商后的),sessionId,根据sessionId获取的密码 这些数据构建一个ConnectResponse,然后进行序列化,传给客户端,并开始接收客户端的请求

上述是session创建成功的时候。即上图中的valid为true。什么时候为fasle呢?

当你已经创建session了,但是同服务器的连接断开了,然后拿着该session去重新连接下一台服务器,如果密码是错误的,服务器则会这设置valid为false。如果密码是正确的,但是在于服务器端已经建立TCP连接后,此时该重新激活session了,但是发现该session已经过期了,被服务器端清除了,也会导致valid为false。

一旦valid为false,返回给客户端的sessionTimeout为0,sessionId为0,密码为空。客户端在接收到该数据后,看到sessionTimeout为0,则认为建立session关联失败,发出session过期的异常事件,开始走向死亡,即客户端的ZooKeeper对象不可用,必须要重新创建一个新的ZooKeeper对象

##2.4 ServerStats介绍## 它是用于统计服务器的运行数据的。

输入图片说明

packetsSent:服务器端已发送的数据包

packetsReceived:服务器端已接收的数据包

maxLatency:处理一次请求的最大延迟

minLatency:处理一次请求的最小延迟

totalLatency:服务器端处理请求的总延迟

count:服务器端已经处理的请求数

ZooKeeperServer会创建一个ServerCnxnFactory,即创建了ServerSocket,等待客户端连接。每来一个客户端的TCP连接,ServerCnxnFactory就会为该连接创建一个ServerCnxn,每个ServerCnxn也会统计上述信息,即单独针对某个客户端的数据。而ZooKeeperServer则是统计所有客户端的上述数据。

#3 单机版服务器启动# 上面描述了ZooKeeper服务器的几个重要数据,下面就概述下单机版服务器的服务过程:

输入图片说明

  1. 第一步:创建一个ZooKeeperServer,代表着一个服务器对象,同时会创建出ServerStats用于统计服务器运行数据
  2. 第二步:根据配置参数dataLogDir和dataDir创建出用于管理事务日志和快照的对象FileTxnSnapLog,用于从磁盘上恢复数据和将内存数据快照到磁盘上、事务请求记录到磁盘上。
  3. 第三步:对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout
  4. 第四步:创建ServerCnxnFactory,用于创建ServerSocket,等待客户端的socket连接
  5. 第五步:启动ZooKeeperServer服务

当客户端第一次TCP连接ZooKeeper服务器的时候:

  1. 上述ServerCnxnFactory会为该客户端创建出一个ServerCnxn服务器代理对象,单独用于处理和该客户端的通信,TCP连接建立成功

  2. TCP连接建立成功后,客户端开始发送ConnectRequest请求,申请sessionId,会传递sessionTimeout时间

  3. ServerCnxn接收到该申请后,根据客户端传递过来的sessionTimeout时间以及ZooKeeperServer本身的minSessionTimeout、maxSessionTimeout参数,确定最终的sessionTimeout时间

  4. sessionTimeout确定好了,就开始判断客户端的请求是否已经含有sessionId,如果含有,则执行sessionId的是否过期、密码是否正确等检查。如果没有sessionId,则会使用sessionTracker分配sessionId,创建一个session。该session含有上述确定的sessionTimeout信息,即每个session都有各自的sessionTimeout信息。

  5. 之后就是构建一个Request请求,该请求的类型就是创建session。然后将该请求交给请求处理器链来处理。请求处理器链为PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor

  6. 首先是PrepRequestProcessor的处理,它对请求分成事务请求和非事务请求。事务请求即能改变服务器状态数据的一些操作,即node的增删改,session的创建关闭等。为这些事务请求加上事务请求头,后面的请求处理器都是基于是否含有事务请求头来判别该请求是否是事务请求。 同时对事务请求进行一些检查,如session是否过期,session的owner是否一致、节点是否存在等。对于创建session来说,就检查了下session是否过期。同时会对所有请求都分配zxid

  7. 接下来是SyncRequestProcessor的处理仅仅对事务请求进行记录到事务日志中,一旦事务请求达到一定数量,就会执行一次对内存DataTree数据和session数据的快照

  8. 最后就是FinalRequestProcessor处理器:真正执行数据操作的地方。如为DataTree添加节点、删除节点、更改数据、查询数据等。同时也负责统计整个服务器端处理过程的最大延迟、最小延迟、总延迟、总处理数,每个ServerCnxn也针对自己的客户端也进行响应的统计。最后FinalRequestProcessor处理上述事务操作的结果给客户端。如创建session,则是返回sessionTimeout、sessionId、密码数据给客户端。如果创建失败,则返回的sessionTimeout为0,sessionId为0,密码为空。

客户端对服务器端响应的结果的处理:

  1. 首先判断服务器端返回的sessionTimeout是否小于等于0

  2. 如果是,则认为申请sessionId失败,向eventThread中添加了两个事件。首先是session过期的事件即KeeperState.Expired,客户端的默认Watcher接收到之后,必须自行采取响应的处理操作(如重新创建一个ZooKeeper对象),因为客户端的ZooKeeper对象即将失效了。第二个事件就是一个死亡事件,eventThread遇到该事件后,就会跳出事件循环,eventThread线程走向结束。

  3. 如果不是,则认为申请sessionId成功。则保存服务器端给出的sessionId、密码、sessionTimeout数据,重置客户端的readTimeout、connectTimeout。然后通过eventThread发送一个成功连接的事件即KeeperState.SyncConnected,客户端在接收到该事件后就可以执行相应的操作了。

展开阅读全文
打赏
0
4 收藏
分享
加载中
更多评论
打赏
0 评论
4 收藏
0
分享
返回顶部
顶部