文档章节

基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/18 18:55
字数 2252
阅读 1010
收藏 49
点赞 2
评论 5

一、简单介绍:

服务端通讯主线程是消息路由服务的启动类,其主要作用如下:

1、初始化相关配置;

2、根据配置的ip和port创建tcp服务;

3、接收客户端连接,并给客户端分配令牌;

4、接收客户端的登录请求,并将客户端相关信息(令牌、客户端登录标识、最后访问时间、当前token所使用的通道,保存到缓冲区)

5、接收客户端的报文请求,并添加到通讯队列,等待处理;

6、接收来自各处的指令发送请求,并发送至相关通道;

二、详细介绍:

1、启动方法:首先加载配置信息;然后启动主线程、通讯报文消费线程(处理通讯类报文)、超时、失效通道回收线程(进行超时和失效通道的回收工作)、短消息消费者线程(专门针对短消息队列进行处理的线程)。尤其是OP_WRITE,在OP_WRITE之后,必须将selector注册为OP_READ,否则会一直循环下去,死循环。

public static void main(String arg[]) throws Exception {
    //初始化配置数据
    Config cfg = new Config(arg[0]);
    final GVServer gvServer = new GVServer();
    //启动ServerSocket通道
    if (gvServer.initServer(cfg)) {

        ExecutorService threadPool = Executors.newCachedThreadPool();
        //启动通讯服务主线程
        threadPool.execute(gvServer);
        //启动通讯报文消费线程
        threadPool.execute(new CQueueConsumer(cfg.getWaitTime()));
        //启动超时通道、失效通道回收线程
        threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle()));
        threadPool.execute(new MQueueConsumer());
    }
}

2、初始化配置:打开tcp服务等待连接(略);

3、通讯事件处理:通讯主线程的run方法,主要对接收到的事件分别处理。这个地方尤其要注意的是,我第一篇文章提到的,所有触发的时间都必须被消费,否则会一直循环下去。

    public void run() {

        while (true) {
            try {

                //监听事件key
                selector.select(2000);
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定义一个socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());
                    //删除Iterator中的当前key,避免重复处理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //从客户端送来的key中获取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续
                        socketChannel = serverSocketChannel.accept();
                        //将此socket通道设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将此通道注册到selector,并等待接收客户端的读入数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //获取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理缓冲区,便于使用
                        byteBuffer.clear();
                        //将channel中的字节流读入缓冲区
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //处理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();
                            logger.info("客户端"+socketChannel.toString()+"连接关闭!");

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

4、将合法报文放入通讯队列:我在配置初始化的时候,明确规定了几种报文类,并在协议中限定了报文的格式,凡是不符合我报文格式的报文均视为非法报文,直接会给客户端回复一个错误指令。

/**
 * 处理不合法报文以及将合法报文放入队列等待处理
 *
 * @param socketChannel
 * @param strPacket
 */
private void offerPacket(SocketChannel socketChannel, String strPacket) {
    IPacket packet = AnalyseTools.analysePacket(strPacket);
    if (packet.getHeader().equals(LoginPacket.HEADER)) {
        handleLoginPacket(socketChannel, packet);
    }
    //如果类为空或者从handle单例map中无法取到类,则证明报文非法
    if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) {
        //不在服务端识别范围内的报文,回复E响应,告知客户端不合法
        ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER);
        errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG);

        logger.info("客户端发送非法报文:" + strPacket);
        GVServer.write2Client(errorOrder, socketChannel);
        //将合法报文放入消息队列中等待处理
    } else {
        if (!GVQueue.CQUEUE.offer(packet)) {
            logger.error("消息队列已满,请增加队列容量!");

        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("将添加至消息队列,队列内消息数量为:" + GVQueue.CQUEUE.size());
            }
        }
    }
}

5、分配客户端的令牌:客户端连接之后,服务端马上会给客户端回复一个T指令,告诉客户端它的令牌,从此以后,客户端每次报文,都必须携带此令牌;在通讯这一层里,服务端根据token确定客户端;

    private void allocToken(SocketChannel socketChannel) {

        //给连接上来的通道分配token
        TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER);
        String token = UUID.randomUUID().toString();
        tokenOrder.initTokenOrder(token);
        //返回给客户端token
        write2Client(tokenOrder, socketChannel);
        logger.info("客户端:<" + token + ">已经连接!");
        //将连接后的channel保存到全局map中
        //2015.8.13修改,先把userId存为null,等待用户登录后,在将,
