文档章节

zk工厂方法实现NIOServerCnxnFactory

writeademo
 writeademo
发布于 10/17 14:10
字数 494
阅读 16
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

NIOServerCnxnFactory类

内部类

AbstractSelectThread

AcceptThread

SelectorThread

属性

ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT

10s session过期时间

ZOOKEEPER_NIO_NUM_SELECTOR_THREADS

selector 线程数

ZOOKEEPER_NIO_NUM_WORKER_THREADS

worker 线程数

directBuffer

buffer用来线程间数据交互

ipMap

限制ip上连接数

cnxnExpiryQueue

连接失效时间分桶队列

workerPool

WorkerService worker执行服务

acceptThread

接收新连接,simple round-robin 分配到选择线程

selectorThreads

 
   

 

方法

停止接收

private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}

private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        sc = acceptSocket.accept();
        accepted = true;
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);

        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
        }

        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

        sc.configureBlocking(false);

        // Round-robin assign this connection to a selector thread
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException("Unable to add connection to selector queue"
                                  + (stopped ? " (shutdown in progress)" : ""));
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        // accept, maxClientCnxns, configureBlocking
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
        fastCloseSock(sc);
    }
    return accepted;
}




private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}



configure


获取客户端连接数
private int getClientCnxnCount(InetAddress cl) {
    Set<NIOServerCnxn> s = ipMap.get(cl);
    if (s == null) {
        return 0;
    }
    return s.size();
}


创建连接
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}


创建连接
private void addCnxn(NIOServerCnxn cnxn) throws IOException {
    InetAddress addr = cnxn.getSocketAddress();
    if (addr == null) {
        throw new IOException("Socket of " + cnxn + " has been closed");
    }
    Set<NIOServerCnxn> set = ipMap.get(addr);
    if (set == null) {
        // in general we will see 1 connection from each
        // host, setting the initial cap to 2 allows us
        // to minimize mem usage in the common case
        // of 1 entry --  we need to set the initial cap
        // to 2 to avoid rehash when the first entry is added
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(cnxn);

    cnxns.add(cnxn);
    touchCnxn(cnxn);
}

思考:
为什么单机和集群模式启动不一样
单机可以直接从日志,快照恢复数据
集群根据角色划分,涉及到数据同步

 

© 著作权归作者所有

writeademo
粉丝 25
博文 704
码字总数 268781
作品 0
东城
私信 提问
ZooKeeper客户端Curator使用一 创建连接

如何创建一个ZK连接 工厂方法newClient() 首先,对于ZooKeeper的连接就是创建一个CuratorFramework实例的过程.一般会把CuratorFramework实例的创建交给工厂类CuratorFrameworkFactory,使用工厂...

孟飞阳
2018/07/06
99
0
Zookeeper源码分析(二)建立连接——服务端篇

这篇我们来分析一下zookeeper建立连接时服务端的源码,这里我们略过服务端启动的流程(因为包含了太多内容,一篇讲不完),直接看服务端处理请求的代码。 服务端处理请求的代码有两种NIOServ...

司徒无涯
09/12
29
0
使用 ZK 框架的富 Internet 应用程序

简介: ZK 是一个用 Java™ 代码编写的开源 Asynchronous JavaScript + XML (Ajax) 框架,使用该框架,您无需编写 JavaScript 代码就可以编写一个支持 Web 2.0 的富 Internet 应用程序。Doj...

红薯
2010/04/09
4.4K
8
Zookeeper集群搭建和简单使用

1、zookeeper集群搭建 一、解压 zookeeper-3.4.6.tar.gz 命令 [root@node22 java]# tar -zxvf zookeeper-3.4.6.tar.gz 二、在/usr/local/java/zookeeper-3.4.6/conf文件下新建一个zoo.cfg文件......

xiaozhou18
2016/11/13
153
0
使用ZooKeeper提供的Java API操作ZooKeeper

建立客户端与zk服务端的连接 我们先来创建一个普通的maven工程,然后在pom.xml文件中配置zookeeper依赖: 在resources目录下创建一个zk-connect.properties属性配置文件,我们在该文件中填写...

ZeroOne01
2018/04/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

是否有内置功能可以打印对象的所有当前属性和值?

所以我在这里寻找的是类似PHP的print_r函数。 这样一来,我可以通过查看问题对象的状态来调试脚本。 #1楼 可能值得一看- 是否有与Perl的Data :: Dumper等效的Python? 我的建议是 https://gi...

技术盛宴
18分钟前
5
0
直击面试,聊聊 GC 机制

前言 文章来源:https://studyidea.cn/ GC 中文直译垃圾回收,是一种回收内存空间避免内存泄漏的机制。当 JVM 内存紧张,通过执行 GC 有效回收内存,转而分配给新对象从而实现内存的再利用。 ...

程序通事
21分钟前
4
0
Mybatis where 1=1 和 标签

在mybatis中拼接查询语句,偶尔会出现where后面可能一个字段的值都没有,就导致所有条件无效,导致where没有存在的意义;但也有可能这些条件会存在。那解决这个问题的方法,最常见的就是: ...

观海562
23分钟前
4
0
git常用初始化设置

日志编辑工具 git config --global core.editor vim ssh访问 cd /home/weiguangyue/.sshssh-keygen -t rsa -C weiyue888999@126.com 提交者信息用户名 git config --global user.nam......

萧默
24分钟前
4
0
面试题-关于Java线程池一篇文章就够了

在Java面试中,线程池相关知识,虽不能说是必问提,但出现的频次也是非常高的。同时又鉴于公众号“程序新视界”的读者后台留言让写一篇关于Java线程池的文章,于是就有本篇内容,本篇将基于J...

程序新视界
28分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部