文档章节

MyCat - 源代码篇(5)

张哈希_
 张哈希_
发布于 2016/04/24 11:38
字数 1415
阅读 11
收藏 0

数据库路由中间件MyCat - 源代码篇(5)

3. 连接模块

如之前所述,MyCat的连接分为前端和后端,下面是连接基本相关类图:
这里写图片描述

3.1 ClosableConnection:

public interface ClosableConnection {
    String getCharset();
    //关闭连接
    void close(String reason);
    boolean isClosed();
    public void idleCheck();
    long getStartupTime();
    String getHost();
    int getPort();
    int getLocalPort();
    long getNetInBytes();
    long getNetOutBytes();
}

根据字面意思,一个可以关闭的连接需要实现关闭方法-_-,并且需要原因判断是否是正常关闭。MySQL的通信都需要指定字符集。MyCat服务器建立ServerSocket时输入的端口为服务器在其上面监听客户的连接,当有客户连接时,在随机选择一个没用的端口与客户端通信;建立客户socket时输入的为服务端的监听端口,在本地选择一个未用端口与服务器通信,至于服务器怎么知道和客户端的哪个端口通信,和客户端怎么知道和服务端的哪个端口通信(因为这两个端口都是随机生成的),tcp是采用”三次握手”建立连接,而udp则是每次发送信息时将端口号放在ip报文的数据段里面。所以,连接里面需要提供获得监听端口和服务端口的方法。此外,还需要检查连接是否为空闲状态(idle)。最后,需要一些统计数据。

3.2 NIOConnection:

public interface NIOConnection extends ClosableConnection {

    //connected
    void register() throws IOException;

    //处理数据
    void handle(byte[] data);

    // 写出一块缓冲数据
    void write(ByteBuffer buffer);

}

所有NIO的通信需要在多路复用选择器上注册channel,这里有个对应的register()方法需要实现。然后,读取和写入数据都需要通过缓冲。缓冲区(Buffer)就是在内存中预留指定大小的存储空间用来对输入/输出(I/O)的数据作临时存储,这部分预留的内存空间就叫做缓冲区,使用缓冲区有这么两个好处:
1. 减少实际的物理读写次数
2. 缓冲区在创建时就被分配内存,这块内存区域一直被重用,可以减少动态分配和回收内存的次数
读取到的数据需要经过处理,这里对应的就是handle(byte[])方法。

3.3 AbstractConnection:

这里写图片描述
从上面的实体图,我们发现,AbstractConnection其实就是把Java的NetworkChannel进行封装,同时需要依赖其他几个类来完成他所需要的操作,如下:
这里写图片描述
其中,NIOProcessor是对AbstractConnection实现NIO读写的方法类,NIOHandler是处理AbstractConnection读取的数据的处理方法类,NIOSocketWR是执行以上方法的线程类。

3.3.1 NIOProcessor:

这里写图片描述
NIOProcessor的构建方法:

public NIOProcessor(String name, BufferPool bufferPool,
            NameableExecutor executor) throws IOException {
        this.name = name;
        this.bufferPool = bufferPool;
        this.executor = executor;
        this.frontends = new ConcurrentHashMap<Long, FrontendConnection>();
        this.backends = new ConcurrentHashMap<Long, BackendConnection>();
        this.commands = new CommandCount();
    }

调用位置:
这里写图片描述
MyCatServer.java

...
bufferPool = new BufferPool(processBuferPool, processBufferChunk,
                socketBufferLocalPercent / processorCount);
        businessExecutor = ExecutorUtil.create("BusinessExecutor",
                threadPoolSize);
...
for (int i = 0; i < processors.length; i++) {
            processors[i] = new NIOProcessor("Processor" + i, bufferPool,
                    businessExecutor);
        }
...

