文档章节

基于NIO的消息路由的实现(一) 前言

皮鞋铮亮
 皮鞋铮亮
发布于 2015/08/17 14:25
字数 1551
阅读 4509
收藏 209
点赞 22
评论 16

一、前言:

已经很久没有碰编码了,大概有9年的时间,日新月异的框架和新东西让我眼花缭乱。之前一直在做web相关的应用。由于项目不大,分布式开发在我编码的那个年代里没有做过,后来走上管理岗位才接触到,仅限于沟通交流和方案的策划,并没有真正的做过。如今我有了一点时间和精力,决定自己学习一下,先从简单的消息通讯开始吧。

好,背景完毕!下面说说我想做的东西,我想做一个基于NIO的消息路由,而并不基于目前已有的各种优秀框架(mina,netty等等),这么做的初衷也许跟我个人的习惯有关,我总是觉得如果不明白原理,即使再好的框架当遭遇问题的时候,我也会无从下手,如果我懂得了原理,再选用其他的框架,也会更得心应手。所以才没有使用现今那些优秀的框架,或许是我的一点点偏见吧。

我的代码已经发布在 http://git.oschina.net/java616

目已经完成根据客户端的标识进行消息的异步转发,仍会持续的迭代和增加。有兴趣的可以下载回去,如果我有做的不好或者不对的地方,敬请指出。

二、一些概念和例程

NIO是啥我就不说了,我们来看一下我理解的NIO工作流程,如图:

上图为我所理解的NIO的工作过程,如果存在问题,请批评斧正。概括一下我的理解:

  • SocketChannel:为NIO工作过程中,数据传输的通道,客户端与服务端的每次交互都是通过此通道进行的;

  • Selector(多路复用器):会监控其注册的通道上面的任何事件,获得SelectionKey,事件分为OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(这是SelectionKey的四个属性),OP_ACCEPT应该为服务端接收到客户端连接时的一种状态,我在客户端并没有用到此状态;OP_CONNECT则为客户端已经连接上服务端的一种状态,我在服务端并没有使用这个状态;

  • Buffer:我的应用中,我一直使用ByteBuffer,此类是整个NIO通讯的关键,必须理解才能进行通讯的开发,否则可能产生问题;所有的通讯内容都需要在此类中写入和读出;


如果想做nio相关的应用,那么一些概念上的东西是不可回避的,在这里推荐:http://www.iteye.com/magazines/132-Java-NIO 。

下面三段代码,分别完成了服务的创建、服务对事件的监听以及客户端对事件的监听(不可直接拷贝使用,有一些变量没有声明,如有兴趣,可以去下载我的源码)。

  • 服务的创建

