# Netty Primer
The goal of this article is to give you a rough picture of Netty and the basic concepts you should know before digging into it more deeply.
## Overview
- An NIO-based network communication framework
- The number of connections a single node can support depends mainly on machine memory (roughly 100K+ connections with 1 GB), rather than being capped by the system's maximum handle count (65536)
- Simple API (compared with the JDK's NIO)
- Event-driven
- Commonly used for games, finance, and other businesses that need long-lived connections and low latency
## Data flow in NIO
```mermaid
sequenceDiagram
    Channel->>Buffer: read
    Buffer->>Application: read
    Application->>Buffer: write
    Buffer->>Channel: write
```
- In NIO, all data is read from a Channel and buffered in a Buffer; user code then reads it from the Buffer.
- To write data, you must first write it into a Buffer and then write the Buffer to the Channel (see the sketch below).
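A minimal JDK NIO sketch of this read-then-write pattern (the class and the echo method here are illustrative, not part of the original post):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NioBufferExample {

    // Echoes whatever arrives on the channel, following the four steps above.
    static void echo(SocketChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);        // 1. data flows Channel -> Buffer
        if (read > 0) {
            buffer.flip();                      // switch the Buffer from writing to reading
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);                   // 2. user code reads from the Buffer

            buffer.clear();                     // reuse the Buffer for writing
            buffer.put(data);                   // 3. to send, first write into a Buffer
            buffer.flip();
            channel.write(buffer);              // 4. then write the Buffer to the Channel
        }
    }
}
```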
## Key objects in Netty
- Channel
- ByteBuf
- ChannelHandler
- ChannelHandlerContext
- ChannelPipeline
### Channel
A Channel is the abstraction of a single connection. All data that is read or written ultimately flows through a Channel. The main implementations are NioSocketChannel and NioServerSocketChannel.
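As a minimal sketch (the ChannelFuture is assumed to come from a Bootstrap.connect() call like the one in the demo below; the method name is illustrative), all outbound data goes out through the Channel:

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.CharsetUtil;

public class ChannelWriteExample {

    // Writes a greeting through the connection's Channel and closes it.
    static void sayHello(ChannelFuture connectFuture) throws InterruptedException {
        Channel ch = connectFuture.sync().channel();  // the established connection, abstracted as a Channel
        ch.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)); // bytes flow out through the Channel
        ch.close();
    }
}
```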
### ByteBuf
Netty's own buffer implementation. Compared with the JDK's ByteBuffer it has the following advantages:
- Its length can grow dynamically: when a write finds the current capacity insufficient, a larger buffer is allocated and the existing contents are copied into it.
- It is simpler to operate. The JDK's ByteBuffer has only a single position pointer, whereas Netty's ByteBuf keeps a separate readerIndex (read position) and writerIndex (write position), so you never need to call flip() or rewind() (see the sketch below).
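A minimal sketch of the readerIndex/writerIndex model (the values and buffer size are illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufExample {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(4);   // initial capacity of 4 bytes
        buf.writeInt(42);                   // advances writerIndex by 4
        buf.writeInt(43);                   // capacity exhausted, so the ByteBuf expands automatically

        int a = buf.readInt();              // advances readerIndex; no flip()/rewind() needed
        int b = buf.readInt();
        System.out.println(a + " " + b);    // prints: 42 43

        buf.release();                      // ByteBuf is reference-counted
    }
}
```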
### ChannelHandler
Once data has been read from a Channel, it is handed to a ChannelHandler for processing.
### ChannelHandlerContext
Records the runtime context of the current ChannelHandler. Every ChannelHandler has exactly one ChannelHandlerContext associated with it (a one-to-one relationship). It roughly holds:
- A reference to the current ChannelHandler
- A flag marking the handler as inbound or outbound
- The ChannelPipeline it belongs to
- The previous and next ChannelHandlerContext, which link the handlers into a processing chain, similar to the interceptor chain in Spring MVC (see the sketch after this list)
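A minimal sketch of how a handler uses its ChannelHandlerContext to pass an event along the chain (the handler name is illustrative):

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class MyInboundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ctx is this handler's own ChannelHandlerContext.
        System.out.println("read from " + ctx.channel().remoteAddress());
        ctx.fireChannelRead(msg);   // hand the event to the next inbound ChannelHandler;
                                    // skip this call to stop propagation here
    }
}
```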
### ChannelPipeline
Netty is event-driven. When different events (data) arrive, different business logic runs: sometimes several handlers must cooperate, sometimes a handler does not care about the current event at all, and sometimes a handler finishes and wants to stop later handlers from running. Controlling how an event is propagated is the job of the ChannelPipeline. It holds the head ChannelHandlerContext (inbound events start from the head) and the tail ChannelHandlerContext (outbound events start from the tail), linked together as a serial chain.
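A minimal sketch of assembling a pipeline (the decoder/encoder choice is illustrative; MyInboundHandler is the sketch from the previous section). Inbound events travel head to tail through the inbound handlers, while outbound operations travel tail to head through the outbound handlers:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class PipelineSetup extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new StringDecoder())     // inbound: bytes -> String
          .addLast(new StringEncoder())     // outbound: String -> bytes
          .addLast(new MyInboundHandler()); // inbound: business logic
    }
}
```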
## How ChannelHandler, ChannelHandlerContext, and ChannelPipeline relate
ChannelHandlers are linked to one another through their ChannelHandlerContexts.
A dedicated blog post explains the details thoroughly.
## Netty data flow diagram
## Official Netty demo
### Server side
```java
package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }

        new DiscardServer(port).run();
    }
}
```
DiscardServerHandler handles each server-side channel and simply releases whatever it receives:

```java
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        try {
            // Do something with msg
        } finally {
            // Discard the data by releasing the reference-counted message.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
```
TimeServerHandler comes from Netty's Time example (package io.netty.example.time), which the client below talks to. It writes the current time as soon as a connection becomes active and then closes it:

```java
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        // The Time protocol counts seconds since 1900-01-01; 2208988800 is the
        // offset between that epoch and the Unix epoch (1970-01-01).
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
### Client side
```java
package io.netty.example.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
```
```java
package io.netty.example.time;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            // Convert the 1900-based Time protocol seconds back to Unix milliseconds.
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```