//        GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc());
//        GVConnTools.addConn2Cache(gvConnection);

    }

6、客户端登录处理:仅凭借客户端token,我没法将服务用于业务中,业务中往往会存在一个用户的用户标记,我需要能够根据用户的标记,往通道里面写入消息;所以,有了客户端登录过程,客户端将自己唯一的业务标记提交到服务端。服务端建立一个token、用户标记、用户最后访问时间、通道的缓冲区(统一成了一个类GVConnection),专门用语指令的发送,并且保持几项内容的同步,GVConnTools为操作这些内容的唯一入口;

/**
 * 专门处理客户端登录报文,保存GVConn到缓冲区
 * 【注】对于userId重复的情况,在这里不做处理了,由业务系统自己处理,
 * 这里对userId重复相当于后登录的用户替换了先登录用户的通道。
 *
 * @param socketChannel
 * @param packet
 */
private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) {

    GVConnection gvConn = new GVConnection(packet.getClientToken(),
            packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc());
    GVConnTools.addConn2Cache(gvConn);

}

7、消息写入通道:其实我完全可以在奔雷的外部提供一个专门的写入方法,但是当时好像脑子进水了,这个等以后迭代的时候在考虑如何处理吧。暂时放到这里。需要注意的是,这个方法是唯一对协议的前四位包头进行封装的方法,在所有其他的类中,都不需要对报文的前四位予以考虑。在客户端读取的时候,也会将前四位截取掉之后,或者将字符串放入队列,或者将一个报文(指令)对象放入队列。(为什么需要这四位,我将在下一个小部分——粘包、断包中讲解)

/**
 * 向客户端写入信息的方法
 *
 * @param iOrder        报文处理类接口
 * @param socketChannel
 */
public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        socketChannel.register(selector, SelectionKey.OP_WRITE);
        //创建一个byteBuffer用来存储要写入的buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
        //得出整个包体的长度
        String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
        //讲包体长度放入buffer的前四位
        byteBuffer.put(packetSize.getBytes());
        //移动buffer的postion指针到第四位,包体将从第四位开始写入
        byteBuffer.position(PACKET_HEAD_LENGTH);
        String str = iOrder.generateOrderStr();
        //写入包体
        if (logger.isDebugEnabled()) {
            logger.debug("服务端写入通道的包体:" + str);
        }
        byteBuffer.put(str.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

上面的代码是有问题的,我修改了一下,但是仍然不确定是否存在问题。就是在此处将iOrder放入队列,如下:

public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        iOrder.setChannel(socketChannel);
        writeOrderQueue.offer(iOrder);
        socketChannel.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException e) {
        e.printStackTrace();
    }
}

然后,在主线程run方法中

} else if (key.isWritable()) {
    doWrite();
}

doWrite代码:

private void doWrite() {

        while (true) {
            IOrder iOrder = writeOrderQueue.peek();
            if (iOrder == null) {
                break;
            }
            if (iOrder != null) {
                //创建一个byteBuffer用来存储要写入的buffer
                ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
                //得出整个包体的长度
                String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
                //讲包体长度放入buffer的前四位
                byteBuffer.put(packetSize.getBytes());
                //移动buffer的postion指针到第四位,包体将从第四位开始写入
                byteBuffer.position(PACKET_HEAD_LENGTH);
                String str = iOrder.generateOrderStr();
                //写入包体
                if (logger.isDebugEnabled()) {
                    logger.debug("服务端写入通道的包体:" + str);
                }
                byteBuffer.put(str.getBytes());

                byteBuffer.flip();
                //如果上次写入的是断包,则将byteBuffer移动到上次写入的位置,
                if (writePos > 0) {
                    byteBuffer.position(writePos);
                }

                SocketChannel channel = iOrder.getChannel();
                try {
                    channel.write(byteBuffer);
                    //如果buffer没有全部被写入,那么记录写入位置后,中断循环,等待下次OP_WRITE
                    if (byteBuffer.remaining() > 0) {
                        writePos = byteBuffer.position();
                        break;
                    } else {
                        writePos = 0;
                        writeOrderQueue.remove();
                        channel.register(selector, SelectionKey.OP_READ);
                    }
//                logger.info("读完的byteBuffer:" + byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }

        }
    }

这么做的目的是nio本身的机制我当初没有理解,通道的OP_WRITE状态代表的是准备好写入,如果我强行写入是可以的,但是不保证其会准备好被写入,当其没准备好的时候,就可以写入失败。所以我要先将指令对象放入队列,等通道准备好写入,再从队列中拿出指令。但是在写入的时候,有可能一次写入不全,所以我判断了buffer的剩余字节,以确定全部写入,如果没有全部写入,那么就记录buffer的位置,下次OP_WRITE的时候再次从这个位置开始写入。不知道这样做有什么问题没有?敬请指导!



© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
加载中

评论(5)

皮鞋铮亮
皮鞋铮亮

引用来自“xtgss007”的评论

还期待楼主的更新,怎么不更新了。
我的代码有问题,自从看了一个大神的代码之后,我发现了许多自己的不足,不要直接使用我的代码,有隐患。
皮鞋铮亮
皮鞋铮亮

引用来自“xtgss007”的评论

还期待楼主的更新,怎么不更新了。

这两天比较忙,出差了。等我回去再写。
xtgss007
xtgss007
还期待楼主的更新,怎么不更新了。
皮鞋铮亮
皮鞋铮亮

引用来自“专治各种BUG”的评论

这种文章还上推荐。晕。

嗯,水平很低,敬请指导。
doge_刀戈
doge_刀戈
这种文章还上推荐。晕。
基于NIO的消息路由的实现(三)服务端与客户端结构

一、服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类; 服务主线程:接受客户端连接,将客户端发...

皮鞋铮亮 ⋅ 2015/08/18 ⋅ 3

Netty干货分享:京东京麦的生产级TCP网关技术实践总结

1、引言 京东的京麦商家后台2014年构建网关,从HTTP网关发展到TCP网关。在2016年重构完成基于Netty4.x+Protobuf3.x实现对接PC和App上下行通信的高可用、高性能、高稳定的TCP长连接网关。 早期...

JackJiang2011 ⋅ 2017/12/01 ⋅ 0

基于NIO的消息路由的实现(一) 前言

一、前言: 已经很久没有碰编码了,大概有9年的时间,日新月异的框架和新东西让我眼花缭乱。之前一直在做web相关的应用。由于项目不大,分布式开发在我编码的那个年代里没有做过,后来走上管...

皮鞋铮亮 ⋅ 2015/08/17 ⋅ 16

Android与MINA2、Netty4的跨平台UDP双向通信实战

概述 本文演示的是一个Android客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo。 当前由于NIO框架的流行,使得开发大并发、高性能的互联网服务端成为可能。...

JackJiang- ⋅ 2016/06/30 ⋅ 1

基于XMPP协议的Android即时通信系

以前做过一个基于XMPP协议的聊天社交软件,总结了一下。发出来。 设计基于开源的XMPP即时通信协议,采用C/S体系结构,通过GPRS无线网络用TCP协议连接到服务器,以架设开源的Openfn'e服务器作...

丁佳辉 ⋅ 2016/06/14 ⋅ 0

基于XMPP协议的Android即时通信

设计基于开源的XMPP即时通信协议,采用C/S体系结构,通过GPRS无线网络用TCP协议连接到服务器,以架设开源的Openfn'e服务器作为即时通讯平台。 系统主要由以下部分组成:一是服务器,负责管理...

刘遇安 ⋅ 2014/08/28 ⋅ 0

Java IO:BIO和NIO区别及各自应用场景

引言 BIO和NIO是两种不同的网络通信模型,现如今NIO已经大量应用在Jetty、ZooKeeper、Netty等开源框架中。 一个面向流、一个面向缓冲区 一个是阻塞式的、一个非阻塞 一个没有io多路复用器、一...

Zero零_度 ⋅ 2016/08/27 ⋅ 0

NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战

前言 本文将演示一个iOS客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo。服务端将分别用MINA2和Netty4进行实现,而通信时服务端你只需选其一就行了。同时...

JackJiang- ⋅ 2016/06/28 ⋅ 0

Java远程通讯技术及原理分析

聊聊远程通信 Java远程通讯技术及原理分析 聊聊Socket、TCP/IP、HTTP、FTP及网络编程 RMI原理及实现 RPC原理及实现 轻量级分布式 RPC 框架 使用 RMI + ZooKeeper 实现远程调用框架 深入浅出S...

陶邦仁 ⋅ 2016/02/23 ⋅ 2

大型网站系统与 Java 中间件实践

原文出处:wanglizhi 第一章 分布式系统介绍 分布式系统的定义:组件分布在网络计算机上,组件间仅仅通过消息传递来通信并协调行动。 分布式系统的意义: 升级单机处理能力的性价比越来越低 ...

wanglizhi ⋅ 2017/12/29 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

BS与CS的联系与区别【简】

C/S是Client/Server的缩写。服务器通常采用高性能的PC、工作站或小型机,并采用大型数据库系统,如Oracle、Sybase、InFORMix或 SQL Server。客户端需要安装专用的客户端软件。 B/S是Brower/...

anlve ⋅ 50分钟前 ⋅ 0

发生了什么?Linus 又发怒了?

在一个 Linux 内核 4.18-rc1 的 Pull Request 中,开发者 Andy Shevchenko 表示其在对设备属性框架进行更新时,移除了 union 别名,这引发了 Linus 的暴怒。 这一次 Linus Torvalds 发怒的原...

问题终结者 ⋅ 今天 ⋅ 0

在树莓派上搭建一个maven仓库

在树莓派上搭建一个maven仓库 20180618 lambo init 项目说明 家里有台树莓派性能太慢。想搭建一个maven私服, 使用nexus或者 jfrog-artifactory 运行的够呛。怎么办呢,手写一个吧.所在这个...

林小宝 ⋅ 今天 ⋅ 0

Spring发展历程总结

转自与 https://www.cnblogs.com/RunForLove/p/4641672.html 目前很多公司的架构,从Struts2迁移到了SpringMVC。你有想过为什么不使用Servlet+JSP来构建Java web项目,而是采用SpringMVC呢?...

onedotdot ⋅ 今天 ⋅ 0

Python模块/包/库安装(6种方法)

Python模块/包/库安装(6种方法) 冰颖机器人 2016-11-29 21:33:26 一、方法1: 单文件模块 直接把文件拷贝到 $python_dir/Lib 二、方法2: 多文件模块,带setup.py 下载模块包(压缩文件zip...

cswangyx ⋅ 今天 ⋅ 0

零基础学习大数据人工智能,学习路线篇!系统规划大数据之路?

大数据处理技术怎么学习呢?首先我们要学习Python语言和Linux操作系统,这两个是学习大数据的基础,学习的顺序不分前后。 Python:Python 的排名从去年开始就借助人工智能持续上升,现在它已经...

董黎明 ⋅ 今天 ⋅ 0

openJdk和sun jdk的区别

使用过LINUX的人都应该知道,在大多数LINUX发行版本里,内置或者通过软件源安装JDK的话,都是安装的OpenJDK, 那么到底什么是OpenJDK,它与SUN JDK有什么关系和区别呢? 历史上的原因是,Ope...

jason_kiss ⋅ 今天 ⋅ 0

梳理

Redux 是 JavaScript 状态容器,提供可预测化的状态管理。 它是JS的状态容器,是一种解决问题的方式,所以即可以用于 react 也可以用于 vue。 需要理解其思想及实现方式。 应用中所有的 stat...

分秒 ⋅ 今天 ⋅ 0

Java 后台判断是否为ajax请求

/** * 是否是Ajax请求 * @param request * @return */public static boolean isAjax(ServletRequest request){return "XMLHttpRequest".equalsIgnoreCase(((HttpServletReques......

JavaSon712 ⋅ 今天 ⋅ 0

Redis 单线程 为何却需要事务处理并发问题

Redis是单线程处理,也就是命令会顺序执行。那么为什么会存在并发问题呢? 个人理解是,虽然redis是单线程,但是可以同时有多个客户端访问,每个客户端会有 一个线程。客户端访问之间存在竞争...

码代码的小司机 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部