//打开一个serversocket通道,ServerSocketChannel是一个监控是否有新连接进入的通道。
serverSocketChannel = ServerSocketChannel.open();
//将这个serversokect通道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定serversokect的ip和端口
serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));
//打开选择器
selector = Selector.open();
//将此通道注册给选择器selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 服务对事件的监听

                //监听事件key
                selector.select(2000);
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定义一个socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());
                    //删除Iterator中的当前key,避免重复处理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //从客户端送来的key中获取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续
                        socketChannel = serverSocketChannel.accept();
                        //将此socket通道设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将此通道注册到selector,并等待接收客户端的读入数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //获取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理缓冲区,便于使用
                        byteBuffer.clear();
                        //将channel中的字节流读入缓冲区
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //处理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }


  • 客户端对事件的监听

            while (true) {
                try {

                    selector.select(3000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    for (int i = 0; keys.hasNext(); i++) {

                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isConnectable()) {
                            socketChannel = (SocketChannel) key.channel();
                            if (socketChannel.isConnectionPending()) {
                                if (socketChannel.finishConnect()){
                                    Client.IS_CONNECT =true;
                                    logger.info("-------成功连接服务端!-------");
                                }

                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            //获取事件key中的channel
                            socketChannel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);
                            //清理缓冲区,便于使用
                            byteBuffer.clear();
                            //将channel中的字节流读入缓冲区
                            String readStr = "";
                            int count = socketChannel.read(byteBuffer);
                            //务必要把buffer的position重置为0
                            byteBuffer.flip();

                            handlePacket(byteBuffer, count);
//                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isWritable()) {
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }

                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }

            }

三、我要做的是个啥?

根据我个人对NIO的理解,我的初步想法是要实现一个这样的东西,如图:

但在我的不断深入开发中,发现上面的图中很多不成熟的内容,作为一个完整的消息通讯的服务,必须包含如下的内容:

1、对接入连接的管理;

2、对连接身份的确认;

3、对异常关闭连接的回收;

4、根据身份对消息的转发;

5、链路的维持;

6、自动重连;

7、消息的异步处理;

8、消息的响应机制;

9、粘包和断包的处理;

9、配置体系;

10、通讯层与业务层的分离;

………………

网上很多的NIO实例都是可以运行的,但并不能满足我的工作需要,以上的那些肯定还有没有考虑全的东西,随着我一点点的开发会逐渐的浮出水面。

在未来的文章中,我会逐步把我自己制定的通讯协议,各个模块的结构,以及代码贴出来,希望大家能够互相学习,互相帮助。(待续)


© 著作权归作者所有

共有 人打赏支持
皮鞋铮亮
粉丝 36
博文 12
码字总数 11603
作品 0
沈阳
加载中

评论(16)

皮鞋铮亮
皮鞋铮亮

引用来自“猫哥-u”的评论

Server :
else if (key.isWritable()) {
socketChannel.register(selector, SelectionKey.OP_READ);
}
不做判断是否已经register OP_READ ,这样很容易导致Selector失效 CPU 100%,正确的做法可以intrestOps置成OP_READ同时要对selector wakeUp
另外ByteBuffer要池化复用不能频繁的alloc,收包放在线程池完成,主事件线程只做事件派发

在吗?我想问您一点儿问题
疯狂的骑士
疯狂的骑士
如果做业务建议用netty,netty已经把连接,线程池管理好了 ,只需要关注业务。如果想提高技术,netty源码是个不错的选择
皮鞋铮亮
皮鞋铮亮

引用来自“猫哥-u”的评论

Server :
else if (key.isWritable()) {
socketChannel.register(selector, SelectionKey.OP_READ);
}
不做判断是否已经register OP_READ ,这样很容易导致Selector失效 CPU 100%,正确的做法可以intrestOps置成OP_READ同时要对selector wakeUp
另外ByteBuffer要池化复用不能频繁的alloc,收包放在线程池完成,主事件线程只做事件派发
谢谢您的指导。我在没有注册为OP_READ的时候会有死循环。后面您说的我不大懂,能具体一点儿吗?
猫哥-u
猫哥-u
Server :
else if (key.isWritable()) {
socketChannel.register(selector, SelectionKey.OP_READ);
}
不做判断是否已经register OP_READ ,这样很容易导致Selector失效 CPU 100%,正确的做法可以intrestOps置成OP_READ同时要对selector wakeUp
另外ByteBuffer要池化复用不能频繁的alloc,收包放在线程池完成,主事件线程只做事件派发
许雷神
许雷神
好文
純白陰影
純白陰影
mark
小白天宇
小白天宇
楼主,粉字很6啊
皮鞋铮亮
皮鞋铮亮

引用来自“Shmiluyuu”的评论

看了一下楼主的源码,楼主是多路复用一个selector,如果开多个selector线程会不会好一些,现在都是多核了.线程切换上下文的开销已经相对很小了

引用来自“皮鞋铮亮”的评论

我对此进行了一下测试,开14个客户端每隔5毫秒发送一个报文,发送10万次,14个客户端均能得到服务端的响应。服务端cpu利用率在10%左右和内存20M左右。您说的想法是一个非常好的想法,但是,我不会。。呵呵,敬请指导。如果开多个selector,会不会有同步的问题,我其实也想过,但是没有想明白。希望得到您的指导,不胜感激。

引用来自“Shmiluyuu”的评论

楼主太谦虚了,我也是只是理论上认为会好一些,没做过测试.我早前看过znet的源码,他是使用了一个selector线程池,可以参考下
哪儿有呀,我去瞧瞧。
Shmiluyuu
Shmiluyuu

引用来自“Shmiluyuu”的评论

看了一下楼主的源码,楼主是多路复用一个selector,如果开多个selector线程会不会好一些,现在都是多核了.线程切换上下文的开销已经相对很小了

引用来自“皮鞋铮亮”的评论

我对此进行了一下测试,开14个客户端每隔5毫秒发送一个报文,发送10万次,14个客户端均能得到服务端的响应。服务端cpu利用率在10%左右和内存20M左右。您说的想法是一个非常好的想法,但是,我不会。。呵呵,敬请指导。如果开多个selector,会不会有同步的问题,我其实也想过,但是没有想明白。希望得到您的指导,不胜感激。
楼主太谦虚了,我也是只是理论上认为会好一些,没做过测试.我早前看过znet的源码,他是使用了一个selector线程池,可以参考下
皮鞋铮亮
皮鞋铮亮

引用来自“Shmiluyuu”的评论

看了一下楼主的源码,楼主是多路复用一个selector,如果开多个selector线程会不会好一些,现在都是多核了.线程切换上下文的开销已经相对很小了
我对此进行了一下测试,开14个客户端每隔5毫秒发送一个报文,发送10万次,14个客户端均能得到服务端的响应。服务端cpu利用率在10%左右和内存20M左右。您说的想法是一个非常好的想法,但是,我不会。。呵呵,敬请指导。如果开多个selector,会不会有同步的问题,我其实也想过,但是没有想明白。希望得到您的指导,不胜感激。
郑大侠/jetty

##Ketty 基于netty实现的服务端Nio MVC业务开发平台,提供性能监控,日志分析,动态扩展的功能。 ###ketty-srv模块 基于netty实现支持自定义协议扩展的Nio MVC高性能业务框架 ####协议 Http...

郑大侠 ⋅ 2015/11/02 ⋅ 0

基于netty实现的服务端Nio MVC业务开发--ketty

Ketty 基于 netty 实现的服务端 Nio MVC 业务开发平台,提供性能监控,日志分析,动态扩展的功能。 ketty-srv模块 基于netty实现支持自定义协议扩展的Nio MVC高性能业务框架 协议 Http Ketty...

郑大侠 ⋅ 2015/11/09 ⋅ 0

NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战

前言 本文将演示一个iOS客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo。服务端将分别用MINA2和Netty4进行实现,而通信时服务端你只需选其一就行了。同时...

JackJiang- ⋅ 2016/06/28 ⋅ 0

NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示

前言 NIO框架的流行,使得开发大并发、高性能的互联网服务端成为可能。这其中最流行的无非就是MINA和Netty了,MINA目前的主要版本是MINA2、而Netty的主要版本是Netty3和Netty4(Netty5已经被...

JackJiang- ⋅ 2016/06/24 ⋅ 0

Java分布式应用简介

大型应用通常会拆分为多个子系统,对于java来说,这些子系统可能部署在同一台机器上的多个不同的JVM中,也可能部署在不同的 电脑上,但这些子系统有不是完全独立的,要相互通信来实现业务功能...

ksfzhaohui ⋅ 2013/02/19 ⋅ 0

SLG手游Java服务器的设计与开发——网络通信

前言 上文分析了我们这款SLG的架构,本章着重讲解我们的网络通信架构,由上文的功能分析我们可以得知,游戏的所有功能基本上属于非及时的通信机制,所以依靠HTTP短连接就能够基本满足游戏的通...

umgsai ⋅ 2016/09/14 ⋅ 0

SLG手游Java服务器的设计与开发——网络通信

前言 上文分析了我们这款SLG的架构,本章着重讲解我们的网络通信架构,由上文的功能分析我们可以得知,游戏的所有功能基本上属于非及时的通信机制,所以依靠HTTP短连接就能够基本满足游戏的通...

umgsai ⋅ 2016/09/14 ⋅ 0

SLG手游Java服务器的设计与开发——网络通信

前言 上文分析了我们这款SLG的架构,本章着重讲解我们的网络通信架构,由上文的功能分析我们可以得知,游戏的所有功能基本上属于非及时的通信机制,所以依靠HTTP短连接就能够基本满足游戏的通...

umgsai ⋅ 2016/09/14 ⋅ 0

基于Mina实现的一个简单数据采集中间件

一、前言 该数据据采集中间件需要实现与多个终端的长连接,并定时给所有终端发送指令,终端在接收到相关指令后,返回相关信息给中间件。中间件需要一直监测所有终端的在线状态,并一直监听、...

ytangdigl ⋅ 2017/09/23 ⋅ 0

Android与MINA2、Netty4的跨平台UDP双向通信实战

概述 本文演示的是一个Android客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo。 当前由于NIO框架的流行,使得开发大并发、高性能的互联网服务端成为可能。...

JackJiang- ⋅ 2016/06/30 ⋅ 1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

CENTOS7防火墙命令记录

安装Firewall命令: yum install firewalld firewalld-config Firewall开启常见端口命令: firewall-cmd --zone=public --add-port=80/tcp --permanent firewall-cmd --zone=public --add-po......

cavion ⋅ 31分钟前 ⋅ 0

【C++】【STL】利用chromo来测量程序运行时间与日志时间打印精确到微秒

直接上代码吧,没啥好说的。头疼。 #include <iostream>#include <string>#include <ctime>#include <sstream>#include <iomanip>#include <thread>#include <chrono>using ......

muqiusangyang ⋅ 34分钟前 ⋅ 0

Mac环境下svn的使用

在Windows环境中,我们一般使用TortoiseSVN来搭建svn环境。在Mac环境下,由于Mac自带了svn的服务器端和客户端功能,所以我们可以在不装任何第三方软件的前提下使用svn功能,不过还需做一下简...

故久呵呵 ⋅ 43分钟前 ⋅ 0

破解公司回应苹果“USB限制模式”:已攻破

本周四,苹果发表声明称 iOS 中加入了一项名为“USB 限制模式”的功能,可以防止 iPhone 在连接其他设备的时候被破解,并且强调这一功能并不是针对 FBI 等执法部门,为的是保护用户数据安全。...

六库科技 ⋅ 45分钟前 ⋅ 0

MyBtais整合Spring Boot整合,TypeHandler对枚举类(enum)处理

概要 问题描述 我想用枚举类来表示用户当前状态,枚举类由 code 和 msg 组成,但我只想把 code 保存到数据库,查询处理,能知道用户当前状态,这应该怎么做呢?在 Spring 整合MyBatis 的时候...

Wenyi_Feng ⋅ 今天 ⋅ 0

synchronized与Lock的区别

# <center>王梦龙的读书笔记第一篇</center> ## <center>-synchronized与Lock的区别</centre> ###一、从使用场景来说 + synchronized 是能够注释代码块、类、方法但是它的加锁是和解锁使用一......

我不想加班 ⋅ 今天 ⋅ 0

VConsole的使用

手机端控制台打印输出,方便bug的排查。 首先需要引入vconsole.min.js 文件,然后在文件中创造实例。就能直接使用了。 var vConsole = new VConsole(); vConsole的文件地址...

大美琴 ⋅ 今天 ⋅ 0

Java NIO之字符集

1 字符集和编解码的概念 首先,解释一下什么是字符集。顾名思义,就是字符的集合。它的初衷是把现实世界的符号映射为计算机可以理解的字节。比如我创造一个字符集,叫做sex字符集,就包含两个...

士别三日 ⋅ 今天 ⋅ 0

Spring Bean基础

1、Bean之间引用 <!--如果Bean配置在同一个XML文件中,使用local引用--><ref bean="someBean"/><!--如果Bean配置在不同的XML文件中,使用ref引用--><ref local="someBean"/> 其实两种......

霍淇滨 ⋅ 今天 ⋅ 0

05、基于Consul+Upsync+Nginx实现动态负载均衡

1、Consul环境搭建 下载consul_0.7.5_linux_amd64.zip到/usr/local/src目录 cd /usr/local/srcwget https://releases.hashicorp.com/consul/0.7.5/consul_0.7.5_linux_amd64.zip 解压consu......

北岩 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部