文档章节

基于NIO的消息路由的实现(六)报文队列的处理

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/26 15:19
字数 770
阅读 140
收藏 5
点赞 0
评论 0

一、报文队列的处理:

如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:

1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。

public class GVQueue {
    //通讯级别报文的队列
    public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
    //短消息级别报文的队列
    public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}

2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:

public class CQueueConsumer extends Thread {

    private int waitTime;

    private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());

    public CQueueConsumer(int waitTime) {
        this.waitTime = waitTime;
    }

    public void run() {
        logger.info("通讯队列消费者线程启动……");
        boolean isRunning = true;
        try {
            while (isRunning) {
                IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
                    handleQueue(packet);
/*                    if (logger.isDebugEnabled()) {
                        logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    }*/
                    logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                } else {
                    Thread.sleep(waitTime);
                    if (logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("通讯队列消费者处理线程终止……");
            e.printStackTrace();
        }
    }

    /**
     * 通讯层处理(对除了M报文之外的报文进行处理)
     * @param packet
     */
    private void handleQueue(IPacket packet) {

        //如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理;
        if (packet.getHeader().equals(MsgPacket.HEADER)){
            GVQueue.MQUEUE.offer(packet);
        }
        if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
            //需要更新通道的最后访问时间
            GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
            if (gvConn!=null){
                //更改最后访问时间
                GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
                SocketChannel socketChannel = gvConn.getChannel();
                //对客户端的报文做出R相应
                if (socketChannel != null) {
                    ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
                    replyOrder.initReplyOrder(packet.getRid());
                    GVServer.write2Client(replyOrder, socketChannel);
                }
            }

        }

    }
}


3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:

public class MQueueConsumer extends Thread {
    private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
    public void run() {
        logger.info("短消息队列消费者线程启动……");
        while (true) {

            try {
                Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
//                    Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    MsgInfo msgInfo = new MsgInfo();
                    msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());

                    SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());

                    if(channel!=null && channel.isOpen()) {
                        MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
                        msgOrder.initMsgOrder(packet.getPacketBody());
                        GVServer.write2Client(msgOrder, channel);
                        if (logger.isDebugEnabled()) {
                            logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">");
                        }
                    }else{
                        /*
                        此处将数据放入离线存储队列
                         */
                        if(logger.isDebugEnabled()) {
                            logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">");
                        }
                    }


                } else {
                    Thread.sleep(200);
                    if(logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            } catch (InterruptedException e) {
                logger.info("短消息队列消费者处理线程终止……");
                e.printStackTrace();
            }

        }
    }
}


© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
基于NIO的消息路由的实现(三)服务端与客户端结构

一、服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类; 服务主线程:接受客户端连接,将客户端发...

皮鞋铮亮
2015/08/18
0
3
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一、简单介绍: 服务端通讯主线程是消息路由服务的启动类,其主要作用如下: 1、初始化相关配置; 2、根据配置的ip和port创建tcp服务; 3、接收客户端连接,并给客户端分配令牌; 4、接收客户...

皮鞋铮亮
2015/08/18
0
5
基于Mina实现的一个简单数据采集中间件

一、前言 该数据据采集中间件需要实现与多个终端的长连接,并定时给所有终端发送指令,终端在接收到相关指令后,返回相关信息给中间件。中间件需要一直监测所有终端的在线状态,并一直监听、...

ytangdigl
2017/09/23
0
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
0
0
NOVA源码分析——NOVA中的RabbitMQ解析

本篇文章是由本人阅读NOVA源码过程中的心得、RabbitMQ的官方文档以及网上的一些资料整理总结而成的,也为了方便以后对这部分内容的复习。 NOVA是OpenStack系统的核心模块,主要负责虚拟机实例...

youyu2299
2013/12/19
0
1
Netty高性能架构之道

引言 Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的...

无寄语
2016/09/09
147
0
基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理

