文档章节

Netty模拟redis服务器

ksfzhaohui
 ksfzhaohui
发布于 2016/08/28 10:23
字数 2078
阅读 4111
收藏 110

Redis的客户端与服务端采用叫做 RESP(Redis Serialization Protocol)的网络通信协议交换数据,客户端和服务器通过 TCP 连接来进行数据交互, 服务器默认的端口号为 6379 。客户端和服务器发送的命令或数据一律以 \r\n (CRLF)结尾。

RESP支持五种数据类型:

状态回复(status reply):以“+”开头,表示正确的状态信息,”+”后就是具体信息​,比如:

redis 127.0.0.1:6379> set ss sdf
OK

其实它真正回复的数据是:+OK\r\n
错误回复(error reply):以”-“开头,表示错误的状态信息,”-“后就是具体信息,比如:

redis 127.0.0.1:6379> incr ss
(error) ERR value is not an integer or out of range

整数回复(integer reply):以”:”开头,表示对某些操作的回复比如DEL, EXISTS, INCR等等

redis 127.0.0.1:6379> incr aa
(integer) 1

批量回复(bulk reply):以”$”开头,表示下一行的字符串长度,具体字符串在下一行中

多条批量回复(multi bulk reply):以”*”开头,表示消息体总共有多少行(不包括当前行)”*”是具体行数

redis 127.0.0.1:6379> get ss
"sdf"

客户端->服务器
*2\r\n
$3\r\n
get\r\n
$2\r\n
ss\r\n
服务器->客户端
$3\r\n
sdf\r\n

注:以上写的都是XX回复,并不是说协议格式只是适用于服务器->客户端,客户端->服务器端也同样使用以上协议格式,其实双端协议格式的统一更加方便扩展

回到正题,我们这里是通过netty来模拟redis服务器,可以整理一下思路大概分为这么几步:

1.需要一个底层的通信框架,这里选择的是netty4.0.25
2.需要对客户端穿过来的数据进行解码(Decoder),其实就是分别处理以上5种数据类型
3.解码以后我们封装成更加利于理解的命令(Command),比如:set<name> foo hello<params>
4.有了命令以后就是处理命令(execute),其实我们可以去连接正在的redis服务器,不过这里只是简单的模拟
5.处理完之后就是封装回复(Reply),然后编码(Encoder),需要根据不同的命令分别返回以后5种数据类型
6.测试验证,通过redis-cli去连接netty模拟的redis服务器,看能否返回正确的结果

以上思路参考github上的一个项目:https://github.com/spullara/redis-protocol,测试代码也是在此基础上做了一个简化

第一步:通信框架netty

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.0.25.Final</version>
</dependency>

第二步:数据类型解码

public class RedisCommandDecoder extends ReplayingDecoder<Void> {

	public static final char CR = '\r';
	public static final char LF = '\n';

	public static final byte DOLLAR_BYTE = '$';
	public static final byte ASTERISK_BYTE = '*';

	private byte[][] bytes;
	private int arguments = 0;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,
			List<Object> out) throws Exception {
		if (bytes != null) {
			int numArgs = bytes.length;
			for (int i = arguments; i < numArgs; i++) {
				if (in.readByte() == DOLLAR_BYTE) {
					int l = RedisReplyDecoder.readInt(in);
					if (l > Integer.MAX_VALUE) {
						throw new IllegalArgumentException(
								"Java only supports arrays up to "
										+ Integer.MAX_VALUE + " in size");
					}
					int size = (int) l;
					bytes[i] = new byte[size];
					in.readBytes(bytes[i]);
					if (in.bytesBefore((byte) CR) != 0) {
						throw new RedisException("Argument doesn't end in CRLF");
					}
					// Skip CRLF(\r\n)
					in.skipBytes(2);
					arguments++;
					checkpoint();
				} else {
					throw new IOException("Unexpected character");
				}
			}
			try {
				out.add(new Command(bytes));
			} finally {
				bytes = null;
				arguments = 0;
			}
		} else if (in.readByte() == ASTERISK_BYTE) {
			int l = RedisReplyDecoder.readInt(in);
			if (l > Integer.MAX_VALUE) {
				throw new IllegalArgumentException(
						"Java only supports arrays up to " + Integer.MAX_VALUE
								+ " in size");
			}
			int numArgs = (int) l;
			if (numArgs < 0) {
				throw new RedisException("Invalid size: " + numArgs);
			}
			bytes = new byte[numArgs][];
			checkpoint();
			decode(ctx, in, out);
		} else {
			in.readerIndex(in.readerIndex() - 1);
			byte[][] b = new byte[1][];
			b[0] = in.readBytes(in.bytesBefore((byte) CR)).array();
			in.skipBytes(2);
			out.add(new Command(b, true));
		}
	}

}

