第八篇 - 手写Redis(Java实现)

06/12 10:35
阅读数 192

在这里插入图片描述
在这里插入图片描述
Github源码下载:https://github.com/chenxingxing6/sourcecode/tree/master/code-redis


一、前言

Redis是一个开源的使用ANSI C语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、key-value数据库。它通常被称为数据结构服务器,因为值(value)可以是字符串(String),哈希(Hash),列表(List),集合(Set)和有序集合(sorted Set)等类型。

1.1 实现内容

1.基本指令操作
2.多用户并发
3.数据持久化

1.2 实现思路

1.将字节流依照Redis的协议反列化为Java能够识别的对象,识别指令。
2.然后通过反射机制,找到对于处理类进行处理。
3.最后将结果依照协议序列化为字节流写到客户端。

在这里插入图片描述

Redis的RESP通信协议描述(二进制)

RESP实际上是一个支持以下数据类型的序列化协议:简单字符串(Simple Strings),错误(Errors),整数(Integers), 块字符串(Bulk Strings)和数组(Arrays)。

在Redis中,RESP用作 请求-响应 协议的方式如下:

1、客户端将命令作为批量字符串的RESP数组发送到Redis服务器。
2、服务器(Server)根据命令执行的情况返回一个具体的RESP类型作为回复。

在RESP协议中,有些的数据类型取决于第一个字节:

1、对于简单字符串,回复的第一个字节是“+”
2、对于错误,回复的第一个字节是“ - ”
3、对于整数,回复的第一个字节是“:”
4、对于批量字符串,回复的第一个字节是“$”
5、对于数组,回复的第一个字节是“*”

Redis在TCP端口6379上监听到来的连接(本质就是socket),客户端连接到来时,Redis服务器为此创建一个TCP连接。在客户端与服务器端之间传输的每个Redis命令或者数据都以\r\n结尾。
在这里插入图片描述


二、Redis客户端测试

软件自己去下载:

1.下载wget http://download.redis.io/redis-stable.tar.gz
2.解压,编译 cd redis-stable; make;
3.cd src/ 可以看到新生成redis-cli,redis-server
4.启动服务 redis-cli
在这里插入图片描述


1.检测 redis 服务是否启动
avatar

2.字符串(String)
avatar

3.哈希(Hash)
avatar

4.列表(List)
avatar

5.集合(Set)
avatar

持久化的文件:
avatar


2.1 执行日志
{key={field=update}}从文件中读数据
数据加载完成....
执行命令:PINGCommand
执行命令:SETCommand
执行命令:GETCommand
执行命令:HSETCommand
向文件中写入数据{mykey={field=value}, key={field=update}}
执行命令:HGETCommand
执行命令:HGETCommand
执行命令:HGETCommand
执行命令:LPUSHCommand
向文件中写入数据{lxh=[value], key=[]}
执行命令:LPUSHCommand
向文件中写入数据{lxh=[value1, value], key=[]}
执行命令:LPUSHCommand
向文件中写入数据{lxh=[value2, value1, value], key=[]}
执行命令:LPOPCommand
向文件中写入数据{lxh=[value1, value], key=[]}
执行命令:LPOPCommand
向文件中写入数据{lxh=[value], key=[]}
执行命令:LPOPCommand
向文件中写入数据{lxh=[], key=[]}
执行命令:SADDCommand
执行命令:SADDCommand
执行命令:SCARDCommand
执行命令:SADDCommand
执行命令:SCARDCommand
2.2 Jedis测试
import redis.clients.jedis.Jedis;
/** * @Author: cxx * @Date: 2019/11/3 23:28 */
public class com.demo.JedisTest {
    public static void main(String[] args) {
        Jedis jedis=new Jedis("localhost", 6379);
        String result = jedis.ping();
        System.out.println(result);
        //关闭jedis
        jedis.close();
    }
}

开启redis服务

{lxh=[], key=[]}从文件中读数据
{mykey={field=value}, key={field=update}}从文件中读数据
数据加载完成....

avatar


三、核心代码

3.1 MyRedisService

package com.demo;

import com.demo.command.ICommand;
import com.demo.data.PermanentData;
import com.demo.procotol.MyDecode;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** * @Author: cxx * @Date: 2019/11/3 19:48 */
public class MyRedisService {
    public static void main(String[] args) {
        new MyRedisService().start(6379);
    }
    public void start(int port){
        // 1.加载持久化数据
        loadData();
        ExecutorService executor = Executors.newFixedThreadPool(20000);
        ServerSocket serverSocket = null;
        try {
            // 循环接受客户端连接
            serverSocket = new ServerSocket(port);
            System.out.println("等待客户端连接....");
            while (true){
                final Socket socket = serverSocket.accept();
                executor.execute(() -> {
                    try {
                        // 持续提供业务服务
                        while (true){
                            InputStream is = socket.getInputStream();
                            OutputStream os = socket.getOutputStream();
                            // 解析命令
                            ICommand command = new MyDecode(is, os).getCommand();
                            if (command != null){
                                command.run(os);
                            }
                            TimeUnit.MILLISECONDS.sleep(10);
                        }
                    }catch (Exception e){
                        try {
                            socket.close();
                        }catch (Exception e1){
                        }
                    }
                });
            }
        }catch (Exception e){
            try {
                serverSocket.close();
            }catch (Exception e1){
            }
        }
    }
    public void loadData(){
        PermanentData.getInstance().readFromListProfile();
        PermanentData.getInstance().readFromMapProfile();
        System.out.println("数据加载完成....");
    }
}

3.2 ICommand

package com.demo.command;

import java.io.OutputStream;
import java.util.List;

