文档章节

Netty源码分析 服务器端1

hyssop
 hyssop
发布于 2015/11/25 09:53
字数 1698
阅读 82
收藏 3


Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

做过NIO开发的人都遇到很多难以解决的问题,比如半、粘包,超时,安全性、性能等等,Netty成功的解决了原始NIO开发过程中遇到的各种问题。那么Netty的内部框架是如何解决这些问题呢?本文初步介绍Netty服务器端开发源码,并且从源码分析的角度来分析下各个步骤都做了什么。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author lilinfeng
 * @date 2014年2月14日
 * @version 1.0
 */
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
       ServerBootstrap b = new ServerBootstrap();
       b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
   } finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
       workerGroup.shutdownGracefully();
   }
    }
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
       arg0.pipeline().addLast(new TimeServerHandler());
   }
    }
/**
     * @param args
* @throws Exception
     */
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
      port = Integer.valueOf(args[0]);
       } catch (NumberFormatException e) {
// 采用默认值
}
   }
new TimeServer().bind(port);
    }
}
Netty框架的最大特点就是写起来非常的简单,这也是他牛叉的原因。
第一个方法就是一个简单的对象实例化:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();


NioEventLoopGroup是Nio线程组。Netty是基于Nio线程池模型实现的。Netty服务器端生成了两个线程组。第一个bossGroup用于接收连接的线程组,第二个是用于处理连接请求的线程组,包括读、写、业务逻辑等。
第二个方法是启动类实例化。看到boot我觉得window启动是不是也用到这个词儿了,呲牙!

ServerBootstrap b = new ServerBootstrap();
第三个方法就是将新生成的两个线程组组成一组。说白了就是将其赋值给了group、childGroup两个成员变量中。

b.group(bossGroup, workerGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
    }
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
    }
this.childGroup = childGroup;
return this;
}

第四步:指定通道类型,服务器端的通道类型是NioServerSocketChannel  .channel(NioServerSocketChannel.class)

 public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
   }
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));

}

这块实例化了一个反射工厂,后面会使用这个工厂去实例化channel

第五步:设置TCP的配置项。
       .option(ChannelOption.SO_BACKLOG, 1024)
源码来看将参数放置到了AbstractBootstrap中的变量options中去了。
第六步:添加事件处理句柄。
         .childHandler(new ChildChannelHandler());*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
   }
this.childHandler = childHandler;
return this;

}

到此,我们也没有发现Netty的神奇之处,其实Netty的重头戏只简简单单的一行代码:

// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();我们来翻翻源码看看这句话有多神奇.*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}第一层我们看出他最终返回了一个ChannelFuture ,异步调用的神器。他是netty异步调用接口,实现了java的异步调用类java.util.concurrent.Future<V>
第二层:public ChannelFuture bind(SocketAddress localAddress) {
//还记得刚刚的初始化么,如果有异常这个方法会抛出    validate();
//检查ip地址是否配置
if (localAddress == null) {
throw new NullPointerException("localAddress");
   }
return doBind(localAddress);

}

第三层:

private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册渠道  (1)final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
   }

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
   } else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
       regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
               Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                   // IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
               } else {
// Registration was successful, so set the correct executor to use.
                   // See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
               }
doBind0(regFuture, channel, localAddress, promise);
           }
       });
return promise;
   }

}

第三层开始就到了关键的步骤了:初始化和注册通道final ChannelFuture initAndRegister() {

//还记得刚才的渠道工厂么,到这里使用了这个工厂数理化了一个渠道。(工厂模式)

final Channel channel = channelFactory().newChannel();
try {//初始化渠道
       init(channel);
   } catch (Throwable t) {
       channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
   }
//注册channel 注册到了parent线程组中了
   ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
           channel.close();
       } else {
           channel.unsafe().closeForcibly();
       }
   }
工厂模式使用了反射的方法去实例化了一个channel
@Override
public T newChannel() {
try {
return clazz.newInstance();
   } catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
   }

}

初始化渠道代码如下:void init(Channel channel) throws Exception {

//获取你编码设置的option配置参数
final Map<ChannelOption<?>, Object> options = options();

synchronized (options) {  

  //将配置参数设置到了渠道参数中 SctpServerChannelConfig config;

       channel.config().setOptions(options);
   }
final Map<AttributeKey<?>, Object> attrs = attrs();

synchronized (attrs) {

//设置channle属性

for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
           AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
           channel.attr(key).set(e.getValue());
       }
   }//

   ChannelPipeline p = channel.pipeline();
