Netty ChannelPipeline
本篇文章基于Netty 4.1.56.Final
链式结构
Netty的每个channel都会拥有一个ChannelPipeline,ChannelPipeline组织编排了channel的入站和出站数据处理逻辑,是Netty处理数据的核心数据结构。所谓入站逻辑就是channel读取数据处理逻辑,出站逻辑就是channel发送数据处理逻辑。Netty的处理逻辑被封装在ChannelHandler里。ChannelHandler有2个子接口,ChannelInboundHandler用来处理入站数据并提供了状态改变相关的回调方法;ChannelOutboundHandler用来处理出站数据并提供了IO处理相关的回调方法。Netty还提供了一些常用的抽象实现类,例如_ByteToMessageDecoder_和_MessageToMessageDecoder_是一种ChannelInboundHandler,用来将从网络读取的字节流解码为java对象并释放相应内存和将一种java对象转换为另一种java对象;_MessageToMessageEncoder_和_MessageToByteEncoder_是一种ChannelOutboundHandler,用来java对象之间的转换和将java对象编码为字节流;实现类ChannelDuplexHandler可以处理2个方向的数据。ChannelPipeline将Netty的处理器ChannelHandler组织成一个双向链表,结构如下图所示:
组成链表节点的是AbstractChannelHandlerContext,它不仅封装了ChannelHandler还提供了出入站相关方法。AbstractChannelHandlerContext有2个特殊的实现类TailContext和HeadContext,分别代表了双向链表的最后一个节点和第一个节点。 TailContext还实现了ChannelInboundHandler因此它是ChannelPipline中最后一个入站处理逻辑,其处理逻辑就是帮助我们释放掉在ChannelPipeline中传播的数据。HeadContext实现了ChannelInboundHandler和ChannelOutboundHandler,因此它是第一个入站逻辑和最后一个出站逻辑,它不仅负责发起读取,还将读取事件向后传播,同时他负责将数据写入到网络。
调用传递
调用是如何在节点之间传递的呢?在AbstractChannelHandlerContext中为ChannelInboundHandler和ChannelOutboundHandler提供了相应的方法来触发下一个节点的调用。例如AbstractChannelHandlerContext为ChannelInboundHandler中的回调方法提供了相应的fire方法。例如在fireChannelRead方法中,会按入站顺序找到下一个ChannelInboundHandler节点并调用它的channelRead方法,因此要想触发后续节点的调用,只需要在我们编写的代码里调用AbstractChannelHandlerContext提供的相应方法即可。Netty为我们提供了ChannelOutboundHandlerAdapter和ChannelInboundHandlerAdapter,为每个方法提供了基础实现,实现逻辑就是想调用传递到下一个节点,因此我们编写netty处理器的时候只需要继承适配器类,在需要传递调用的时候调用父类相同方法即可。那context是如何找到下一个context的呢?context通过findContextInbound和findContextOutbound方法分别查找下一个节点,例如fireChannelRead方法调用findContextInbound查找下一个节点并且传入一个MASK_CHANNEL_READ。mask的定义在ChannelHandlerMask里,它是将1左移来表示ChannelInboundHandler和ChannelOutboundHandler中不同的方法的,即一个int的二进制上0和1来表示不同的方法的。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
传入一个mask
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next; InBound事件向后查找
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev; OutBound事件向前查找
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
那skipContext的作用是什么呢?原来这里的下一个节点并不是下一个ChannelInboundHandler或者ChannelOutboundHandler,而是下一个重写了相同方法的handler。拿fireChannelRead来说,下一个节点是下一个重写了channelRead方法的ChannelInboundHandlerAdapter。这要说说@Skip的作用了,在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter的所有默认实现都加了@Skip,我们编写自己的handler时会重写相关方法,这样重写的方法就没有@Skip注解了。在将ChannelHandler加入到pipeline,创建AbstractChannelHandlerContext的时候,会调用mask方法来计算该handler的executionMask,就是找出hander中没有@Skip的方法并用二进制位来表示。skipContext就是利用executionMask和传入的mask来找到下一个节点的。
链表的创建
Netty支持NIO,Epoll,kqueue和oio等SocketChannel,每种SocketChannel都有相应的ServerSocketChannel,我们仅以NioSocketChannel和NioServerSocketChannel来说明。我们使用Bootstrap来创建NioSocketChannel并connect到一个服务端,我们会编写一个ChannelInitializer来向创建的NioSocketChannel的ChannelPipeline中添加ChannelHandler,在connect方法中会调用initAndRegister方法来创建NioSocketChannel并初始化,在初始化操作中会将ChannelInitializer添加到ChannelPipeline中。代码如下:
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler()); 将ChannelInitializer加入ChannelPipeline
省略
同理,我们使用ServerBootstrap来创建NioServerSocketChannel并bind到监听端口,我们也会编写一个ChannelInitializer来向NioServerSocketChannel创建的channel添加处理器,这一点与客户端不同,创建服务端编写的ChannelInitializer不是用来添加NioServerSocketChannel的处理器的。在bind方法中会调用initAndRegister方法来创建NioServerSocketChannel并初始化,在初始化操作中会向NioServerSocketChannel的ChannelPipeline中插入ServerBootstrapAcceptor,这个才是NioServerSocketChannel的处理器,这个处理器会将编写的ChannelInitializer加入到创建的新channel里并会将新channel注册到EventLoop上。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler); 将ChannelInitializer添加到新channel的ChannelPipeline中
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
将新channel绑定到EventLoop上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
到此为止,ChannelInitializer被插入到了Channel的ChannelPipeline上了,那它是如何构建我们的处理链的呢?ChannelInitializer会在channel被注册到EventLoop中时,也就是channelRegistered方法中,调用initChannel方法执行我们编写的添加处理器的逻辑来向ChannelPipeline插入我们自己的处理器,最后将ChannelInitializer自己从ChannelPipeline中删除。代码如下
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel()); 执行我们编写的添加处理器逻辑
} catch (Throwable cause) {
省略注释
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) { 将ChannelInitializer自己从ChannelPipeline中删除
pipeline.remove(this);
}
}
return true;
}
return false;
}
总结一下创建流程: NioSocketChannel的pipeline构建流程大致如下:
NioServerSocketChannel的pipeline构建流程大致如下:
除此外,NioServerSocketChannel会为建立的连接新建一个Channel并且ServerBootstrapAcceptor会将我们编写的ChannelInitializer插入到新建Channel的ChannelPipeline中,之后将新建Channel注册到一个EventLoop上,注册时会触发ChannelInitializer的执行逻辑,构建新建Channel的处理链。