文档章节

基于NIO的消息路由的实现(六)报文队列的处理

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/26 15:19
字数 770
阅读 145
收藏 5

一、报文队列的处理:

如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:

1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。

public class GVQueue {
    //通讯级别报文的队列
    public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
    //短消息级别报文的队列
    public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}

2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:

public class CQueueConsumer extends Thread {

    private int waitTime;

    private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());

    public CQueueConsumer(int waitTime) {
        this.waitTime = waitTime;
    }

    public void run() {
        logger.info("通讯队列消费者线程启动……");
        boolean isRunning = true;
        try {
            while (isRunning) {
                IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
                    handleQueue(packet);
/*                    if (logger.isDebugEnabled()) {
                        logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    }*/
                    logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                } else {
                    Thread.sleep(waitTime);
                    if (logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("通讯队列消费者处理线程终止……");
            e.printStackTrace();
        }
    }

    /**
     * 通讯层处理(对除了M报文之外的报文进行处理)
     * @param packet
     */
    private void handleQueue(IPacket packet) {

        //如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理;
        if (packet.getHeader().equals(MsgPacket.HEADER)){
            GVQueue.MQUEUE.offer(packet);
        }
        if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
            //需要更新通道的最后访问时间
            GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
            if (gvConn!=null){
                //更改最后访问时间
                GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
                SocketChannel socketChannel = gvConn.getChannel();
                //对客户端的报文做出R相应
                if (socketChannel != null) {
                    ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
                    replyOrder.initReplyOrder(packet.getRid());
                    GVServer.write2Client(replyOrder, socketChannel);
                }
            }

        }

    }
}


3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:

public class MQueueConsumer extends Thread {
    private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
    public void run() {
        logger.info("短消息队列消费者线程启动……");
        while (true) {

            try {
                Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
//                    Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr());
                    MsgInfo msgInfo = new MsgInfo();
                    msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());

                    SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());

                    if(channel!=null && channel.isOpen()) {
                        MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
                        msgOrder.initMsgOrder(packet.getPacketBody());
                        GVServer.write2Client(msgOrder, channel);
                        if (logger.isDebugEnabled()) {
                            logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">");
                        }
                    }else{
                        /*
                        此处将数据放入离线存储队列
                         */
                        if(logger.isDebugEnabled()) {
                            logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">");
                        }
                    }


                } else {
                    Thread.sleep(200);
                    if(logger.isDebugEnabled()) {
                        logger.debug("消息队列中没有消息,休息一会儿……");
                    }
                }
            } catch (InterruptedException e) {
                logger.info("短消息队列消费者处理线程终止……");
                e.printStackTrace();
            }

        }
    }
}


© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
基于NIO的消息路由的实现(三)服务端与客户端结构

一、服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类; 服务主线程:接受客户端连接,将客户端发...

皮鞋铮亮
2015/08/18
0
3
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一、简单介绍: 服务端通讯主线程是消息路由服务的启动类,其主要作用如下: 1、初始化相关配置; 2、根据配置的ip和port创建tcp服务; 3、接收客户端连接,并给客户端分配令牌; 4、接收客户...

皮鞋铮亮
2015/08/18
0
5
基于Mina实现的一个简单数据采集中间件

一、前言 该数据据采集中间件需要实现与多个终端的长连接,并定时给所有终端发送指令,终端在接收到相关指令后,返回相关信息给中间件。中间件需要一直监测所有终端的在线状态,并一直监听、...

ytangdigl
2017/09/23
0
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
0
0
NOVA源码分析——NOVA中的RabbitMQ解析

本篇文章是由本人阅读NOVA源码过程中的心得、RabbitMQ的官方文档以及网上的一些资料整理总结而成的,也为了方便以后对这部分内容的复习。 NOVA是OpenStack系统的核心模块,主要负责虚拟机实例...

youyu2299
2013/12/19
0
1

没有更多内容

加载失败,请刷新页面

加载更多

VSCode 搭建Vue开发环境之Vue CLI

一、简介说明 1.关于VS Code开发工具,安装和配置,更多可以参考以前文章 2.关于Vue.js,Vue是一个优秀的渐进式前端框架,不仅易于上手,还便于与第三方库或既有项目整合。 3.关于Vue是使用方...

tianma3798
24分钟前
2
0
MySQL 相关博客整理

1. 《深入理解 MySQL 底层实现》 简评:文章从硬盘底层存储原理讲解到MySQL存储原理,其中涉及InnoDB 和 Myisam 中 B+Tree 的应用,以及常见数据库优化思路,算是一片很不错的讲解MySQL原理的...

科陆李明
34分钟前
2
0
pada rabbitmq server mangage

查看配置文件 ubuntu@node4:/etc/rabbitmq$ lltotal 28drwxr-xr-x 2 rabbitmq rabbitmq 4096 Jun 6 13:52 ./drwxr-xr-x 104 root root 12288 Sep 26 11:39 ../-rw-r--r-- ......

qwfys
42分钟前
0
0
SpringBoot进阶

慕课网链接 表单数据的验证 在pojo类属性的上面添加注解 @Entitypublic class Girl { @Id @GeneratedValue private Integer id; @NotBlank(message = "这个字段...

踏破铁鞋无觅处
49分钟前
1
0
【SylixOS】QT-QWS流程介绍

QWS简介 QWS(QT Windows System)是QT自行开发的窗口系统,体系结构类似X Windows的C/S结构。QWS Server在物理设备上显示,QWS Client实现界面,两者通过socket进行彼此的通讯。在很多嵌入式系...

suokin
49分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部