文档章节

基于NIO的消息路由的实现(六)报文队列的处理

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/26 15:19
字数 770
阅读 153
收藏 5

一、报文队列的处理:

如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:

1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。

public class GVQueue {
    //通讯级别报文的队列
    public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
    //短消息级别报文的队列
    public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}

2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:

public class CQueueConsumer extends Thread {

    private int waitTime;

    private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());

    public CQueueConsumer(int waitTime) {
        this.waitTime = waitTime;
    }

    public void run() {
        logger.info("通讯队列消费者线程启动……");
        boolean isRunning = true;
        try {
            while (isRunning) {
                IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
                    handleQueue(packet);
/*                    if (logger.isDebugEnabled()) {
                        logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    }*/
                    logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                } else {
                    Thread.sleep(waitTime);
                    if (logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("通讯队列消费者处理线程终止……");
            e.printStackTrace();
        }
    }

    /**
     * 通讯层处理(对除了M报文之外的报文进行处理)
     * @param packet
     */
    private void handleQueue(IPacket packet) {

        //如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理;
        if (packet.getHeader().equals(MsgPacket.HEADER)){
            GVQueue.MQUEUE.offer(packet);
        }
        if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
            //需要更新通道的最后访问时间
            GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
            if (gvConn!=null){
                //更改最后访问时间
                GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
                SocketChannel socketChannel = gvConn.getChannel();
                //对客户端的报文做出R相应
                if (socketChannel != null) {
                    ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
                    replyOrder.initReplyOrder(packet.getRid());
                    GVServer.write2Client(replyOrder, socketChannel);
                }
            }

        }

    }
}


3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:

public class MQueueConsumer extends Thread {
    private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
    public void run() {
        logger.info("短消息队列消费者线程启动……");
        while (true) {

            try {
                Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
//                    Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    MsgInfo msgInfo = new MsgInfo();
                    msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());

                    SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());

                    if(channel!=null && channel.isOpen()) {
                        MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
                        msgOrder.initMsgOrder(packet.getPacketBody());
                        GVServer.write2Client(msgOrder, channel);
                        if (logger.isDebugEnabled()) {
                            logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">");
                        }
                    }else{
                        /*
                        此处将数据放入离线存储队列
                         */
                        if(logger.isDebugEnabled()) {
                            logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">");
                        }
                    }


                } else {
                    Thread.sleep(200);
                    if(logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            } catch (InterruptedException e) {
                logger.info("短消息队列消费者处理线程终止……");
                e.printStackTrace();
            }

        }
    }
}


© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
私信 提问
基于NIO的消息路由的实现(三)服务端与客户端结构

一、服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类; 服务主线程:接受客户端连接,将客户端发...

皮鞋铮亮
2015/08/18
0
3
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一、简单介绍: 服务端通讯主线程是消息路由服务的启动类,其主要作用如下: 1、初始化相关配置; 2、根据配置的ip和port创建tcp服务; 3、接收客户端连接,并给客户端分配令牌; 4、接收客户...

皮鞋铮亮
2015/08/18
0
5
基于Mina实现的一个简单数据采集中间件

一、前言 该数据据采集中间件需要实现与多个终端的长连接,并定时给所有终端发送指令,终端在接收到相关指令后,返回相关信息给中间件。中间件需要一直监测所有终端的在线状态,并一直监听、...

ytangdigl
2017/09/23
0
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
0
0
NOVA源码分析——NOVA中的RabbitMQ解析

本篇文章是由本人阅读NOVA源码过程中的心得、RabbitMQ的官方文档以及网上的一些资料整理总结而成的,也为了方便以后对这部分内容的复习。 NOVA是OpenStack系统的核心模块,主要负责虚拟机实例...

youyu2299
2013/12/19
0
1

没有更多内容

加载失败,请刷新页面

加载更多

深入理解JVM—JVM内存模型

深入理解JVM—JVM内存模型 我们知道,计算机CPU和内存的交互是最频繁的,内存是我们的高速缓存区,用户磁盘和CPU的交互,而CPU运转速度越来越快,磁盘远远跟不上CPU的读写速度,才设计了内存...

onedotdot
33分钟前
1
0
MVC、MVCS、MVVM、MVP、VIPER等这么多架构模式哪一个好呢?

在项目开启阶段,其中一个很重要的环节就是选架构。 那么面对目前已知的这么多架构模式我们该怎么选择呢?这确实是个很让人头疼的问题! 下面我就在这里梳理一下目前常见的一些架构模式。 先...

Java干货分享
今天
4
0
简单模仿配置文件的反射机制

//Student类 public class Student { public void love() { System.out.println("python"); } } //Tesy类 public class Tesy { public static void main(String[] args) throws Exceptio......

南桥北木
今天
2
0
你真的需要了解一下CSS变量 var()的用法

当Web项目变得越来越大时,他的CSS会变得像天文数字那么大而且还变得混乱。为了帮助我们解决这个问题,新的CSS变量很快就会出现在主流浏览器中,它让开发人员能够重用并轻松编辑重复出现的C...

前端小攻略
今天
1
0
嵌入式应用选择合适的微控制器

为嵌入式应用选择微控制器有几个原因,即低成本,高集成度,增加可靠性,节省空间等。 准备所需硬件接口列表使用微控制器的基本硬件框图,准备一份微控制器需要支持的所有外设接口的列表。微...

linux-tao
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部