每个MyCat实例会初始化processors个NIOProcessor,每个NIOProcessor公用同一个bufferPool和businessExecutor。
bufferPool是缓冲池,BufferPool这个类负责缓冲统一管理
businessExecutor如之前所述,是业务线程池。
NIOProcessor被池化,很简单,就是保存到数组中,通过MyCatServer的nextProcessor()方法轮询获取一个NIOProcessor,之后每个AbstractConnection通过setNIOProcessor方法,设置NIOProcessor。

public NIOProcessor nextProcessor() {
        int i = ++nextProcessor;
        if (i >= processors.length) {
            i = nextProcessor = 0;
        }
        return processors[i];
    }

可以看出,每个AbstractConnection依赖于一个NIOProcessor,每个NIOProcessor保存着多个AbstractConnection。AbstractConnection分为FrontendConnection和BackendConnection被分别保存在NIOProcessor的frontends和backends这两个ConcurrentHashMap中。
用ConcurrentHashMap是因为NIOAcceptor和NIOConnector线程以及RW线程池都会访问这两个变量。
NIOProcessor其实主要负责连接资源的管理:
* MyCat会定时检查前端和后端空闲连接,并清理和回收资源:*
MyCatServer.java:

// 处理器定时检查任务
    private TimerTask processorCheck() {
        return new TimerTask() {
            @Override
            public void run() {
                timerExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (NIOProcessor p : processors) {
                                p.checkBackendCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkBackendCons caught err:" + e);
                        }

                    }
                });
                timerExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (NIOProcessor p : processors) {
                                p.checkFrontCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkFrontCons caught err:" + e);
                        }
                    }
                });
            }
        };
    }

检查前端连接,回收空闲资源:

    /** * 定时执行该方法,回收部分资源。 */
    public void checkFrontCons() {
        frontendCheck();
    }
    private void frontendCheck() {
        Iterator<Entry<Long, FrontendConnection>> it = frontends.entrySet()
                .iterator();
        while (it.hasNext()) {
            FrontendConnection c = it.next().getValue();

            // 删除空连接
            if (c == null) {
                it.remove();
                this.frontendsLength.decrementAndGet();
                continue;
            }

            // 清理已关闭连接,否则空闲检查。
            if (c.isClosed()) {
                c.cleanup();
                it.remove();
                this.frontendsLength.decrementAndGet();
            } else {
                // very important ,for some data maybe not sent
                checkConSendQueue(c);
                c.idleCheck();
            }
        }
    }

在关闭前端连接时,会清理连接占用的缓存资源:
FrontendConnection.java:

protected void cleanup() {
        //回收读缓冲
        if (readBuffer != null) {
            recycle(readBuffer);
            this.readBuffer = null;
            this.readBufferOffset = 0;
        }
        //回收写缓冲
        if (writeBuffer != null) {
            recycle(writeBuffer);
            this.writeBuffer = null;
        }
        //回收压缩协议栈编码解码队列
        if(!decompressUnfinishedDataQueue.isEmpty())
        {
            decompressUnfinishedDataQueue.clear();
        }
        if(!compressUnfinishedDataQueue.isEmpty())
        {
            compressUnfinishedDataQueue.clear();
        }
        //回收写队列
        ByteBuffer buffer = null;
        while ((buffer = writeQueue.poll()) != null) {
            recycle(buffer);
        }
    }

后端连接检查,除了要清理已关闭的连接,还有要检查SQL执行时间是否超时:

    /** * 定时执行该方法,回收部分资源。 */
    public void checkBackendCons() {
        backendCheck();
    }
    // 后端连接检查
    private void backendCheck() {
        long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L;
        Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();
        while (it.hasNext()) {
            BackendConnection c = it.next().getValue();

            // 删除空连接
            if (c == null) {
                it.remove();
                continue;
            }
            // SQL执行超时的连接关闭
            if (c.isBorrowed()
                    && c.getLastTime() < TimeUtil.currentTimeMillis()
                            - sqlTimeout) {
                LOGGER.warn("found backend connection SQL timeout ,close it "
                        + c);
                c.close("sql timeout");
            }

            // 清理已关闭连接,否则空闲检查。
            if (c.isClosed()) {
                it.remove();

            } else {
                // very important ,for some data maybe not sent
                if (c instanceof AbstractConnection) {
                    checkConSendQueue((AbstractConnection) c);
                }
                c.idleCheck();
            }
        }
    }

