Netty(四)

原创
2016/12/28 18:59
阅读数 382

       我们知道,基于JDK默认的序列化机制可以避免操作底层的字节数组,从而提升了开发效率。也便是java序列化机制的用武之地。

       那么这和编解码有什么关系呢?由于java序列化的目的有两个:一是对象持久化,另一个是网络传输。而编解码技术是贯穿在网络传输之中的。在进行远程跨进程服务调用时,需要把被传输的Java对象编码为字节数组或者ByteBuffer对象。当读取到ByteBuffer对象或字节数组时,需要将其编码为发送时的Java对象。这被称为Java对象编解码技术。

     由于Java序列化的种种缺陷,衍生出了多种编解码技术和框架,其中MessagePack、JBoss Marshalling和Google Protobuf是比较流行的,本文只着重介绍Protobuf的原理和使用。

一、Java序列化的缺点

1  无法跨语言

       这也是Java序列化最致命的问题。对于跨进程的服务调用,当我们需要和异构语言进程交互时,Java序列化就难以胜任。并且经Java序列化后的字节数组,别的语言无法进行反序列化,就严重阻碍了它的应用。

2  码流太大

       看下列代码

public class UserInfo implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private String userName;
	
	public final String getUserName() {
		return userName;
	}

	public final void setUserName(String userName) {
		this.userName = userName;
	}

	public final int getUserID() {
		return userID;
	}

	public final void setUserID(int userID) {
		this.userID = userID;
	}

	private int userID;
	
	public UserInfo buildUserName(String userName) {
		this.userName = userName;
		return this;
	}
	
	public UserInfo buildUserID(int userID) {
		this.userID = userID;
		return this;
	}

	public byte[] codeC() {
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		byte[] value = this.userName.getBytes();
		buffer.putInt(value.length);
		buffer.put(value);
		buffer.putInt(this.userID);
		buffer.flip();
		value = null;
		byte[] result = new byte[buffer.remaining()];
		buffer.get(result);
		return result;
	}
}

public class TestUserInfo {

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {

		UserInfo info = new UserInfo();
		info.buildUserID(100).buildUserName("Welcome to Netty");
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream os = new ObjectOutputStream(bos);
		os.writeObject(info);
		os.flush();
		os.close();
		byte[] b = bos.toByteArray();
		System.out.println("The jdk serializable length is : " + b.length);
		bos.close();
		System.out.println("-------------------------------------");
		System.out.println("The byte array serializable length is : " + info.codeC().length);
	}
}

执行TestUserInfo,结果如下:

这表明,采用JDK序列化机制编码后的二进制数组大小竟然是二进制编码的4.75倍!

3  序列化性能太低

       将之前的代码稍作修改,改造成性能测试版本

public byte[] codeC(ByteBuffer buffer) {

    buffer.clear();
    byte[] value = this.userName.getBytes();
    buffer.putInt(value.length);
    buffer.put(value);
    buffer.putInt(this.userID);
    buffer.flip();
    value = null;
    byte[] result = new byte[buffer.remaining()];
    buffer.get(result);
    return result;

}

对UserInfo进行改造,新增以上所示方法,并创建一个性能测试版本的UserInfo测试程序。如下:

public class PerformTestUserInfo {

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		
		UserInfo info = new UserInfo();
		info.buildUserID(100).buildUserName("Welcome to Netty");
		int loop  = 1000000;
		ByteArrayOutputStream bos = null;
		ObjectOutputStream os = null;
		long startTime = System.currentTimeMillis();
		for(int i=0;i<loop;i++) {
			bos = new ByteArrayOutputStream();
			os = new ObjectOutputStream(bos);
			os.writeObject(info);
			os.flush();
			os.close();
			byte[] b = bos.toByteArray();
			bos.close();
		}
		long endTime = System.currentTimeMillis();
		System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms");
		System.out.println("----------------------------------------------");
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		startTime = System.currentTimeMillis();
		for(int i=0;i<loop;i++) {
			byte[] b = info.codeC(buffer);
		}
		endTime = System.currentTimeMillis();
		System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms");
	}
}

结果为:

可见,Java序列化的性能只有二进制编码的51.4%左右。

二、业界主流的编解码框架

Google Protobuf介绍

特点:

1  结构化数据存储格式

2  高效的编解码性能

3  语言无关、平台无关、扩展性好

4  官方支持Java、C++和Python三种语言

Facebook Thrift介绍

Thrift适用于搭建大型数据交换及存储的通用工具,对于大型系统中的内部数据传输,相对于JSON和XML在性能和传输大小上都有明显的优势。

组件:语言系统及IDL编译器,TProtocol,TTransport,TProcessor以及TServer

我们重点关注的是编解码框架,与之对应的就是TProtocol。由于通常使用Thrift的时候都会采取RPC框架的方式。但是,它的TProtocol编解码框架还是可以以类库的方式独立使用。

Thrift支持三种比较典型的编解码方式

1  通用的二进制编解码

2  压缩二进制编解码

3  优化的可选字段压缩编解码

JBoss Marshalling介绍

优点:

1  可插拔的类解析器

2  可插拔的对象替换技术

3  可插拔的预定义类缓存表

4  无须实现java.io.Serializable接口,即可实现Java序列化

5  通过缓存技术提升对象的序列化性能

然而JBoss Marshalling更多是在JBoss内部使用,应用范围有限。

三、Google Protobuf编解码

       这里通过一个简单的例程来学习介绍怎样使用Protobuf对POJO对象进行编解码,然后讲解如何在Netty中对POJO对象进行Protobuf编解码,并在两个进程之间进行通信和数据交换。

       此处略过Protobuf环境搭建过程,直接看通过.proto文件编译后的java代码来进行Protobuf的使用。

测试代码如下:

public class TestSubscribeReqProto {

	private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
		return req.toByteArray();
	}
	
	private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
		return SubscribeReqProto.SubscribeReq.parseFrom(body);
	}
	
	private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
		SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
		builder.setSubReqID(1);
		builder.setUserName("Lilinfeng");
		builder.setProductName("Netty Book");
		List<String> address = new ArrayList<>();
		address.add("NanJing YuHuaTai");
		address.add("BeiJing LiuLiChang");
		address.add("ShenZhen HongShuLin");
		builder.setAddress(address.toString());
		return builder.build();
	}
	
	/**
	 * @param args
	 * @throws InvalidProtocolBufferException
	 */
	public static void main(String[] args) throws InvalidProtocolBufferException {
		SubscribeReqProto.SubscribeReq req = createSubscribeReq();
		System.out.println("Before encode : " + req.toString());
		SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
		System.out.println("After decode : " + req.toString());
		System.out.println("Assert equal : --> " + req2.equals(req));
	}
}

运行后,输出结果:

运行结果表明,经过Protobuf编码后,生成的SubscribeReqProto.SubscribeReq与编码前原始的SubscribeReqProto.SubscribeReq等价。下面,使用Netty的Protobuf编解码框架试试看。

服务端代码:

public class SubReqServer {

	public void bind(int port) throws Exception {
		//配置服务端的NIO线程组
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 100)
				.handler(new LoggingHandler(LogLevel.INFO))
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) {
						ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
						ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
						ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
						ch.pipeline().addLast(new ProtobufEncoder());
						ch.pipeline().addLast(new SubReqServerHandler());
					}
				});
			
			//绑定端口,同步等待成功
			ChannelFuture f = b.bind(port).sync();
			
			//等待服务端监听端口关闭
			f.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放线程池资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) throws Exception {
		int port = 8080;
		if(args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			} catch (NumberFormatException e) {
				
			}
		}
		new SubReqServer().bind(port);
	}
}

@Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {

	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		SubscribeReqProto.SubscribeReq req = (SubscribeReq) msg;
		if("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
			System.out.println("Service accept client subscribe req : [" + req + "]");
			ctx.writeAndFlush(resp(req.getSubReqID()));
		}
	}
	
	private SubscribeRespProto.SubscribeResp resp(int subReqID) {
		SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
		builder.setSubReqID(subReqID);
		builder.setRespCode(0);
		builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
		return builder.build();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

       由于使用了ProtobufEncoder,所以不需要对SubscribeRespProto.SubscribeResp进行手工编码。另外,ProtobufDecoder已经对消息进行了自动解码,因此接收到的请求消息可以直接使用。

客户端代码如下:

public class SubReqClient {

	public void connect(int port, String host) throws Exception {
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
						ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
						ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
						ch.pipeline().addLast(new ProtobufEncoder());
						ch.pipeline().addLast(new SubReqClientHandler());
					}
				});
			
			//发起异步连接操作
			ChannelFuture f =b.connect(host, port).sync();
			
			//等待客户端链路关闭
			f.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) throws Exception {
		int port = 8080;
		if (args!=null && args.length > 0) {
			try{
				port = Integer.valueOf(args[0]);
 			} catch(NumberFormatException e) {
 				//use default value
 			}
		}
		new SubReqClient().connect(port, "127.0.0.1");
	}
}

public class SubReqClientHandler extends ChannelHandlerAdapter {

	
	public SubReqClientHandler() {
		super();
		// TODO Auto-generated constructor stub
	}

	public void channelActive(ChannelHandlerContext ctx) {
		for(int i=0;i<10;i++) {
			ctx.write(subReq(i));
		}
		ctx.flush();
	}
	
	private SubscribeReqProto.SubscribeReq subReq(int i) {
		SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
		builder.setSubReqID(i);
		builder.setUserName("Lilinfeng");
		builder.setProductName("Netty Book For Protobuf");
		List<String> address = new ArrayList<>();
		address.add("NanJing YuHuaTai");
		address.add("BeiJing LiuLiChang");
		address.add("ShenZhen HongShuLin");
		builder.setAddress(address.toString());
		return builder.build();
	}
	
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("Receive server response : [" + msg + "]");
	}
	
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

}

       客户端接收到服务端的应答消息之后会直接打印,按照统计,应该打印10次。下面就测试下Protobuf的服务端和客户端,看它是否能正常运行。

       可以看出利用Netty提供的Protobuf编解码能力,我们再不需要了解Protobuf实现和使用细节的情况下就能轻松支持Protobuf编解码,可以方便地实现跨语言的远程服务调用和与周边的异构系统进行通信对接。

本节完结。

展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部