基于NIO的消息路由的实现(四) 服务端通讯主线程(1)
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)
皮鞋铮亮 发表于2年前
基于NIO的消息路由的实现(四) 服务端通讯主线程(1)
  • 发表于 2年前
  • 阅读 1006
  • 收藏 49
  • 点赞 2
  • 评论 5

【腾讯云】如何购买服务器最划算?>>>   

摘要: NIO 服务端通讯主线程

一、简单介绍:

服务端通讯主线程是消息路由服务的启动类,其主要作用如下:

1、初始化相关配置;

2、根据配置的ip和port创建tcp服务;

3、接收客户端连接,并给客户端分配令牌;

4、接收客户端的登录请求,并将客户端相关信息(令牌、客户端登录标识、最后访问时间、当前token所使用的通道,保存到缓冲区)

5、接收客户端的报文请求,并添加到通讯队列,等待处理;

6、接收来自各处的指令发送请求,并发送至相关通道;

二、详细介绍:

1、启动方法:首先加载配置信息;然后启动主线程、通讯报文消费线程(处理通讯类报文)、超时、失效通道回收线程(进行超时和失效通道的回收工作)、短消息消费者线程(专门针对短消息队列进行处理的线程)。尤其是OP_WRITE,在OP_WRITE之后,必须将selector注册为OP_READ,否则会一直循环下去,死循环。

public static void main(String arg[]) throws Exception {
    //初始化配置数据
    Config cfg = new Config(arg[0]);
    final GVServer gvServer = new GVServer();
    //启动ServerSocket通道
    if (gvServer.initServer(cfg)) {

        ExecutorService threadPool = Executors.newCachedThreadPool();
        //启动通讯服务主线程
        threadPool.execute(gvServer);
        //启动通讯报文消费线程
        threadPool.execute(new CQueueConsumer(cfg.getWaitTime()));
        //启动超时通道、失效通道回收线程
        threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle()));
        threadPool.execute(new MQueueConsumer());
    }
}

2、初始化配置:打开tcp服务等待连接(略);

3、通讯事件处理:通讯主线程的run方法,主要对接收到的事件分别处理。这个地方尤其要注意的是,我第一篇文章提到的,所有触发的时间都必须被消费,否则会一直循环下去。

    public void run() {

        while (true) {
            try {

                //监听事件key
                selector.select(2000);
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定义一个socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());
                    //删除Iterator中的当前key,避免重复处理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //从客户端送来的key中获取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续
                        socketChannel = serverSocketChannel.accept();
                        //将此socket通道设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将此通道注册到selector,并等待接收客户端的读入数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //获取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理缓冲区,便于使用
                        byteBuffer.clear();
                        //将channel中的字节流读入缓冲区
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //处理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();
                            logger.info("客户端"+socketChannel.toString()+"连接关闭!");

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

4、将合法报文放入通讯队列:我在配置初始化的时候,明确规定了几种报文类,并在协议中限定了报文的格式,凡是不符合我报文格式的报文均视为非法报文,直接会给客户端回复一个错误指令。

/**
 * 处理不合法报文以及将合法报文放入队列等待处理
 *
 * @param socketChannel
 * @param strPacket
 */
private void offerPacket(SocketChannel socketChannel, String strPacket) {
    IPacket packet = AnalyseTools.analysePacket(strPacket);
    if (packet.getHeader().equals(LoginPacket.HEADER)) {
        handleLoginPacket(socketChannel, packet);
    }
    //如果类为空或者从handle单例map中无法取到类,则证明报文非法
    if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) {
        //不在服务端识别范围内的报文,回复E响应,告知客户端不合法
        ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER);
        errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG);

        logger.info("客户端发送非法报文:" + strPacket);
        GVServer.write2Client(errorOrder, socketChannel);
        //将合法报文放入消息队列中等待处理
    } else {
        if (!GVQueue.CQUEUE.offer(packet)) {
            logger.error("消息队列已满,请增加队列容量!");

        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("将添加至消息队列,队列内消息数量为:" + GVQueue.CQUEUE.size());
            }
        }
    }
}

5、分配客户端的令牌:客户端连接之后,服务端马上会给客户端回复一个T指令,告诉客户端它的令牌,从此以后,客户端每次报文,都必须携带此令牌;在通讯这一层里,服务端根据token确定客户端;

    private void allocToken(SocketChannel socketChannel) {

        //给连接上来的通道分配token
        TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER);
        String token = UUID.randomUUID().toString();
        tokenOrder.initTokenOrder(token);
        //返回给客户端token
        write2Client(tokenOrder, socketChannel);
        logger.info("客户端:<" + token + ">已经连接!");
        //将连接后的channel保存到全局map中
        //2015.8.13修改,先把userId存为null,等待用户登录后,在将,
//        GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc());
//        GVConnTools.addConn2Cache(gvConnection);

    }

6、客户端登录处理:仅凭借客户端token,我没法将服务用于业务中,业务中往往会存在一个用户的用户标记,我需要能够根据用户的标记,往通道里面写入消息;所以,有了客户端登录过程,客户端将自己唯一的业务标记提交到服务端。服务端建立一个token、用户标记、用户最后访问时间、通道的缓冲区(统一成了一个类GVConnection),专门用语指令的发送,并且保持几项内容的同步,GVConnTools为操作这些内容的唯一入口;