首先通过接受到以“*”开头的多条批量类型初始化二维数组byte[][] bytes,以读取到第一个以\r\n结尾的数据作为数组的长度,然后再处理以“$”开头的批量类型。
以上除了处理我们熟悉的批量和多条批量类型外,还处理了没有任何标识的数据,其实有一个专门的名字叫Inline命令:
有些时候仅仅是telnet连接Redis服务,或者是仅仅向Redis服务发送一个命令进行检测。虽然Redis协议可以很容易的实现,但是使用Interactive sessions 并不理想,而且redis-cli也不总是可以使用。基于这些原因,Redis支持特殊的命令来实现上面描述的情况。这些命令的设计是很人性化的,被称作Inline 命令。

第三步:封装command对象

由第二步中可以看到不管是commandName还是params都统一放在了字节二维数组里面,最后封装在command对象里面

public class Command {
	public static final byte[] EMPTY_BYTES = new byte[0];

	private final Object name;
	private final Object[] objects;
	private final boolean inline;

	public Command(Object[] objects) {
		this(null, objects, false);
	}

	public Command(Object[] objects, boolean inline) {
		this(null, objects, inline);
	}

	private Command(Object name, Object[] objects, boolean inline) {
		this.name = name;
		this.objects = objects;
		this.inline = inline;
	}

	public byte[] getName() {
		if (name != null)
			return getBytes(name);
		return getBytes(objects[0]);
	}

	public boolean isInline() {
		return inline;
	}

	private byte[] getBytes(Object object) {
		byte[] argument;
		if (object == null) {
			argument = EMPTY_BYTES;
		} else if (object instanceof byte[]) {
			argument = (byte[]) object;
		} else if (object instanceof ByteBuf) {
			argument = ((ByteBuf) object).array();
		} else if (object instanceof String) {
			argument = ((String) object).getBytes(Charsets.UTF_8);
		} else {
			argument = object.toString().getBytes(Charsets.UTF_8);
		}
		return argument;
	}

	public void toArguments(Object[] arguments, Class<?>[] types) {
		for (int position = 0; position < types.length; position++) {
			if (position >= arguments.length) {
				throw new IllegalArgumentException(
						"wrong number of arguments for '"
								+ new String(getName()) + "' command");
			}
			if (objects.length - 1 > position) {
				arguments[position] = objects[1 + position];
			}
		}
	}

}

所有的数据都放在了Object数组里面,而且可以通过getName方法知道Object[0]就是commandName

第四步:执行命令

在经历了解码和封装之后,下面需要实现handler类,用来处理消息

public class RedisCommandHandler extends SimpleChannelInboundHandler<Command> {

	private Map<String, Wrapper> methods = new HashMap<String, Wrapper>();

	interface Wrapper {
		Reply<?> execute(Command command) throws RedisException;
	}

	public RedisCommandHandler(final RedisServer rs) {
		Class<? extends RedisServer> aClass = rs.getClass();
		for (final Method method : aClass.getMethods()) {
			final Class<?>[] types = method.getParameterTypes();
			methods.put(method.getName(), new Wrapper() {
				@Override
				public Reply<?> execute(Command command) throws RedisException {
					Object[] objects = new Object[types.length];
					try {
						command.toArguments(objects, types);
						return (Reply<?>) method.invoke(rs, objects);
					} catch (Exception e) {
						return new ErrorReply("ERR " + e.getMessage());
					}
				}
			});
		}
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Command msg)
			throws Exception {
		String name = new String(msg.getName());
		Wrapper wrapper = methods.get(name);
		Reply<?> reply;
		if (wrapper == null) {
			reply = new ErrorReply("unknown command '" + name + "'");
		} else {
			reply = wrapper.execute(msg);
		}
		if (reply == StatusReply.QUIT) {
			ctx.close();
		} else {
			if (msg.isInline()) {
				if (reply == null) {
					reply = new InlineReply(null);
				} else {
					reply = new InlineReply(reply.data());
				}
			}
			if (reply == null) {
				reply = ErrorReply.NYI_REPLY;
			}
			ctx.write(reply);
		}
	}
}

