netty的一些基本概念

原创
2017/05/14 13:56
阅读数 777

netty扫盲

本文目的是在于对netty有一个大致印象,便于深入了解netty前有一些基本概念

简介

  1. NIO通信框架
  2. 单点支持的连接数与机器内存有关(1G内存10W+左右),不在受制于系统的最大句柄数(65536个)
  3. API接口简单(相比于JDK的NIO)
  4. 事件驱动
  5. 常用用于游戏、金融等需要长链接,实时性要求高的业务

NIO中的数据流程

sequenceDiagram
Channel->>Buffer: 读取 
Buffer->>程序:读取
程序->>Buffer:写入
Buffer->>Channel: 写入

  1. 在NIO中,所有的数据都是从Channel中读取并缓存到Buffer中,用户自己的代码再从Buffer中读取
  2. 要想写数据,则必须先写入Buffer中,然后在把Buffer写入到Channnel中。

netty中几个重要的对象

  1. Channel
  2. ByteBuf
  3. ChannelHandler
  4. ChannelHandlerContext
  5. ChannelPipeline

Channel

Channel 一个连接的抽象。所有的读写数据,最终都是通过Channel流通的。主要有NioSocketChannel、NioServerSocketChannel


ByteBuf

netty自己实现的一个缓存Buf,相比于JDK的ByteBuffer有一下优点

  1. 长度可以动态拓展(在写入的时候,判断到capacity不够,会重新开辟一段大容量的buf,然后把之前buf中的数据拷贝到新的buf中)
  2. 操作简单。JDK的ByteBuffer只有一个位置指针position。netty的ByteBuf则有readIndex(度位置)、writeIndex(写位置)。在对buf操作时候,不需要flip()和rewind();操作简单。

ChannelHandler

ChannelHandler 从Channel中读取到数据后,就需要把数据交给ChannelHandler进行处理


ChannelHandlerContext

记录当前ChannelHandler的环境上下文,大致有以下信息。 每一个ChannelHandler都会有一个ChannelHandlerContext与之对应(一对一关系)

  1. 记录当前ChannelHandler对象
  2. 标识是inbound还是outbound
  3. 所属的ChannelPipeline
  4. 前一个ChannelHandlerContext、后一个ChannelHandlerContext。 这样形成一个处理链,类似于SpringMVC中的拦截器链。

ChannelPipeline

netty是事件驱动的。在获取到不同的事件后(数据),会做不同的业务逻辑处理,这时候有的可能需要多个Handler协作完成,有的Handler可能对当前的事件不做关心,有的可能处理完了,不想要后面的Handler处理了。<br/> 这时候如何对事件进行传播处理,这时候就需要用到ChannelPipeline了。ChannelPipeline中保存了头部的ChannelHandlerContext(进来的类型事件会从头开始)和尾部的ChannelHandlerContext(出去的类型事件会从尾部开始),他们是一个串行链接。

ChannelHandler、ChannelHandlerContext、ChannelPipeline的关系

image image image

Channel和Channel之间都是通过ChannelHandlerContext联系起来的。

具体内容这篇博客讲的很详细


netty的数据流程图

netty的数据流图

netty官方demo

server端


package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

client端

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部