Netty-登堂篇

原创
2017/10/02 07:59
阅读数 340

简介:

        Netty是一款高效的基于reactor线程模型、异步非阻塞事件驱动的网络编程框架,主要用于网络编程,适用于中间件开发组建、传统应用通讯开发。

       Reactor线程模型:线程模型线程模型中有两种角色,一个是主处理线程(组),一个是工作线程(组),在netty中主处理线程和工作线程都由线程组来维护,主处理线程主要用于处理网络事件中的Accept事件,工作线程主要用来处理主线程下发下来的网络链接处理处理任务,如对channel中数据的读写等。

使用Netty开发网络应用程序的优势:

    (1)采用java NIO原生开发网络应用,成本相对较大,需要熟练掌握NIO编程中各组件的正确使用(如:多路复用器、channel、缓存机制等)、架构设计上线程模型的选择及实现、编解码技术、TCP半包处理等等。

    (2)Netty已解决或规避原生java NIO的各种问题或bug,比较醒目的就是多路复用的空轮询。

    (3)Netty趋于成熟,已成为很多大型底层中间件网络通讯组件(如:Hadoop的RPC框架avro)。

   (4)易用性,netty已尽可能地对外屏蔽了底层网络实现的具体细节,已相对友好的方式对外提供API(具体为ServerBootstrap、Bootstrap、自实现编解码抽象类、自定义Handler等)。

开发案例:

        Client发出查询时间的指令,Server端接收指令并简单判断后,将当前时间写入channel。

maven pom.xml中引入:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.0.Final</version>
</dependency>

辅助类:

package com.best.diamond.netty.time;

/**
 * Created by hengluwen on 17/10/1.
 */
public class NettyConstants {

    public static final String BAD_ORDER = "BAD ORDER";

    public static final String QUERY_TIME_ORDER = "QUERY TIME ORDER";

}

Netty Server端:

package com.best.diamond.netty.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;


/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeServer {

    private static final StringDecoder DECODER = new StringDecoder(CharsetUtil.UTF_8);

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                            //服务端的ServerBootstrap中增加了一个方法childHandler,它的目的是添加handler(包括用户自定义的handler),用来监听已经连接的客户端的Channel的动作和状态。
                    .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = b.bind(port).sync();

            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //以下两行代码为了解决半包读问题
            pipeline.addLast("framer", new LineBasedFrameDecoder(1024));
            pipeline.addLast("decoder", DECODER);
            pipeline.addLast(new TimeServerHandler());
        }
    }


    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}

       服务端代码中首先创建了两个NioEventLoopGroup实例,NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组,创建两个的原因是:一个用于服务端接收客户端的链接,一个用于进行SocketChannel的读写。设置创建Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类,然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024(该参数表示未完成TCP三次握手的链接数与完成TCP三次握手的连接数的总和,这个参数会限制client的总链接数),ServerBootstrap是netty服务端启动辅助类,目的为降低服务端开发的复杂度。绑定I/O事件的处理类ChailChannelHandler,它的作用类似于Reactor模式中的Handler类,主要用于处理网络I/O事件。

package com.best.diamond.netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Date;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
        System.out.println("The time server receive order : " + request);
        String currentTime = NettyConstants.QUERY_TIME_ORDER.equalsIgnoreCase(request) ? new Date().toString() : NettyConstants.BAD_ORDER;
        sendMessage(ctx, currentTime);
    }

    private void sendMessage(ChannelHandlerContext ctx, String data) {
        ByteBuf byteBuf = Unpooled.copiedBuffer(data.getBytes());
        ctx.writeAndFlush(byteBuf);
    }
}

       当接收到client发送的数据时,由于在启动时已经加入了String类型的消息解码器(StringDecoder,此消息解码器由Netty提供,不需要自定义),因此用户自定义的Handler中接收到的request的数据类型为String,不必再次进行手工解码。

Netty Client端:

package com.best.diamond.netty.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeClient {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            ChannelFuture channelFuture = b.connect(new InetSocketAddress(host, port)).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        try {
            new TimeClient().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client端的启动代码与server类似,由于client更多的是要处理网络事件的I/O,且正常情况下client只会处理一个或两个channel(channel失效重连时),因此只需要一个线程组即可。

package com.best.diamond.netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeClientHandler extends SimpleChannelInboundHandler<String> {

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        firstMessage = Unpooled.buffer().writeBytes(NettyConstants.QUERY_TIME_ORDER.getBytes());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    System.out.println("error" + future.cause());
                }
            }
        });
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String request) throws Exception {
        System.out.println("Now is : " + request);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

channel激活时,发出指定,等待服务端的相应。

总结:

    (1)netty是一个高效的异步非阻塞的网络通讯框架,开发简单。

    (2)netty易用性强,提供了大量的编码器/解码器,已经处理TCP半包问题的辅助类。

    (3)netty的api设计的强大而友好。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部