文档章节

Netty 4 常用 handler 分析

o
 ovirtKg
发布于 2016/11/29 16:31
字数 1823
阅读 169
收藏 0

这里我们就要分析下netty4 中 hanlder从注册到执行用户消息事件的流程。

handler的注册

在server端或是client端,都需要注册handler,才能使用。通过如下方式为channel设置相应的handler。

channel.pipeline().addLast(new RpcDecoder(MomResponse.class));

或者
 ChannelPipeline cp = socketChannel.pipeline();
 cp.addLast(new RpcEncoder(RpcRequest.class));等

 一般是要通过channel获取pipeline,因为Channel的构造函数中会 new DefaultChannelPipeline(this);

而这个pipeline内部又维护了一个 双向链表,

 public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

而在addLast的过程中,如下 ,会将handler 加到内部链表的尾部。但是在add之前 ,会将其封装到一个DefaultChannelHandlerContext中,而这个context就 做为链表中的一个节点。通过链表表实现每个handler的顺序执行。 

//DefaultChannelPipeline
    @Override
    public ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

    @Override
    public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
        synchronized (this) {
            checkDuplicateName(name);

            AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
            addLast0(name, newCtx);
        }

        return this;
    }

    private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
        checkMultiplicity(newCtx);

        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;

        name2ctx.put(name, newCtx);

        callHandlerAdded(newCtx);
    }

handler 的执行  

netty中使用nio  中 selector也就是linux的epoll系统调用,来实现IO的多路复用。

channel监听 SelectionKey.OP_READ | SelectionKey.OP_ACCEPT 事件,而后会调用channel中构造的内部类 nioUafe 的read方法 。unsafe.read();  而后通过channel中pipeline串联整个msg的消息处理。核心是Context中的 fireChannelRead,因为每个handler都封装到一个Context中,通过如下的方法

//DefaultChannelHandlerContext
  @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        final AbstractChannelHandlerContext next = findContextInbound();//找到链表中下一个处理读的handler
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(msg);
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRead(msg);
                }
            });
        }
        return this;
    }

    private void invokeChannelRead(Object msg) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg); //执行handler的逻辑。
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

可以完成整个handler链的执行。

而在handler的执行流中,一般在中间的handler中,在执行channelRead的业务逻辑中,会将 context自身传到方法里,并会通过调用该context的 fireChannelRead 将处理后的msg  通过 查找下一个handler(因为context是一个双向链表),找到相应的handler(In or  Out)中的业务逻辑,直到最后一个hanlder不在调用 ctx.fireChannelRead 而整个handler链可以分为几类,对于In 也就是收到消息的处理handlers来说,主要是分隔handler,Decoderhandler,业务逻辑handler。这三大类,是将收到的字节符按照设定的协议,执行完结束。

常用handle 分析 

除了用户定义的业务逻辑的handler之外,netty也为我们提供了很多十分有用的handler。我们下面是以in类型的为主进行介绍,out逻辑。常用的有ByteToMessageDecoder 、SimpleChannelInboundHandler、ChannelInboundHandlerAdapter、SslHandler、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等,这些handler之间有继承的关系,在使用中我们可以直接用,有些也可以通过 继承 来扩展达到我们的业务功能。从基类开始介绍

1.  ChannelInboundHandlerAdapter  

相对来说比较底层的handler,可以直接继承,通常用在处理handler的register,unregister等事件,最为核心的就是继承  channelRead,通过之前的handler对msg的处理,直接可以转换为java类,执行业务。

2. SimpleChannelInboundHandler

继承自 ChannelInboundHandlerAdapter  ,为我们做了一次转换,将msg先转换为java类,而我们可以通过继承,直接调用channelRead0,参数就是转换好的java类。使用十分简单。前提是前面的handler要先解码。通常放在最后一个handler。

   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg); // 实现业务逻辑
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

 

3. ByteToMessageDecoder

这个也是比较重要的handler,用户解码的基类handler,从名字也可猜出,其核心将接收的byte转换为用户定义的mssage, 用户需要 实现 decode方法,完成具体的转换。

如下是其channelRead源码

  @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null; //上一次是否有累积的字节
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
                            || cumulation.refCnt() > 1) {
                        // Expand cumulation (by replace it) when either there is not more room in the buffer
                        // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                        // duplicate().retain().
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/2327
                        // - https://github.com/netty/netty/issues/1764
                        expandCumulation(ctx, data.readableBytes());//剩余的空间不足写下本次的数据,扩充累积区,
                    }
                    cumulation.writeBytes(data);//将本次读取的data附加到上一次剩余的数据中。
                    data.release(); 
                }
                callDecode(ctx, cumulation, out); //解码过程
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release(); //当累积区已经不可读了,释放。
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));  //执行下一个handler。
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

主要完成两个事件

    1)解码(byte 转成object)

过程相对简单,通过定义了一个 cumulation 的累积缓存区,用以保存本次没有处理完的buf,并等待下一个tcp包的到来。一起传递到decode方法解决执行,如此反复。解决了粘包的问题。不过注意一点,这个handler是非线程安全的,一个channle对应一个该handler。所以通常我们在加入到piepelie中都是重新new的。

