文档章节

基于NIO的消息路由的实现(五) 服务端连接管理

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/26 15:17
字数 1030
阅读 43
收藏 1

行业解决方案、产品招募中!想赚钱就来传!>>>

一、对客户端连接(也就是SocketChannel)的管理

  1. 客户端与服务端建立连接后,服务端来维护其连接,首先定义了一个GVConnection类,用来定义连接;

此类包括四个成员:token、userId、channel、lastAccessTime;如下:

public class GVConnection {

    private String token;
    private String userId;
    private SocketChannel channel;
    private int lastAccessTime;

    public GVConnection(String token,String userId, SocketChannel channel, int lastAccessTime) {

        this.token = token;
        this.userId = userId;
        this.channel = channel;
        this.lastAccessTime = lastAccessTime;

    }

    public void setLastAccessTime(int lastAccessTime){
        this.lastAccessTime = lastAccessTime;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public String getToken() {
        return token;
    }

    public String getUserId(){
        return userId;
    }

    public int getLastAccessTime() {
        return lastAccessTime;
    }
}

2.对GVConnection的操作单独封装一个类,GVConnTools,在这个类中,我定义了两个map,用来保存userId与GVConnection的对应关系,以及token与GVConnection的对应关系。这两个Map需要保持同步,在删除的时候,可以通过userId或者token进行删除。这两个方法有可能同步执行会遭遇异常,所以我增加了一个互斥锁(我不确定这样做的合理性,敬请指导)。其他新增和更新的方法我均采用了同步方法。

public class GVConnTools {

    private static Map<String, GVConnection> CONNECTION_BY_USERID = new ConcurrentHashMap<String, GVConnection>();
    private static Map<String, GVConnection> CONNECTION_BY_TOKEN = new ConcurrentHashMap<String, GVConnection>();

    private static Lock lock = new ReentrantLock();

    public static GVConnection getConnByToken(String token) {

        if (CONNECTION_BY_TOKEN.get(token) != null) {
            return CONNECTION_BY_TOKEN.get(token);
        } else {
            return null;
        }

    }

    public static GVConnection getConnByUserId(String userId) {

        return CONNECTION_BY_USERID.get(userId);

    }

    public static SocketChannel getChannelByUserId(String userId) {
        if (CONNECTION_BY_USERID.get(userId) != null) {
            return CONNECTION_BY_USERID.get(userId).getChannel();
        } else {
            return null;
        }

    }

    public static SocketChannel getChannelByToken(String token) {

        return CONNECTION_BY_TOKEN.get(token).getChannel();

    }

    public static synchronized void addConn2Cache(GVConnection conn) {

        CONNECTION_BY_TOKEN.put(conn.getToken(), conn);
        CONNECTION_BY_USERID.put(conn.getUserId(), conn);

    }

    public static void removeConnByUserId(String userId) {

        lock.lock();
        try {

            String token = CONNECTION_BY_USERID.get(userId).getToken();
            CONNECTION_BY_USERID.remove(userId);
            CONNECTION_BY_TOKEN.remove(token);

        } finally {
            lock.unlock();
        }
    }

    public static void removeConnByToken(String token) {

        lock.lock();
        try {

            String userId = CONNECTION_BY_TOKEN.get(token).getToken();
            CONNECTION_BY_TOKEN.remove(token);
            CONNECTION_BY_USERID.remove(userId);

        } finally {
            lock.unlock();
        }

    }

    public static String getUserIdByToken(String token) {

        return CONNECTION_BY_TOKEN.get(token).getUserId();

    }

    public static String getTokenByUserId(String userId) {

        return CONNECTION_BY_USERID.get(userId).getToken();

    }

    public static synchronized void updLastAccessTime(String token, int lastAccessTime) {

        String userId = CONNECTION_BY_TOKEN.get(token).getUserId();
        CONNECTION_BY_USERID.get(userId).setLastAccessTime(lastAccessTime);
        CONNECTION_BY_TOKEN.get(token).setLastAccessTime(lastAccessTime);

    }

    public static Map getConnCache() {
        return CONNECTION_BY_TOKEN;
    }

}

3.通道清理线程,目前的连接清理采用的是超时清理:此处我遇到一个问题,就是我根本无法判断客户端的异常中断,所以我只能依靠客户端发给服务端的心跳,不断的更新GVConnection中的lastAccessTime,在我的超时线程中,对GVConnection的集合进行遍历,一旦发现有channel关闭的以及超时的,就将其进行清理。而客户端主动关闭,我是可以获取的,在GVServer

public class ConnTimeoutCleanThread implements Runnable {
    /**
     * 超时时间
     */
    private int outTime;
    /**
     * 执行周期
     */
    private int cycle;

    private static Logger logger = LogManager.getLogger(ConnTimeoutCleanThread.class.getName());

    public ConnTimeoutCleanThread(int outTime, int cycle) {
        this.outTime = outTime;
        this.cycle = cycle;
    }

