文档章节

基于NIO的消息路由的实现(六)报文队列的处理

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/26 15:19
字数 770
阅读 150
收藏 5

一、报文队列的处理:

如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:

1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。

public class GVQueue {
    //通讯级别报文的队列
    public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
    //短消息级别报文的队列
    public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}

2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:

public class CQueueConsumer extends Thread {

    private int waitTime;

    private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());

    public CQueueConsumer(int waitTime) {
        this.waitTime = waitTime;
    }

    public void run() {
        logger.info("通讯队列消费者线程启动……");
        boolean isRunning = true;
        try {
            while (isRunning) {
                IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
                    handleQueue(packet);
/*                    if (logger.isDebugEnabled()) {
                        logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    }*/
                    logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                } else {
                    Thread.sleep(waitTime);
                    if (logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("通讯队列消费者处理线程终止……");
            e.printStackTrace();
        }
    }

    /**
     * 通讯层处理(对除了M报文之外的报文进行处理)
     * @param packet
     */
    private void handleQueue(IPacket packet) {

        //如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理;
        if (packet.getHeader().equals(MsgPacket.HEADER)){
            GVQueue.MQUEUE.offer(packet);
        }
        if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
            //需要更新通道的最后访问时间
            GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
            if (gvConn!=null){
                //更改最后访问时间
                GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
                SocketChannel socketChannel = gvConn.getChannel();
                //对客户端的报文做出R相应
                if (socketChannel != null) {
                    ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
                    replyOrder.initReplyOrder(packet.getRid());
                    GVServer.write2Client(replyOrder, socketChannel);
                }
            }

        }

    }
}


3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:

public class MQueueConsumer extends Thread {
    private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
    public void run() {
        logger.info("短消息队列消费者线程启动……");
        while (true) {

            try {
                Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
//                    Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    MsgInfo msgInfo = new MsgInfo();
                    msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());

                    SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());

                    if(channel!=null && channel.isOpen()) {
                        MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
                        msgOrder.initMsgOrder(packet.getPacketBody());
                        GVServer.write2Client(msgOrder, channel);
                        if (logger.isDebugEnabled()) {
                            logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">");
                        }
                    }else{
                        /*
                        此处将数据放入离线存储队列
                         */
                        if(logger.isDebugEnabled()) {
                            logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">");
                        }
                    }


                } else {
                    Thread.sleep(200);
                    if(logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            } catch (InterruptedException e) {
                logger.info("短消息队列消费者处理线程终止……");
                e.printStackTrace();
            }

        }
    }
}


© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
私信 提问
基于NIO的消息路由的实现(三)服务端与客户端结构

一、服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类; 服务主线程:接受客户端连接,将客户端发...

皮鞋铮亮
2015/08/18
0
3
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一、简单介绍: 服务端通讯主线程是消息路由服务的启动类,其主要作用如下: 1、初始化相关配置; 2、根据配置的ip和port创建tcp服务; 3、接收客户端连接,并给客户端分配令牌; 4、接收客户...

皮鞋铮亮
2015/08/18
0
5
基于Mina实现的一个简单数据采集中间件

一、前言 该数据据采集中间件需要实现与多个终端的长连接,并定时给所有终端发送指令,终端在接收到相关指令后,返回相关信息给中间件。中间件需要一直监测所有终端的在线状态,并一直监听、...

ytangdigl
2017/09/23
0
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
0
0
NOVA源码分析——NOVA中的RabbitMQ解析

本篇文章是由本人阅读NOVA源码过程中的心得、RabbitMQ的官方文档以及网上的一些资料整理总结而成的,也为了方便以后对这部分内容的复习。 NOVA是OpenStack系统的核心模块,主要负责虚拟机实例...

youyu2299
2013/12/19
0
1

没有更多内容

加载失败,请刷新页面

加载更多

“敏捷开发”怎么就“敏捷”了

什么是敏捷开发 传统的软件开发过程中,我们往往会针对特定的用户需求,采用“瀑布模型”,从用户的需求开始一步步进行需求分析、软件设计、软件开发、软件测试以及软件交付与维护。 然而,这...

SamYjy
38分钟前
2
0
聊聊我怎么系统学习Linux技能并快速提高的

随着电子信息科技时代的发展,学会使用计算机在我们的生活中成为了必不可少的一项技能。而作为计算机中的三大操作系统之一的Linux更是饱受计算机爱好者们的喜爱。今天我们就来和大家一起聊一...

linuxprobe16
50分钟前
3
0
MySQL专题—— 从认识索引到理解索引【索引优化】

认识索引 认识索引是什么东西非常关键,一个非常恰当的比喻就是书的目录页与书的正文内容之间的关系,为了方便查找书中的内容,通过对内容建立索引形成目录。因此,首先你要明白的一点就是,...

架构师springboot
54分钟前
2
0
Java-怎样构造方法和匿名对象

前言 在编写程序时不安全的初始化会导致程序发生发生重大错误。为了使程序可以被安全地初始化,C++引入了构造器(也可以成为构造方法)的概念,这是一个在创建对象时被自动调用的特殊方法。J...

小刀爱编程
今天
2
0
7、MyBaties 增删改

事务 : 从数据库角度出发,完成业务时需要执行的 SQL 集合,统称一个事务. 1、在 mybatis 中默认是关闭了 JDBC 的自动提交功能 每一个 SqlSession 默认都是不自动提交事务. session.commit()提...

KingFightingAn
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部