同时,在检查连接是否关闭时,需要检查写队列是否为空。写队列不为空,证明还有请求没有响应。需要将写队列的剩余请求异步写出,通过NIOSocketWR。

    private void checkConSendQueue(AbstractConnection c) {
        // very important ,for some data maybe not sent
        if (!c.writeQueue.isEmpty()) {
            c.getSocketWR().doNextWriteCheck();
        }
    }

© 著作权归作者所有

张哈希_
粉丝 7
博文 42
码字总数 67907
作品 0
房山
程序员
私信 提问
MyCat 看了这篇什么都会了

1、什么是MyCat 1、一个彻底开源的,面向企业应用开发的大数据库集群 2、支持事务、ACID、可以替代MySQL的加强版数据库 3、一个可以视为MySQL集群的企业级数据库,用来替代昂贵的Oracle集群 ...

木迪_2a4e
2018/09/17
0
0
Mycat多租户/分布数据库/集群数据库以及SAAS研究进度报告

1、mycat操作 (1)垂直切分配置 (2)水平切分配置 (3)mycat修改schema.xml之后热重启 (4)mycat注解的实现 (5)mycat高可用(搭配HAproxy等一起做集群,多个mycat节点) 2、mycat可以配...

蓝奔z
2018/08/24
0
0
使用mycat实现mysql读写分离

使用mycat实现mysql读写分离 mycat在数据层的作用和nginx在应用层很像,都是代理后面实际的服务。mycat实现了mysql协议,对外看起来就是一个mysql数据库,可以和使用普通mysql一样进行操作 ...

肥肥小浣熊
2018/05/06
0
0
Mycat读写分离笔记Windows

Mycat读写分离笔记Windows 自己搭了一个运用Mycat中间件搭建了一个读写分离的demo,昨晚还在奋战当中,因为连接mycat的时候老是报错:No Mycat DataBases selected.尝试了很多的方法,然后还...

推荐码发放
2018/04/29
0
0
高性能Mysql中间组件Mycat安装配置

高性能Mysql中间组件Mycat安装配置 由于公司项目需要使用到读写分离,但是官方版本的Mysql-proxy读写分离组件太不稳定且有很多BUG,最终决定使用Mycat作为中间组件,其实读写分离只是Mycat中...

幸运券发放
2018/05/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring系列教程八: Spring实现事务的两种方式

一、 Spring事务概念: 事务是一系列的动作,它们综合在一起才是一个完整的工作单元,这些动作必须全部完成,如果有一个失败的话,那么事务就会回滚到最开始的状态,仿佛什么都没发生过一样。...

我叫小糖主
58分钟前
4
0
CentOS 的基本使用

1. 使用 sudo 命令, 可以以 root 身份执行命令, 必须要在 /etc/sudoers 中定义普通用户 2. 设置 阿里云 yum 镜像, 参考 https://opsx.alibaba.com/mirror # 备份mv /etc/yum.repos.d/CentO...

北漂的我
昨天
2
0
Proxmox VE技巧 移除PVE “没有有效订阅” 的弹窗提示

登陆的时候提示没有有效的订阅You do not have a valid subscription for this server. Please visit www.proxmox.com to get a list of available options. 用的是免费版的,所以每次都提示......

以谁为师
昨天
3
0
Java设计模式之外观模式(门面模式)

什么是外观模式   外观模式(Facade),他隐藏了系统的复杂性,并向客户端提供了一个可以访问系统的接口。这种类型的设计模式属于结构性模式。为子系统中的一组接口提供了一个统一的访问接口...

须臾之余
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部