/** * @Author: cxx * @Date: 2019/11/3 19:49 */
public interface ICommand {
    public void run(OutputStream out);
    public void setArgs(List<Object> args);
}

实现类:exit,get,hget,hset,info,lpop,lpush,lrange,ping,sadd,scard,select,set


3.3 MyDecode解析命令

package com.demo.procotol;

import com.demo.command.ICommand;
import com.demo.exception.RedisException;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** * @Author: cxx * @Date: 2019/11/3 20:05 */
public class MyDecode {
    private InputStream is;
    private OutputStream os;
    private ProtocolInputStream pis;

    public MyDecode(InputStream is, OutputStream os) {
        this.is = is;
        this.os = os;
        pis = new ProtocolInputStream(is);
    }

    public ICommand getCommand(){
        try {
            Object o = process();
            // 客户端将命令作为批量字符串的RESP数组发送到Redis服务器,如果解析出不是list就有问题了
            if (!(o instanceof List)){
                Protocolcode.writeBulkString(os, "Server too tired,please wait .....");
                throw new RedisException("内部解析错误,服务器故障");
            }

            List<Object> list = (List<Object>) o;
            if (list.size() < 1) {
                Protocolcode.writeBulkString(os, "Server too tired,please wait .....");
                throw new RedisException("内部解析错误,服务器故障");
            }
            Object o2 = list.remove(0);
            if (!(o2 instanceof byte[])) {
                Protocolcode.writeBulkString(os, "Server too tired,please wait .....");
                throw new RedisException("内部解析错误,服务器故障");
            }
            String commandName = String.format("%sCommand", new String((byte[]) (o2)).trim().toUpperCase());
            System.out.println("执行命令:" + commandName);

            Class<?> cls = null;
            ICommand command = null;
            cls = Class.forName("com.demo.command." + commandName);
            if (cls == null || !ICommand.class.isAssignableFrom(cls)){
                Protocolcode.writeError(os, "Wrong Input,Please try again");
            }else {
                command = (ICommand) cls.newInstance();
            }
            command.setArgs(list);
            return command;
        }catch (Exception e){
            try {
                Protocolcode.writeError(os, "Wrong Input,Please try again");
            }catch (Exception e1){
            }
        }
        return null;
    }

    public String processError() throws IOException {
        return pis.readLine();
    }

    public String processSimpleString() throws IOException {
        return pis.readLine();
    }

    public long processInteger() throws IOException {
        return pis.readInteger();
    }

    //'$6\r\nfoobar\r\n' 或者 '$-1\r\n'
    public byte[] processBulkString() {
        int len = 0;
        byte[] bytes;
        String str = null;
        try {
            len = (int) pis.readInteger();
            bytes = new byte[len];
            str = pis.readLine();

        } catch (IOException e) {
            e.printStackTrace();
        }
        return str.getBytes();
    }

    public List<byte[]> processArray() {
        int len = 0;
        List<byte[]> list = new ArrayList<byte[]>();
        try {
            len = (int) pis.readInteger();
            //"*5\r\n
            // 5\r\nlpush\r\n$3\r\nkey\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n";
            for (int i = 0; i < len; i++) {
                byte[] bytes = (byte[]) process();
                list.add(bytes);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    public Object process() throws IOException {
        int b = 0;
        try {
            // 从输入流中读取数据的下一个字节,没可用字节返回-1
            b = is.read();
        } catch (IOException e) {
        }
        if (b == -1) {
            throw new RuntimeException("程序错误..........");
        }

        // 数据类型取决于第一个字节,redis resp通信协议规定
        switch ((char) b) {
            case '+':
                return processSimpleString();
            case '-':
                return processError();
            case ':':
                return processInteger();
            case '$':
                return processBulkString();
            case '*':
                return processArray();
            default:
                Protocolcode.writeError(os, "Unresolve this commond");
        }
        return null;
    }
}

3.4 Protocolcode

package com.demo.procotol;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

/** * @Author: cxx * @Date: 2019/11/3 20:21 */
public class Protocolcode {
    public static void writeInteger(OutputStream write, long length) throws Exception {
        write.write(':');
        write.write(String.valueOf(length).getBytes());
        write.write("\r\n".getBytes());
        write.flush();

    }

    //'+OK\r\n'
    public static void writeString(OutputStream write, String str) throws Exception {
        write.write('+');
        write.write(str.getBytes());
        write.write("\r\n".getBytes());
        write.flush();
    }

    //字节流'$6\r\nfoobar\r\n' 或者 '$-1\r\n'
    public static void writeBulkString(OutputStream write, String str) throws Exception {
        byte[] b = new byte[0];
        if (str != null){
            b = str.getBytes();
        }
        write.write('$');
        write.write(String.valueOf(b.length).getBytes());
        write.write("\r\n".getBytes());
        write.write(b);
        write.write("\r\n".getBytes());
        write.flush();
    }

    //'2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n'
    public static void writeArray(OutputStream write, List<?> list) throws Exception {
        write.write('*');
        write.write(String.valueOf(list.size()).getBytes());
        write.write("\r\n".getBytes());
        for (Object o : list) {
            if ((o instanceof String)) {
                writeBulkString(write, (String) o);
            } else if (o instanceof Integer) {
                writeInteger(write, (long) o);
            } else if (o instanceof Long) {
                writeInteger(write, (Long) o);
            } else if (o instanceof List<?>) {
                writeArray(write, (List<?>) o);
            }
        }
    }

    //'-WRONGTYPE Operation against a key holding the wrong kind of value'
    public static void writeError(OutputStream write, String message) throws IOException {
        write.write('-');
        write.write(message.getBytes());
        write.write("\r\n".getBytes());
        write.flush();
        write.close();
    }
}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部