Netty常用方法介绍

原创
2018/09/09 13:45
阅读数 199
    Bootstrap or ServerBootstrap
    EventLoop
    EventLoopGroup
    ChannelPipeline
    Channel
    Future or ChannelFuture
    ChannelInitializer
    ChannelHandler

     Bootstrap,一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。

     Handler,为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。

     ChannelInboundHandler,一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个Handler里面的,ChannelInboundHandler就是用来处理我们的核心业务逻辑。

     ChannelInitializer,当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。

     ChannelPipeline,一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理相关。

     EventLoops的目的是为Channel处理IO操作,一个EventLoop可以为多个Channel服务。

     EventLoopGroup会包含多个EventLoop。

     Channel代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。

     Future,在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。总之,所有的操作都会返回一个ChannelFuture。
ChannelInboundHandlerAdapter是普通类,而SimpleChannelInboundHandler<T>是抽象类,

----------------------------------------------------------------
继承SimpleChannelInboundHandler的类必须实现channelRead0方法。
----------------------------------------------------------------
ChannelInboundHandlerAdapter 不用实现方法
    //这里是接受数据和回写的
public class EchoServerHandler2 extends ChannelInboundHandlerAdapter {	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf bb=(ByteBuf)msg;
		System.out.println("服务端回传11:"+bb.toString(CharsetUtil.UTF_8));
		ByteBuf heapBuf = Unpooled.buffer(8);
		heapBuf.writeBytes("最后一次11".getBytes());
		ctx.write(heapBuf);
		ctx.fireChannelRead(msg);
	}
	
	//数据处理完成关闭
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
	    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
	}
	//异常处理
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	     cause.printStackTrace();
	     ctx.close();
	}
}
-------------------------------------------------------------------------

SimpleChannelInboundHandler<T>有一个重要特性,就是消息被读取后,会自动释放资源,常见的IM聊天软件的机制就类似这种
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 服务端 channel
 */
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { 
	
	
	public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

   //添加一个回话通道
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        channels.add(ctx.channel());
    }

    //回话移除
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
        // A closed Channel is automatically removed from ChannelGroup,
        // so there is no need to do "channels.remove(ctx.channel());"
    }
	
	//这里是一个获取数据的并处理的 默认实现的方法
    @Override
	protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
		Channel incoming = ctx.channel();
		for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
            	channel.writeAndFlush("[you]" + s + "\n");
            }
        }
	}
  
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
	}
	
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
	}
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
    	Channel incoming = ctx.channel();
		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}
 public void initChannel(SocketChannel ch) throws Exception {
		 ChannelPipeline pipeline = ch.pipeline();
        //这里的ChannelInboundHandlerAdapter按照顺序执行的。
		//ChannelOutboundHandlerAdapter是按照逆序来的
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());
		System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
    }
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部