而对于转换的逻辑来说,就需要根据逻辑,转换成相应的对象了。通过callDecode,将重新组合后的cumulation,进行解码。将解码后的信息加到out中,该方法会通过循环,每次解码后的out大小与解码前大小是否一致,以此来决定是否结束本次解码过程。因为一次callDecode可能会 携带多个msg。

    2) 下一个handler 

将转换后的信息传递到下一个handler,    通过ctx.fireChannelRead。上面已经分析handler执行链的过程。

4.   DelimiterBasedFrameDecoder

继承自ByteToMessageDecoder,只需完成解码的工作,同样从名字看出,起到分隔的作用,就是将收到的字节以特殊的字符分隔。一般要指定最大长度,以及分隔符。超过最大长度抛异常。

一般用在如下,以换行符结尾,转化为string。

5. FixedLengthFrameDecoder 

同上,解码成固定长度的字节。

 protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readSlice(frameLength).retain();
        }
    }

handler组合

接下来我们给出几组常用的handler组合

 1) 定义协议,decode为java对象,

channel.pipeline().addLast(new RpcDecoder(Response.class));
channel.pipeline().addLast(new RpcEncoder(Request.class));
channel.pipeline().addLast(handle);

这种在Decoder中需要用户去实现协议,最简单的过程如下,信息头部指定有效字节数,先读取头部长度。而后在读取相应长度的字节,反序列化。而复杂的可能设定magic、type、length等。

@Override
	public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		int HEAD_LENGTH=4;
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        in.markReaderIndex();               
        int dataLength = in.readInt();       
        if (dataLength < 0) { 				 
            ctx.close();
        }
        if (in.readableBytes() < dataLength) { 
            in.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength]; 
        in.readBytes(body);  				      
        Object obj=SerializeUtils.Deserialize(body);//通过定义序列化工具将字节数组转换为指定类的实例
        out.add(obj);
	}

2) 以换行符结尾分隔,转化为string。

 // 以("\n")为结尾分割的 解码器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         // 字符串解码 和 编码
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());

         // 自己的逻辑Handler
pipeline.addLast("handler", new SelfHandler());

 

 

 

 

http://blog.csdn.net/langzi7758521/article/details/52712159

© 著作权归作者所有

共有 人打赏支持
下一篇: 事务与锁
o
粉丝 3
博文 57
码字总数 48798
作品 0
景德镇
私信 提问
Netty版本升级及线程模型详解

作者 李林锋 发布于 2015年2月7日 | 注意:GTLC全球技术领导力峰会,500+CTO技聚重新定义技术领导力!18 讨论 分享到:微博微信FacebookTwitter有道云笔记邮件分享 稍后阅读 我的阅读清单 1....

tantexian
2016/07/06
45
0
netty文章收藏

1.林峰的关于netty文章 不错,遇到问题反复看了几遍总有启发,还说收藏记录一下。 http://www.infoq.com/cn/netty Netty案例集锦之多线程篇(续)作者 李林锋 发布于 2015年11月25日 9 Nett...

GREKI
2016/04/05
604
0
Netty实现原理浅析

Netty是JBoss出品的高效的Java NIO开发框架,关于其使用,可参考我的另一篇文章 netty使用初步。本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细致的研 究。如...

JavaGG
2010/07/19
18.6K
18
Netty源码分析系列1:BootStrap的分析

Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.利用BootStrap我们可以实现创建channel,把channel注册在EventLoop上,发起连接等...

maskwang520
2018/05/13
0
0
Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)

目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO...

永顺
2017/11/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

重磅发布:阿里 OpenJDK终于开源啦! 将长期支持版本 Dragonwell

前几天的北京阿里云峰会,阿里巴巴正式宣布对外开源 OpenJDK 长期支持版本 Alibaba Dragonwell。作为 Java 全球管理组织 Java Community Process (JCP) 的最高执行委员会的唯一中国代表,以及...

终端研发部
32分钟前
4
0
十年再出发!阿里云智能总裁张建锋演讲全记录

3月21日,“十年再出发”阿里云峰会于国家会议中心正式召开。阿里云智能总裁张建锋(行癫)携核心管理团队首次悉数亮相,并对阿里云做了全面的战略解读。以下是演讲全文。 张建锋:今天是我们...

阿里云官方博客
32分钟前
2
0
vue实现通告文字向下推动效果

<!--一定保证key的唯一性--><div class='box'><transition-group name="list" tag="div" mode="out-in" ><div v-for="item in items" v-bind:key="item" class="list-item">{{ item }......

Js_Mei
36分钟前
1
0
如何自学编程?学习方法在这里!

如何自学编程?学习方法在这里! 目前工作4年,当初是自学安卓的,资料网上整理的都非常全面了,我想补充的是现在已经不是当初资料匮乏的时代了,免费课程,付费课程,技术博客真的太多了,所...

Mr_zebra
50分钟前
1
0
Cloud 团队:让 TiDB 在云上跳舞 | PingCAP 招聘季

TiDB 是 Cloud Native 的数据库,对于 TiDB 来说,如何用 Cloud 的思想和技术让 TiDB 在云上跳舞,是 Cloud 团队研究的重要课题,本期我司商业产品副总裁刘寅老师将为大家介绍 Cloud 团队,E...

TiDB
51分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部