本来我打算单独开一章,专门说明粘包和断包,但是觉得这个事儿我在做的时候挺头疼的,但是对于别人或许不那么重要,于是就在这里写吧。 那么何谓粘包、何谓断包呢? 粘包:我们知道客户端在写...

皮鞋铮亮
2015/08/18
0
1
消息中间件和AMQP

消息中间件也叫消息代理,是专门用来处理消息的代理组件,消息代理从消息生产者那里接收消息,然后按规则分发消息到消息队列,消息消费者从队列获取消息并确认消息处理。 消息中间件隔离了消...

LeonalGU
2017/11/12
0
0
SLG手游Java服务器的设计与开发——网络通信

前言 上文分析了我们这款SLG的架构,本章着重讲解我们的网络通信架构,由上文的功能分析我们可以得知,游戏的所有功能基本上属于非及时的通信机制,所以依靠HTTP短连接就能够基本满足游戏的通...

umgsai
2016/09/14
0
0
SLG手游Java服务器的设计与开发——网络通信

前言 上文分析了我们这款SLG的架构,本章着重讲解我们的网络通信架构,由上文的功能分析我们可以得知,游戏的所有功能基本上属于非及时的通信机制,所以依靠HTTP短连接就能够基本满足游戏的通...

umgsai
2016/09/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Centos7通过yum安装nginx

添加源地址(直接install可能不是最新版本的) sudo rpm -Uvh http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm 安装 sudo yum install -y ng......

iplusx
1分钟前
0
0
ef .core Dapper Helper

using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Data.SqlClient; using System.Threading.Tasks; using Dapper; using Dap......

Lytf
3分钟前
0
0
iOS 小笔记

1.以下代码打印什么     __block int val = 10;    void (^blk)(void) = ^{        printf("val=%d\n",val);        };       val = 2;    blk(); /...

风了个1
5分钟前
0
0
【Spring Boot 系列 Spring Boot示例程序】

入门程序步骤,创建一个Maven项目。继承Spring Boot官方提供的父工程。再引入一个Web的应用启动器。 1、选择一个合适的IDEA工具 创建一个Maven工程,并添加如下配置 <parent> <...

HansonReal
6分钟前
0
0
217. Contains Duplicate - LeetCode

Question 217. Contains Duplicate Solution 题目大意:判断数组中是否有重复元素 思路:构造一个set,不重复就加进去,重复返回true,如果数据量大的话,可以用布隆过滤器 Java实现: publ...

yysue
11分钟前
0
0
istio 处理失败 (理论)

Envoy提供了一套开箱即用的选择加入故障恢复功能,可以通过应用程序中的服务进行利用。功能包括: 超时 具有超时预算和重试之间的可变抖动的有界重试 限制并发连接数和对上游服务的请求 对负...

xiaomin0322
12分钟前
0
0
eclipse解决git冲突举例

本地修改了两个文件,提交时提示有冲突,想来应该是没有从远程仓库下载最新代码导致的。通过右击项目 -> Team -> Sychronized WorkSpace,比较本地仓库和远程仓库的异同:   此时没有更好的...

Code辉
20分钟前
0
0
运行.jar后缀的文件

前提必须安装了jdk,正确配置环境变量。 在dos窗口执行以下命令即可。 java -jar C:\Users\10492\Desktop\turn.jar

haha360
23分钟前
0
0
Java程序员如何做代码压力测试?【JWordPress前台项目实战】

代码 pom.xml文件引入包 <dependency><groupId>com.taobao.stresstester</groupId><artifactId>stresstester</artifactId><version>1.0</version></dependency> 编写测试代码 /**......

迷你芊宝宝
27分钟前
0
0
面试宝典-什么是缓存穿透?

缓存穿透是说收到了一个请求,但是该请求缓存里没有,只能去数据库里查询,然后放进缓存。 这里面有两个风险,一个是同时有好多请求访问同一个数据,然后业务系统把这些请求全发到了数据库;...

suyain
33分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部