在实例化handler的时候传入了一个RedisServer对象,这个方法是真正用来处理redis命令的,理论上这个对象应该支持redis的所有命令,不过这里只是测试所有只提供了2个方法:

public interface RedisServer {

	public BulkReply get(byte[] key0) throws RedisException;

	public StatusReply set(byte[] key0, byte[] value1) throws RedisException;

}

在channelRead0方法中我们可以拿到之前封装好的command方法,然后通过命令名称执行操作,这里的RedisServer也很简单,只是用简单的hashmap进行临时的保存数据。

第五步:封装回复

第四步种我们可以看到处理完命令之后,返回了一个Reply对象

public interface Reply<T> {

	byte[] CRLF = new byte[] { RedisReplyDecoder.CR, RedisReplyDecoder.LF };

	T data();

	void write(ByteBuf os) throws IOException;
}

根据上面提到的5种类型再加上一个inline命令,根据不同的数据格式进行拼接,比如StatusReply:

public void write(ByteBuf os) throws IOException {
	os.writeByte('+');
	os.writeBytes(statusBytes);
	os.writeBytes(CRLF);
}

所以对应Decoder的Encoder就很简单了:

public class RedisReplyEncoder extends MessageToByteEncoder<Reply<?>> {
	@Override
	public void encode(ChannelHandlerContext ctx, Reply<?> msg, ByteBuf out)
			throws Exception {
		msg.write(out);
	}
}

只需要将封装好的Reply返回给客户端就行了

最后一步:测试

启动类:

public class Main {
	private static Integer port = 6379;

	public static void main(String[] args) throws InterruptedException {
		final RedisCommandHandler commandHandler = new RedisCommandHandler(
				new SimpleRedisServer());

		ServerBootstrap b = new ServerBootstrap();
		final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1);
		try {
			b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
					.channel(NioServerSocketChannel.class)
					.option(ChannelOption.SO_BACKLOG, 100).localAddress(port)
					.childOption(ChannelOption.TCP_NODELAY, true)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch)
								throws Exception {
							ChannelPipeline p = ch.pipeline();
							p.addLast(new RedisCommandDecoder());
							p.addLast(new RedisReplyEncoder());
							p.addLast(group, commandHandler);
						}
					});

			ChannelFuture f = b.bind().sync();
			f.channel().closeFuture().sync();
		} finally {
			group.shutdownGracefully();
		}
	}
}

ChannelPipeline分别添加了RedisCommandDecoder、RedisReplyEncoder和RedisCommandHandler,同时我们启动的端口和Redis服务器端口是一样的也是6379

打开redis-cli程序:

redis 127.0.0.1:6379> get dsf
(nil)
redis 127.0.0.1:6379> set dsf dsfds
OK
redis 127.0.0.1:6379> get dsf
"dsfds"
redis 127.0.0.1:6379>

从结果可以看出和正常使用redis服务器没有差别

总结

这样做的意义其实就是可以把它当做一个redis代理,由这个代理服务器去进行sharding处理,客户端不直接访问redis服务器,对客户端来说,后台redis集群是完全透明的。

个人博客:codingo.xyz

© 著作权归作者所有

ksfzhaohui

ksfzhaohui

粉丝 394
博文 147
码字总数 210335
作品 3
南京
高级程序员
私信 提问
加载中

评论(10)

ksfzhaohui
ksfzhaohui 博主

引用来自“hzhelifu”的评论

引用来自“hzhelifu”的评论

试试这个:<a href='https://github.com/helifu/twemproxy-163' rel='nofollow' target='_blank'>https://github.com/helifu/twemproxy-163</a>

支持redis节点自动切换的版本

https://github.com/helifu/twemproxy-163
mark一下
h
hzhelifu

引用来自“hzhelifu”的评论

试试这个:<a href='https://github.com/helifu/twemproxy-163' rel='nofollow' target='_blank'>https://github.com/helifu/twemproxy-163</a>

支持redis节点自动切换的版本