if (handler() != null) {

//句柄放到管道      

p.addLast(handler());

   }

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
       currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
   }
synchronized (childAttrs) {
       currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
   }

   p.addLast(new ChannelInitializer<Channel>() {
@Override

public void initChannel(Channel ch) throws Exception {

//将服务端注册的handler放置到pipeline中。    

  ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
       }
   });
}

注册渠道。

ChannelFuture regFuture = group().register(channel);

// AbstractNioChannel

protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
       } catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
               // cached and not removed because no Select.select(..) operation was called yet.
((NioEventLoop) eventLoop().unwrap()).selectNow();
               selected = true;
           } else {
// We forced a select operation on the selector before but the SelectionKey is still cached
               // for whatever reason. JDK bug ?
throw e;
           }
       }
   }
}
注册成功后,执行doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
   // the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

if (regFuture.isSuccess()) {

//渠道绑定监听事件

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           } else {
promise.setFailure(regFuture.cause());
           }
       }
   });

}

到此netty服务器端就实现了渠道初始化和注册。如果渠道初始化和注册都成功,接下来的任务就是建立监听,等待客户端的连接请求。

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
   // the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

if (regFuture.isSuccess()) {

//打开渠道监听事件,并注册了一个关闭渠道的监听事件。

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           } else {
promise.setFailure(regFuture.cause());
           }
       }
   });
}
参考文献

https://github.com/code4craft/netty-learning

http://netty.io/wiki/index.html


© 著作权归作者所有

hyssop
粉丝 20
博文 102
码字总数 111521
作品 0
昌平
程序员
私信 提问
spark2.1.0之源码分析——RPC管道初始化

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81197447 提示:阅读本文前最好先阅读: 《Spark2.1.0之内置RPC框架》 《spark2.1....

泰山不老生
2018/07/25
0
0
Netty 源码分析(一):引言和 Java NIO 介绍

为什么要介绍 Netty 如今优秀的开源项目非常多,仅在 Java 服务器端开发领域,优秀的开源项目就不胜枚举。比如从十年前就开始流行到现在依旧十分活跃的 Spring Framework,如今已经发展为一个...

编走编想
2015/10/01
564
2
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)

目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO 的前生今世 之三 NIO Buffer 详解 Java NIO 的前生...

永顺
2017/11/29
0
0
源码之下无秘密 ── 做最好的 Netty 源码分析教程

背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学习 Netty 源码的想法. 刚开始看源码的时候, 自然是比较痛苦...

永顺
2017/11/29
0
0
spark2.1.0之源码分析——RPC服务器TransportServer

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81062342 提示:阅读本文前最好先阅读: 《Spark2.1.0之内置RPC框架》 《spark2.1....

泰山不老生
2018/07/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

UAVStack功能上新:新增JVM监控分析工具

UAVStack推出的JVM监控分析工具提供基于页面的展现方式,以图形化的方式展示采集到的监控数据;同时提供JVM基本参数获取、内存dump、线程分析、内存分配采样和热点方法分析等功能。 引言 作为...

宜信技术学院
30分钟前
6
0
MySQL的5种时间类型的比较

日期时间类型 占用空间 日期格式 最小值 最大值 零值表示 DATETIME 8 bytes YYYY-MM-DD HH:MM:SS 1000-01-01 00:00:00 9999-12-31 23:59:59 0000-00-00 00:00:00 TIMESTAMP 4 bytes YYYY-MM......

物种起源-达尔文
37分钟前
7
0
云服务OpenAPI的7大挑战,架构师如何应对?

阿里妹导读:API 是模块或者子系统之间交互的接口定义。好的系统架构离不开好的 API 设计,而一个设计不够完善的 API 则注定会导致系统的后续发展和维护非常困难。比较好的API设计样板可以参...

阿里云官方博客
40分钟前
5
0
Rancher + VMware PKS实现全球数百站点的边缘K8S集群管理

Sovereign Systems是一家成立于2007年的技术咨询公司,帮助客户将传统数据中心技术和应用程序转换为更高效的、基于云的技术平台,以更好地应对业务挑战。曾连续3年提名CRN,并且在2012年到2...

RancherLabs
45分钟前
5
0
6、根据坐标,判断该坐标是否在地图区域范围内

最近在写配送区域相关的代码,具体需求如下: 根据腾讯地图划分配送区域,总站下边设多个配送分站,然后将订单中的收货地址将其分配给不同的配送分站。 1、地图区域划分(腾讯地图) 1.1、H...

有一个小阿飞
46分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部