Netty ChannelPipeline

原创
2020/12/19 16:19
阅读数 260

Netty ChannelPipeline

本篇文章基于Netty 4.1.56.Final

链式结构

Netty的每个channel都会拥有一个ChannelPipeline,ChannelPipeline组织编排了channel的入站和出站数据处理逻辑,是Netty处理数据的核心数据结构。所谓入站逻辑就是channel读取数据处理逻辑,出站逻辑就是channel发送数据处理逻辑。Netty的处理逻辑被封装在ChannelHandler里。ChannelHandler有2个子接口,ChannelInboundHandler用来处理入站数据并提供了状态改变相关的回调方法;ChannelOutboundHandler用来处理出站数据并提供了IO处理相关的回调方法。Netty还提供了一些常用的抽象实现类,例如_ByteToMessageDecoder_和_MessageToMessageDecoder_是一种ChannelInboundHandler,用来将从网络读取的字节流解码为java对象并释放相应内存和将一种java对象转换为另一种java对象;_MessageToMessageEncoder_和_MessageToByteEncoder_是一种ChannelOutboundHandler,用来java对象之间的转换和将java对象编码为字节流;实现类ChannelDuplexHandler可以处理2个方向的数据。ChannelPipeline将Netty的处理器ChannelHandler组织成一个双向链表,结构如下图所示: ChannelPipeline结构图

组成链表节点的是AbstractChannelHandlerContext,它不仅封装了ChannelHandler还提供了出入站相关方法。AbstractChannelHandlerContext有2个特殊的实现类TailContextHeadContext,分别代表了双向链表的最后一个节点和第一个节点。 TailContext还实现了ChannelInboundHandler因此它是ChannelPipline中最后一个入站处理逻辑,其处理逻辑就是帮助我们释放掉在ChannelPipeline中传播的数据。HeadContext实现了ChannelInboundHandler和ChannelOutboundHandler,因此它是第一个入站逻辑和最后一个出站逻辑,它不仅负责发起读取,还将读取事件向后传播,同时他负责将数据写入到网络。

调用传递

调用是如何在节点之间传递的呢?在AbstractChannelHandlerContext中为ChannelInboundHandler和ChannelOutboundHandler提供了相应的方法来触发下一个节点的调用。例如AbstractChannelHandlerContext为ChannelInboundHandler中的回调方法提供了相应的fire方法。例如在fireChannelRead方法中,会按入站顺序找到下一个ChannelInboundHandler节点并调用它的channelRead方法,因此要想触发后续节点的调用,只需要在我们编写的代码里调用AbstractChannelHandlerContext提供的相应方法即可。Netty为我们提供了ChannelOutboundHandlerAdapter和ChannelInboundHandlerAdapter,为每个方法提供了基础实现,实现逻辑就是想调用传递到下一个节点,因此我们编写netty处理器的时候只需要继承适配器类,在需要传递调用的时候调用父类相同方法即可。那context是如何找到下一个context的呢?context通过findContextInbound和findContextOutbound方法分别查找下一个节点,例如fireChannelRead方法调用findContextInbound查找下一个节点并且传入一个MASK_CHANNEL_READ。mask的定义在ChannelHandlerMask里,它是将1左移来表示ChannelInboundHandler和ChannelOutboundHandler中不同的方法的,即一个int的二进制上0和1来表示不同的方法的。

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    传入一个mask
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;    InBound事件向后查找
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;    OutBound事件向前查找
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

那skipContext的作用是什么呢?原来这里的下一个节点并不是下一个ChannelInboundHandler或者ChannelOutboundHandler,而是下一个重写了相同方法的handler。拿fireChannelRead来说,下一个节点是下一个重写了channelRead方法的ChannelInboundHandlerAdapter。这要说说@Skip的作用了,在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter的所有默认实现都加了@Skip,我们编写自己的handler时会重写相关方法,这样重写的方法就没有@Skip注解了。在将ChannelHandler加入到pipeline,创建AbstractChannelHandlerContext的时候,会调用mask方法来计算该handler的executionMask,就是找出hander中没有@Skip的方法并用二进制位来表示。skipContext就是利用executionMask和传入的mask来找到下一个节点的。

链表的创建

Netty支持NIO,Epoll,kqueue和oio等SocketChannel,每种SocketChannel都有相应的ServerSocketChannel,我们仅以NioSocketChannelNioServerSocketChannel来说明。我们使用Bootstrap来创建NioSocketChannel并connect到一个服务端,我们会编写一个ChannelInitializer来向创建的NioSocketChannel的ChannelPipeline中添加ChannelHandler,在connect方法中会调用initAndRegister方法来创建NioSocketChannel并初始化,在初始化操作中会将ChannelInitializer添加到ChannelPipeline中。代码如下:

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());    将ChannelInitializer加入ChannelPipeline
省略


同理,我们使用ServerBootstrap来创建NioServerSocketChannel并bind到监听端口,我们也会编写一个ChannelInitializer来向NioServerSocketChannel创建的channel添加处理器,这一点与客户端不同,创建服务端编写的ChannelInitializer不是用来添加NioServerSocketChannel的处理器的。在bind方法中会调用initAndRegister方法来创建NioServerSocketChannel并初始化,在初始化操作中会向NioServerSocketChannel的ChannelPipeline中插入ServerBootstrapAcceptor,这个才是NioServerSocketChannel的处理器,这个处理器会将编写的ChannelInitializer加入到创建的新channel里并会将新channel注册到EventLoop上。

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);   将ChannelInitializer添加到新channel的ChannelPipeline中

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        将新channel绑定到EventLoop上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}


到此为止,ChannelInitializer被插入到了Channel的ChannelPipeline上了,那它是如何构建我们的处理链的呢?ChannelInitializer会在channel被注册到EventLoop中时,也就是channelRegistered方法中,调用initChannel方法执行我们编写的添加处理器的逻辑来向ChannelPipeline插入我们自己的处理器,最后将ChannelInitializer自己从ChannelPipeline中删除。代码如下

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());         执行我们编写的添加处理器逻辑
        } catch (Throwable cause) {
            省略注释
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {   将ChannelInitializer自己从ChannelPipeline中删除
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}


总结一下创建流程: NioSocketChannel的pipeline构建流程大致如下: 客户端Pipeline构建流程

NioServerSocketChannel的pipeline构建流程大致如下: 服务端Pipeline构建流程 

除此外,NioServerSocketChannel会为建立的连接新建一个Channel并且ServerBootstrapAcceptor会将我们编写的ChannelInitializer插入到新建Channel的ChannelPipeline中,之后将新建Channel注册到一个EventLoop上,注册时会触发ChannelInitializer的执行逻辑,构建新建Channel的处理链。

展开阅读全文
加载中
点击加入讨论🔥(1) 发布并加入讨论🔥
打赏
1 评论
0 收藏
0
分享
返回顶部
顶部