https://github.com/helifu/twemproxy-163
h
hzhelifu
试试这个:<a href='https://github.com/helifu/twemproxy-163' rel='nofollow' target='_blank'>https://github.com/helifu/twemproxy-163</a>

支持redis节点自动切换的版本
ksfzhaohui
ksfzhaohui 博主

引用来自“purple_grape”的评论

上代理,直接twemproxy
是的,只是模拟了解一下具体流程
purple_grape
purple_grape
上代理,直接twemproxy
ksfzhaohui
ksfzhaohui 博主

引用来自“抗争的诗人”的评论

啥时候开发一个redis代理服务器出来玩玩
可以看看这个:https://github.com/spullara/redis-protocol
lnwazg
lnwazg
啥时候开发一个redis代理服务器出来玩玩
ksfzhaohui
ksfzhaohui 博主

引用来自“Sky”的评论

玩C/C++ 的可以试下这个C++开发的redis服务器框架库,https://github.com/0xsky/xredis-server
可以
Sky
Sky
玩C/C++ 的可以试下这个C++开发的redis服务器框架库,https://github.com/0xsky/xredis-server
SimonYe
SimonYe
Proxy 是非常重要的! 13
通过netty实现协议来实现代理服务器。

今天再oschina中看到了http://my.oschina.net/OutOfMemory/blog/738865(Netty模拟redis服务器) 前段时间看到了用vert.x来代理mysql服务器。想想mycat这个服务器中间件不也是类似的功能吗?...

miaojiangmin
2016/09/01
210
0
Java|BIO、NIO、AIO

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 https://blog.csdn.net/darlingwood2013/article/details/100066366 在很多面经里都有这样的...

叶晚林
08/25
0
0
使用 DotNetty 实现 Redis 的一个控制台应用程序

零:Demo 跑出来的结果如图 上图说明 图中左边蓝色的命令行界面,是用windows powershell 命令行链接的。   1.打开powershell命令行界面,输入命令【telnet 127.0.0.1 6379】。    如果...

入夏
2018/08/15
0
0
Netty4遇到的几个问题.

服务器上使用 Netty 作为socket 长连接的服务器. 遇到几个问题, 已经纠结了好一段时间, 今天提出来. 希望能够被大家启发下. 1. 接收服务使用 Netty 接收设备的报文消息. 如果要做到推送, 那么...

采蘑菇的大叔
2017/05/17
486
5
Netty 源码中对 Redis 协议的实现

近期一直在做网络协议相关的工作,所以博客也就与之相关的比较多,今天楼主结合 Redis的协议 RESP 看看在 Netty 源码中是如何实现的。 RESP 协议 RESP 是 Redis 序列化协议的简写。它是一种直...

haifeiWu
2018/08/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Leetcode PHP题解--D118 350. Intersection of Two Arrays II

D118 350. Intersection of Two Arrays II 题目链接 350. Intersection of Two Arrays II 题目分析 返回给定两个数组的交集。 思路 从数量较多的那个数组开始去另一个数组寻找是否元素存在,...

skys215
18分钟前
2
0
从源码上分析Android View保存数据状态

在Android开发旅途中,经常会遇到系统控件无法满足我们的视觉,交互效果,这个时候我们常常需要自己自定义控件来满足我们的需求。在这个开发探索过程中,我们不可避免得遇到View要保存状态信...

shzwork
19分钟前
2
0
请问AD603AQ和AD603AR有什么区别?

  AD603AQ和AD603AR只是在封装上的区别,前者是双列直插式,后者是贴片式,AD603A系列的温度都是在—40摄氏度到+85摄氏度之间,AD603还有一个系列是AD603S,它的温度是在—55摄氏度到+125摄...

仙溪
20分钟前
2
0
Linux /etc/profile 配置文件修改

1. 执行命令: vi /etc/profile 去类似windows 配置环境变量, 2.修改完,立即生效命令: source /etc/profile

kuchawyz
21分钟前
3
0
对于小白来说素描怎么入门?怎么学习?

素描初学者怎样入门?初学者怎样才能画好素描绘画?画好素描绘画有哪些技巧?想必这些问题都是绘画初学者们比较伤脑筋的问题,那么初学者到底怎样才能画好素描绘画呢?今天收集整理了关于素描...

huihuajiaocheng
23分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部