    public void run() {

        logger.info("定期通道清理线程启动……");
        while (true) {
            logger.info("定期通道清理开启……");

            Map<String, GVConnection> connCache = GVConnTools.getConnCache();

            for (String token : connCache.keySet()) {
                int currentTime = CommonTools.systemTimeUtc();
                GVConnection gvConn = GVConnTools.getConnByToken(token);
                SocketChannel socketChannel = gvConn.getChannel();
                if (!socketChannel.isOpen()) {
                    try {
                        socketChannel.close();
                        GVConnTools.removeConnByToken(token);
                        logger.info("清理了关闭的连接:token:<" + token + ">");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }else if (currentTime - gvConn.getLastAccessTime() > outTime) {
                    if (socketChannel.isOpen()) {
                        try {
                            socketChannel.close();
                            GVConnTools.removeConnByToken(token);
                            logger.info("清理了超时的连接:token:<" + token + ">");
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            logger.info("定期通道清理执行完毕!");
            try {
                Thread.sleep(cycle * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


三、再谈发博文目的:

我之所以发博文是因为:

  1. 我的代码其实大部分是我四处拼拼凑凑,有些是我自己想的,毕竟9年没有写过代码了,我的java思维仍然停留在jdk1.4刚刚出现的时候,所以我希望大家能够一起学习进步,代码中有很多幼稚的、不科学的、不确定的、甚至我自己也不明白的地方,但是是可执行,并且经过我测试的。希望有时间、有精力的朋友能够帮助我,或者想学习的可以跟我一起学习。

  2. 我的代码仍然在不断完善中,因为我本身不是搞开发的,完全是一种兴趣,所以更新的频率不会太快。可能会出现4、5天都不更新的情况。

  3. 自从看了少帮主的znet之后,觉得自己做的简直不堪入目,所以我会对我的代码进行改写。不过改写之前会讲之前已经做的都整理出来,至少可以看看成长的经历。


皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
私信 提问
加载中
请先登录后再评论。
Netty那点事(三)Channel与Pipeline

Channel是理解和使用Netty的核心。Channel的涉及内容较多,这里我使用由浅入深的介绍方法。在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制。为了避免枯燥,借用一下《盗梦空间》的...

黄亿华
2013/11/24
2W
22
用vertx实现高吞吐量的站点计数器

工具:vertx,redis,mongodb,log4j 源代码地址:https://github.com/jianglibo/visitrank 先看架构图: 如果你不熟悉vertx,请先google一下。我这里将vertx当作一个容器,上面所有的圆圈要...

jianglibo
2014/04/03
4.1K
3
SQLServer实现split分割字符串到列

网上已有人实现sqlserver的split函数可将字符串分割成行,但是我们习惯了split返回数组或者列表,因此这里对其做一些改动,最终实现也许不尽如意,但是也能解决一些问题。 先贴上某大牛写的s...

cwalet
2014/05/21
9.6K
0
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
树莓派(Raspberry Pi):完美的家用服务器

自从树莓派发布后,所有在互联网上的网站为此激动人心的设备提供了很多有趣和具有挑战性的使用方法。虽然这些想法都很棒,但树莓派( RPi )最明显却又是最不吸引人的用处是:创建你的完美家用...

异次元
2013/11/09
6.6K
8

没有更多内容

加载失败,请刷新页面

加载更多

Azure Application Gateway(一)对后端 Web App 进行负载均衡

一,引言   今天,我们学习一个新的知识点-----Azure Application Gateway,通过Azure 应用程序网关为我么后端的服务提供负载均衡的功能。我们再文章头中大概先了解一下什么是应用程序网关...

osc_lc4icfkt
30分钟前
4
0
WoLai(我来) 注册码 ——国内版 notion 【笔记】

注册码: SQGYG23 ❤ W4T9PKP JLTHNJP KMTXK7P JDHKJEM KRJXX5P 6M7PPAP DEGLMG3 N3BZMRI 87BR22I GSIWGWP GNGBNTI QA8URIM UDUV9VM IHKJA7P MD9ZA3M 3XR67ZI TBUP9JX TI4TYMM 注册完了可以把......

osc_c05lkk3u
31分钟前
15
0
2020hdu多校第二场比赛及补题

这一场我们队只A了一题 1010 Lead of Wisdom 直接爆搜,但是T了好几发,剪了下枝 如果一个物品的a,b,c,d都比不上另外一个同种物品的a,b,c,d,那这个物品就可以直接淘汰掉了 #include<iostrea...

osc_usgpahnw
33分钟前
21
0
为什么Java有瞬态字段? - Why does Java have transient fields?

问题: 为什么Java有瞬态字段? 解决方案: 参考一: https://stackoom.com/question/3opS/为什么Java有瞬态字段 参考二: https://oldbug.net/q/3opS/Why-does-Java-have-transient-fields...

富含淀粉
33分钟前
16
0
轻松搭建CAS 5.x系列(6)-在CAS Server上增加OAuth2.0协议

概述说明 CAS Server默认搭建出来,客户端程序只能按照CAS自身的协议接入。CAS的强大在于,有官方的插件,可以支持其他的协议。本章节就让CAS Server怎么增加OAuth2.0的登录协议。 安装步骤 ...

osc_inj0cicw
34分钟前
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部