/**
 * 专门处理客户端登录报文,保存GVConn到缓冲区
 * 【注】对于userId重复的情况,在这里不做处理了,由业务系统自己处理,
 * 这里对userId重复相当于后登录的用户替换了先登录用户的通道。
 *
 * @param socketChannel
 * @param packet
 */
private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) {

    GVConnection gvConn = new GVConnection(packet.getClientToken(),
            packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc());
    GVConnTools.addConn2Cache(gvConn);

}

7、消息写入通道:其实我完全可以在奔雷的外部提供一个专门的写入方法,但是当时好像脑子进水了,这个等以后迭代的时候在考虑如何处理吧。暂时放到这里。需要注意的是,这个方法是唯一对协议的前四位包头进行封装的方法,在所有其他的类中,都不需要对报文的前四位予以考虑。在客户端读取的时候,也会将前四位截取掉之后,或者将字符串放入队列,或者将一个报文(指令)对象放入队列。(为什么需要这四位,我将在下一个小部分——粘包、断包中讲解)

/**
 * 向客户端写入信息的方法
 *
 * @param iOrder        报文处理类接口
 * @param socketChannel
 */
public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        socketChannel.register(selector, SelectionKey.OP_WRITE);
        //创建一个byteBuffer用来存储要写入的buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
        //得出整个包体的长度
        String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
        //讲包体长度放入buffer的前四位
        byteBuffer.put(packetSize.getBytes());
        //移动buffer的postion指针到第四位,包体将从第四位开始写入
        byteBuffer.position(PACKET_HEAD_LENGTH);
        String str = iOrder.generateOrderStr();
        //写入包体
        if (logger.isDebugEnabled()) {
            logger.debug("服务端写入通道的包体:" + str);
        }
        byteBuffer.put(str.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

上面的代码是有问题的,我修改了一下,但是仍然不确定是否存在问题。就是在此处将iOrder放入队列,如下:

public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        iOrder.setChannel(socketChannel);
        writeOrderQueue.offer(iOrder);
        socketChannel.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException e) {
        e.printStackTrace();
    }
}

然后,在主线程run方法中

} else if (key.isWritable()) {
    doWrite();
}

doWrite代码:

private void doWrite() {

        while (true) {
            IOrder iOrder = writeOrderQueue.peek();
            if (iOrder == null) {
                break;
            }
            if (iOrder != null) {
                //创建一个byteBuffer用来存储要写入的buffer
                ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
                //得出整个包体的长度
                String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
                //讲包体长度放入buffer的前四位
                byteBuffer.put(packetSize.getBytes());
                //移动buffer的postion指针到第四位,包体将从第四位开始写入
                byteBuffer.position(PACKET_HEAD_LENGTH);
                String str = iOrder.generateOrderStr();
                //写入包体
                if (logger.isDebugEnabled()) {
                    logger.debug("服务端写入通道的包体:" + str);
                }
                byteBuffer.put(str.getBytes());

                byteBuffer.flip();
                //如果上次写入的是断包,则将byteBuffer移动到上次写入的位置,
                if (writePos > 0) {
                    byteBuffer.position(writePos);
                }

                SocketChannel channel = iOrder.getChannel();
                try {
                    channel.write(byteBuffer);
                    //如果buffer没有全部被写入,那么记录写入位置后,中断循环,等待下次OP_WRITE
                    if (byteBuffer.remaining() > 0) {
                        writePos = byteBuffer.position();
                        break;
                    } else {
                        writePos = 0;
                        writeOrderQueue.remove();
                        channel.register(selector, SelectionKey.OP_READ);
                    }
//                logger.info("读完的byteBuffer:" + byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }

        }
    }

这么做的目的是nio本身的机制我当初没有理解,通道的OP_WRITE状态代表的是准备好写入,如果我强行写入是可以的,但是不保证其会准备好被写入,当其没准备好的时候,就可以写入失败。所以我要先将指令对象放入队列,等通道准备好写入,再从队列中拿出指令。但是在写入的时候,有可能一次写入不全,所以我判断了buffer的剩余字节,以确定全部写入,如果没有全部写入,那么就记录buffer的位置,下次OP_WRITE的时候再次从这个位置开始写入。不知道这样做有什么问题没有?敬请指导!



共有 人打赏支持
粉丝 35
博文 12
码字总数 11603
评论 (5)
doge_刀戈
这种文章还上推荐。晕。
皮鞋铮亮

引用来自“专治各种BUG”的评论

这种文章还上推荐。晕。

嗯,水平很低,敬请指导。
xtgss007
还期待楼主的更新,怎么不更新了。
皮鞋铮亮

引用来自“xtgss007”的评论

还期待楼主的更新,怎么不更新了。

这两天比较忙,出差了。等我回去再写。
皮鞋铮亮

引用来自“xtgss007”的评论

还期待楼主的更新,怎么不更新了。
我的代码有问题,自从看了一个大神的代码之后,我发现了许多自己的不足,不要直接使用我的代码,有隐患。
×
皮鞋铮亮
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: