文档章节

Netty5入门学习笔记004-使用Netty传输POJO对象(上)

山东-小木
 山东-小木
发布于 2014/12/26 23:14
字数 2397
阅读 10424
收藏 140

使用Netty传输POJO对象,重点在于对象的序列化,序列化后的对象可以通过TCP流进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。

下面我们来看一个例子:模拟订票

首先Java序列化的POJO对象需要实现java.io.Serializable接口。

说明:还有很多种序列化的方式要比JDK自带的序列化要好 体积小利于保存和传输 例如google的protobuf和jboss的Marshalling 

火车车次和余票量POJO:

package bookticket;

import java.io.Serializable;
/**
 * 火车pojo对象
 * @author xwalker
 */
public class Train implements Serializable {
	private static final long serialVersionUID = 1510326612440404416L;
	private String number;//火车车次
	private int ticketCounts;//余票数量
	public Train(String number,int ticketCounts){
		this.number=number;
		this.ticketCounts=ticketCounts;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public int getTicketCounts() {
		return ticketCounts;
	}
	public void setTicketCounts(int ticketCounts) {
		this.ticketCounts = ticketCounts;
	}

}

 

车票POJO:

package bookticket;

import java.io.Serializable;
import java.util.Date;
/**
 * 订票POJO对象
 * @author xwalker
 */
public class Ticket implements Serializable {
	private static final long serialVersionUID = 4228051882802183587L;
	private String trainNumber;//火车车次
	private int carriageNumber;//车厢编号
	private String seatNumber;//座位编号
	private String number;//车票编号
	private User user;//订票用户
	private Date bookTime;//订票时间
	private Date startTime;//开车时间
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}

	public Date getBookTime() {
		return bookTime;
	}
	public void setBookTime(Date bookTime) {
		this.bookTime = bookTime;
	}
	public Date getStartTime() {
		return startTime;
	}
	public void setStartTime(Date startTime) {
		this.startTime = startTime;
	}
	public User getUser() {
		return user;
	}
	public void setUser(User user) {
		this.user = user;
	}
	public String getTrainNumber() {
		return trainNumber;
	}
	public void setTrainNumber(String trainNumber) {
		this.trainNumber = trainNumber;
	}
	public int getCarriageNumber() {
		return carriageNumber;
	}
	public void setCarriageNumber(int carriageNumber) {
		this.carriageNumber = carriageNumber;
	}
	public String getSeatNumber() {
		return seatNumber;
	}
	public void setSeatNumber(String seatNumber) {
		this.seatNumber = seatNumber;
	}
}

 

用户POJO:

package bookticket;

import java.io.Serializable;
/**
 * 用户POJO对象
 * @author xwalker
 */
public class User implements Serializable {
	private static final long serialVersionUID = -3845514510571408376L;
	private String userId;//身份证
	private String userName;//姓名
	private String phone;//电话
	private String email;//邮箱
	public String getUserId() {
		return userId;
	}
	public void setUserId(String userId) {
		this.userId = userId;
	}
	public String getUserName() {
		return userName;
	}
	public void setUserName(String userName) {
		this.userName = userName;
	}
	public String getPhone() {
		return phone;
	}
	public void setPhone(String phone) {
		this.phone = phone;
	}
	public String getEmail() {
		return email;
	}
	public void setEmail(String email) {
		this.email = email;
	}
}

 

请求指令集:通讯使用的固定指令集 服务器和客户端统一

package bookticket;

/**
 * 指令集
 * @author xwalker
 *
 */
public class Code {
	public static final int CODE_SEARCH=1;//查询车票余量
	public static final int CODE_BOOK=2;//订票
	public static final int CODE_NONE=-1;//错误指令 无法处理
}

 

客户端发送的请求信息:客户端发送一条请求信息 根据code属性确定此消息需要服务器做出如何响应 依据Code.java中定义的查票还是订票等

package bookticket;

import java.io.Serializable;
import java.util.Date;
/**
 * 订票人发送查询余票和订票使用的请求信息
 * @author xwalker
 *
 */
public class BookRequestMsg implements Serializable {
	private static final long serialVersionUID = -7335293929249462183L;
	private User user;//发送订票信息用户
	private String trainNumber;//火车车次
	private int code;//查询命令
	private Date startTime;//开车时间
	public User getUser() {
		return user;
	}
	public void setUser(User user) {
		this.user = user;
	}
	public String getTrainNumber() {
		return trainNumber;
	}
	public void setTrainNumber(String trainNumber) {
		this.trainNumber = trainNumber;
	}
	public Date getStartTime() {
		return startTime;
	}
	public void setStartTime(Date startTime) {
		this.startTime = startTime;
	}
	public int getCode() {
		return code;
	}
	public void setCode(int code) {
		this.code = code;
	}

}

 

服务器接收订票和查票后处理完业务反馈客户端的信息:

package bookticket;

import java.io.Serializable;
import java.util.Date;
/**
 * 订票成功与否反馈信息
 * @author xwalker
 */
public class BookResponseMsg implements Serializable {
	private static final long serialVersionUID = -4984721370227929766L;
	private boolean success;//是否操作成功
	private User user;//请求用户
	private String msg;//反馈信息
	private int code;//请求指令
	private Train train;//火车车次
	private Date startTime;//出发时间
	private Ticket ticket;//订票成功后具体出票票据
	public boolean getSuccess() {
		return success;
	}
	public void setSuccess(boolean success) {
		this.success = success;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public Ticket getTicket() {
		return ticket;
	}
	public void setTicket(Ticket ticket) {
		this.ticket = ticket;
	}
	public int getCode() {
		return code;
	}
	public void setCode(int code) {
		this.code = code;
	}
	public Train getTrain() {
		return train;
	}
	public void setTrain(Train train) {
		this.train = train;
	}
	public Date getStartTime() {
		return startTime;
	}
	public void setStartTime(Date startTime) {
		this.startTime = startTime;
	}
	public User getUser() {
		return user;
	}
	public void setUser(User user) {
		this.user = user;
	}
	
}

 

订票服务器:主要是配置对象解码器 netty会自动将序列化的pojo对象编码 解码 无需自己额外处理 只需要依据配置即可

package bookticket;

import java.util.ArrayList;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * 订票服务器端
 * @author xwalker
 *
 */
public class BookTicketServer {
	public static List<Train> trains;
	/**
	 * 初始化 构造车次和车票余数
	 */
	public BookTicketServer() {
		trains=new ArrayList<Train>();
		trains.add(new Train("G242",500));
		trains.add(new Train("G243",200));
		trains.add(new Train("D1025",100));
		trains.add(new Train("D1235",0));
	}
	public void bind(int port) throws Exception{
		//配置NIO线程组
		EventLoopGroup bossGroup=new NioEventLoopGroup();
		EventLoopGroup workerGroup=new NioEventLoopGroup();
		try{
			//服务器辅助启动类配置
			ServerBootstrap b=new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 100)
			.handler(new LoggingHandler(LogLevel.INFO))
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//添加对象解码器 负责对序列化POJO对象进行解码 设置对象序列化最大长度为1M 防止内存溢出
					//设置线程安全的WeakReferenceMap对类加载器进行缓存 支持多线程并发访问  防止内存溢出 
					ch.pipeline().addLast(new ObjectDecoder(1024*1024,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
					//添加对象编码器 在服务器对外发送消息的时候自动将实现序列化的POJO对象编码
					ch.pipeline().addLast(new ObjectEncoder());
					ch.pipeline().addLast(new BookTicketServerhandler());
				}
			});
			//绑定端口 同步等待绑定成功
			ChannelFuture f=b.bind(port).sync();
			//等到服务端监听端口关闭
			f.channel().closeFuture().sync();
		}finally{
			//优雅释放线程资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) throws Exception {
		int port =8000;
		new BookTicketServer().bind(port);
	}

}

 

服务器端网络IO处理器,查票订票业务处理和反馈:根据客户端请求信息中的code指令 确定是查票还是订票

package bookticket;

import java.util.Date;
import java.util.Random;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * 订票server端处理器
 * @author xwalker
 *
 */
public class BookTicketServerhandler extends ChannelHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		BookRequestMsg requestMsg=(BookRequestMsg) msg;
		BookResponseMsg responseMsg=null;
		switch (requestMsg.getCode()) {
		case Code.CODE_SEARCH://查询余票
			for(Train train:BookTicketServer.trains){
				//找到车次与请求车次相同的 返回车次余票
				if(requestMsg.getTrainNumber().equals(train.getNumber())){
					responseMsg=new BookResponseMsg();
					responseMsg.setUser(requestMsg.getUser());
					responseMsg.setCode(Code.CODE_SEARCH);
					responseMsg.setSuccess(true);
					responseMsg.setTrain(train);
					responseMsg.setStartTime(requestMsg.getStartTime());
					responseMsg.setMsg("火车【"+train.getNumber()+"】余票数量为【"+train.getTicketCounts()+"】");
					break;
				}
			}
			if(responseMsg==null){
				responseMsg=new BookResponseMsg();
				responseMsg.setUser(requestMsg.getUser());
				responseMsg.setCode(Code.CODE_SEARCH);
				responseMsg.setSuccess(false);
				responseMsg.setMsg("火车【"+requestMsg.getTrainNumber()+"】的信息不存在!");
			}
			break;
		case Code.CODE_BOOK://确认订票
			for(Train train:BookTicketServer.trains){
				//找到车次与请求车次相同的 返回车次余票
				if(requestMsg.getTrainNumber().equals(train.getNumber())){
					responseMsg=new BookResponseMsg();
					responseMsg.setUser(requestMsg.getUser());
					responseMsg.setSuccess(true);
					responseMsg.setCode(Code.CODE_BOOK);
					responseMsg.setMsg("恭喜您,订票成功!");
					Ticket ticket=new Ticket();
					ticket.setBookTime(new Date());
					ticket.setUser(requestMsg.getUser());
					ticket.setStartTime(requestMsg.getStartTime());
					ticket.setNumber(train.getNumber()+System.currentTimeMillis());//生成车票编号
					ticket.setCarriageNumber(new Random().nextInt(15));//随机车厢
					ticket.setUser(requestMsg.getUser());//设置订票人信息
					String[] seat=new String[]{"A","B","C","D","E"};
					Random seatRandom=new Random();
					ticket.setSeatNumber(seat[seatRandom.nextInt(5)]+seatRandom.nextInt(100));
					ticket.setTrainNumber(train.getNumber());
					train.setTicketCounts(train.getTicketCounts()-1);//余票减去一张
					responseMsg.setTrain(train);
					responseMsg.setTicket(ticket);
					break;
				}
			}
			if(responseMsg==null){
				responseMsg=new BookResponseMsg();
				responseMsg.setUser(requestMsg.getUser());
				responseMsg.setCode(Code.CODE_BOOK);
				responseMsg.setSuccess(false);
				responseMsg.setMsg("火车【"+requestMsg.getTrainNumber()+"】的信息不存在!");
			}
			break;
		default://无法处理
				responseMsg=new BookResponseMsg();
				responseMsg.setUser(requestMsg.getUser());
				responseMsg.setCode(Code.CODE_NONE);
				responseMsg.setSuccess(false);
				responseMsg.setMsg("指令无法处理!");
			break;
		}
		
		ctx.writeAndFlush(responseMsg);
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

 

客户端:客户端也需要配置对象编码 解码器

package bookticket;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * 订票客户端
 * @author xwalker
 */
public class BookTicketClient {
	public void connect(int port,String host) throws Exception{
		//配置客户端线程组
		EventLoopGroup group=new NioEventLoopGroup();
		try{
			//配置客户端启动辅助类
			Bootstrap b=new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
			.option(ChannelOption.TCP_NODELAY, true)
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//添加POJO对象解码器 禁止缓存类加载器
					ch.pipeline().addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
					//设置发送消息编码器
					ch.pipeline().addLast(new ObjectEncoder());
					//设置网络IO处理器
					ch.pipeline().addLast(new BookTicketClientHandler());
					
				}
			});
			//发起异步服务器连接请求 同步等待成功
			ChannelFuture f=b.connect(host,port).sync();
			//等到客户端链路关闭
			f.channel().closeFuture().sync();
			
		}finally{
			//优雅释放线程资源
			group.shutdownGracefully();
		}
		
	}
	
	public static void main(String[] args) throws Exception{
			new BookTicketClient().connect(8000, "127.0.0.1");
	}

}

 

客户端处理网络IO处理器 发送查票和订票请求:链路创建成功后需要 模拟发送两个车次的查票指令 其中一车有票 一车无票 有票的信息反馈回来后继续发送订票指令 成功订票后输出车票信息

package bookticket;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Calendar;

/**
 * 客户端处理器
 * 
 * @author xwalker
 */
public class BookTicketClientHandler extends ChannelHandlerAdapter {
	private User user;
	public BookTicketClientHandler() {
		user=new User();
		user.setUserName("xwalker");
		user.setPhone("187667*****");
		user.setEmail("909854136@qq.com");
		user.setUserId("3705231988********");
	}
	/**
	 * 链路链接成功
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		
		// 链接成功后发送查询某车次余票的请求
		Calendar c = Calendar.getInstance();
		c.set(Calendar.YEAR, 2015);
		c.set(Calendar.MONTH, 1);
		c.set(Calendar.DATE, 2);
		c.set(Calendar.HOUR, 11);
		c.set(Calendar.MINUTE, 30);
		// G242查询余票
		BookRequestMsg requestMsg1 = new BookRequestMsg();
		requestMsg1.setCode(Code.CODE_SEARCH);
		requestMsg1.setStartTime(c.getTime());
		requestMsg1.setTrainNumber("G242");//设置查询车次
		requestMsg1.setUser(user);//设置当前登陆用户
		ctx.write(requestMsg1);
		// D1235查询余票
		BookRequestMsg requestMsg2 = new BookRequestMsg();
		requestMsg2.setCode(Code.CODE_SEARCH);
		requestMsg2.setStartTime(c.getTime());
		requestMsg2.setTrainNumber("D1235");//设置查询车次
		requestMsg2.setUser(user);
		ctx.write(requestMsg2);
		ctx.flush();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		BookResponseMsg responseMsg = (BookResponseMsg) msg;
		switch (responseMsg.getCode()) {
		case Code.CODE_SEARCH://收到查询结果
			System.out.println("==========火车【"+responseMsg.getTrain().getNumber()+"】余票查询结果:【"+(responseMsg.getSuccess()?"成功":"失败")+"】=========");
			System.out.println(responseMsg.getMsg());
			//查询发现有余票的话 需要发送订票指令
			if(responseMsg.getTrain().getTicketCounts()>0){
				//构造查询有余票的火车的订票指令
				BookRequestMsg requestMsg = new BookRequestMsg();
				requestMsg.setCode(Code.CODE_BOOK);
				requestMsg.setUser(user);
				requestMsg.setStartTime(responseMsg.getStartTime());
				requestMsg.setTrainNumber(responseMsg.getTrain().getNumber());
				ctx.writeAndFlush(requestMsg);
			}else{
				System.out.println("火车【"+responseMsg.getTrain().getNumber()+"】没有余票,不能订票了!");
			}
			break;
		case Code.CODE_BOOK://收到订票结果
			System.out.println("==========火车【"+responseMsg.getTrain().getNumber()+"】订票结果:【"+(responseMsg.getSuccess()?"成功":"失败")+"】=========");
			System.out.println(responseMsg.getMsg());
			System.out.println("========车票详情========");
			Ticket ticket=responseMsg.getTicket();
			System.out.println("车票票号:【"+ticket.getNumber()+"】");
			System.out.println("火车车次:【"+ticket.getTrainNumber()+"】");
			System.out.println("火车车厢:【"+ticket.getCarriageNumber()+"】");
			System.out.println("车厢座位:【"+ticket.getSeatNumber()+"】");
			System.out.println("预定时间:【"+ticket.getBookTime()+"】");
			System.out.println("出发时间:【"+ticket.getStartTime()+"】");
			System.out.println("乘客信息:【"+ticket.getUser().getUserName()+"】");
			break;
		default:
			System.out.println("==========操作错误结果=========");
			System.out.println(responseMsg.getMsg());
			break;
		}

	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

 

最后测试结果:

 

© 著作权归作者所有

山东-小木

山东-小木

粉丝 241
博文 44
码字总数 27775
作品 2
东营
CEO
私信 提问
加载中

评论(22)

山东-小木
山东-小木 博主

引用来自“太阳j”的评论

为什么先运行服务端在运行客户端数据没出来 求答案
正常配置好以来 运行没问题啊 报什么错误吗
太阳j
为什么先运行服务端在运行客户端数据没出来 求答案
super-d2
super-d2
例子写得不错哦~值得学习13
金贞花
金贞花

引用来自“有毛的汉子”的评论

兄弟,有没有netty5 的断线重连的实现??赐教啊
看example的 uptime
有毛的汉子
兄弟,有没有netty5 的断线重连的实现??赐教啊
山东-小木
山东-小木 博主

引用来自“有毛的汉子”的评论

请教个问题,长连接建立之后,客户端业务逻辑如何调用API发送消息给服务端?

ctx.write
有毛的汉子
请教个问题,长连接建立之后,客户端业务逻辑如何调用API发送消息给服务端?
小白小霸王
小白小霸王
跪等下
jkguowen
jkguowen
太好了,这个Netty系列很不错,进来学习一下
山东-小木
山东-小木 博主

引用来自“石头哥哥”的评论

不建议直接使用netty自带的序列化,只让它作为通信层就可以了,序列化东西直接在应用层处理好,netty封装的pb 也只是对单个pb消息实体来处理。还得有对应的消息结构处理-协议。才能够处理多个。序列化方案有很多,如kryo,fst,thrift(更多是实现了通信和对应的RPC),FlatBuffer,等。具体视应用而定。没有绝对的快。用自己最熟悉的 。欢迎讨论
恩 正在看google的protobuf
OSChina 技术周刊第十五期——每周技术精粹集锦

每周技术抢先看,总有你想要的! 移动开发 【软件】移动基站数据分析 SnoopSnitch 【博客】android自动连接wifi——WifiManager 【资讯】OSC 安卓客户端全面改版 —— 新界面新体验 【资讯】...

OSC编辑部
2014/12/28
1K
0
Netty5入门学习笔记001

Netty官网:http://netty.io/ 本例程使用最新的netty5.x版本编写 服务器端: TimeServer 时间服务器 服务端接收客户端的连接请求和查询当前时间的指令,判断指令正确后响应返回当前服务器的校...

山东-小木
2014/12/17
15.4K
10
dubbo典型协议、传输组件、序列化方式组合性能对比测试

前言 Dubbo作为一个扩展能力极强的分布式服务框架,在实现rpc特性的时候,给传输协议、传输框架和序列化方式提供了多种扩展实现,供开发者根据实际场景进行选择。 1、支持常见的传输协议:R...

杨武兵
2016/06/13
3.5K
6
Netty5源码分析--4.总结

JAVA NIO 复习 请先参考我之前的博文JAVA学习笔记--3.Network IO的 NIO(NonBlocking IO) SOCKET 章节。这里主要讲下JAVA NIO其中几个比较被忽略的细节,不求全,欢迎补充。 API Select 当调...

geecoodeer
2014/01/17
4.9K
18
Happy Netty5 客户端和服务端 自定义协议通信

网上好多连接好多demo都不是netty5的,都是以前的版本,况且还有好多运行时老报错。入门级demo就不写了,估计都是那些老套路。好多公司都会有最佳实践,今天就说说如何自定义协议,一般自定义...

豆芽菜橙
2017/08/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部