文档章节

etcd raft模块分析--kv存储引擎

黑客画家
 黑客画家
发布于 03/13 09:57
字数 1501
阅读 282
收藏 3
点赞 0
评论 1

概述

     raft有和多种语言实现,其中在go语言中,etcd的实现是公认的典范,本文就是从源码级别探索etcd的raft是如何实现的,这样可以让我们一步一步了解raft论文是如何实现为一个工程的。

     注:不清楚raft是什么的可以先去看我的另一篇文章https://my.oschina.net/fileoptions/blog/883497

例子

      etcd将raft单独抽象、实现为一个模块,同时也为raft模块提供了一个基本例子,在etcd源码中,它就是contrib/raftexample,进到该目录下,我们可以首先看README,里面已经有非常详细的例子使用方法了,我这里就再赘述一次。

       首先,我们在build之后,在目录下会产生一个raftexample的可执行文件,此时可以使用如下命令启动一个raft实例(single-member cluster):

raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380

      上面这条命令意思是,启动了一个raft实例的kv存储引擎,id选项用于指定本raft实例的id,cluster选项指定集群的成员地址信息,port选项指定kv存储引擎的服务端口。

      启动成功之后,此时我们可以向存储引擎存储一个值:

curl -L http://127.0.0.1:12380/my-key -XPUT -d hello

      然后我们将其取出来验证一下:

curl -L http://127.0.0.1:12380/my-key

       如果我们想启动一个本地集群,那么首先我们先安装goreman(自己google了解吧),然后直接在目录下执行:

goreman start

       goreman会使用Procfile文件定义的信息启动一个集群版本的raft和kv存储(本实例为三个节点副本),配置如下:

raftexample1: ./raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
raftexample2: ./raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
raftexample3: ./raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380

      此时,我们可以随机向集群中的三个成员中的任意一个发送存储指令,raft会保证存储值的一致性,同样我们也可以随机从任何一个成员中读取。

       例子中还包括了raft容错测试和集群成员变更测试,本文不再赘述。

       raftexample入口main代码如下:

func main() {
	// 解析集群地址,包括自己
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	// 本节点的id
	id := flag.Int("id", 1, "node ID")
	// 应用端口
	kvport := flag.Int("port", 9121, "key-value server port")
	// 是否是加入一个已存在的集群
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	// 用于提议的channel
	proposeC := make(chan string)
	defer close(proposeC)
	// 用于配置变更的channel
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore
	// 应用提供的获取snapshot的函数(获取应用状态机的快照)
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	// 创建raft实例,RaftNode是根据应用自身进行定义的数据结构
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
    // 创建kvstore
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

	// 启动http服务,用于处理存储请求
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

 kv存储引擎     

     上面代码中,首先定义了一个kv存储kvstore,其具体实现如下:


// a key-value store backed by raft
type kvstore struct {
	proposeC    chan<- string //  channel for proposing updates
	mu          sync.RWMutex // 读写锁,用于在产生快照的时候禁止写
	kvStore     map[string]string // current committed key-value pairs
	snapshotter *snap.Snapshotter // 用于存取raft产生的snapshot
}

type kv struct {
	Key string
	Val string
}

func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {

	s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
	// replay log into key-value map
	s.readCommits(commitC, errorC)
	// read commits from raft into kvStore map until error
    // 启动goroutine监听commit channel
	go s.readCommits(commitC, errorC)
	return s
}

// 查找
func (s *kvstore) Lookup(key string) (string, bool) {
	// 上锁
	s.mu.RLock()
	v, ok := s.kvStore[key]
	s.mu.RUnlock()
	return v, ok
}

// 提议一个kv值
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	// 将kv序列化
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	// 进行提议
	s.proposeC <- buf.String()
}

// 读一个raft commit上来的值,它会在一个单独的goroutine一直运行
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	// 遍历commit channel
	for data := range commitC {
		// data如果为nil就表示要加载snapshot
		if data == nil {
			// done replaying log; new data incoming OR signaled to load snapshot
			// 加载snapshot
			snapshot, err := s.snapshotter.Load()
			// 如果没有snapshot就返回
			if err == snap.ErrNoSnapshot {
				return
			}

			// 如果是其他错误,就抛出异常
			if err != nil && err != snap.ErrNoSnapshot {
				log.Panic(err)
			}

			// 打印出snapshot的一些元信息
			log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
			// 从snapshot上恢复kv
			if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
				log.Panic(err)
			}
			continue
		}

		var dataKv kv
		dec := gob.NewDecoder(bytes.NewBufferString(*data))
		// 解码commit上来的数据到dataKv中
		if err := dec.Decode(&dataKv); err != nil {
			log.Fatalf("raftexample: could not decode message (%v)", err)
		}

		// 将kv值存储kv引擎
		s.mu.Lock()
		s.kvStore[dataKv.Key] = dataKv.Val
		s.mu.Unlock()
	}
	// 如果error channel有错误
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}

// 获取应用状态机的快照
func (s *kvstore) getSnapshot() ([]byte, error) {
	// 产生snapshaot的时候必须加上读写锁
	s.mu.Lock()
	defer s.mu.Unlock()
	// 将kv序列化为Json
	return json.Marshal(s.kvStore)
}

// 从snapshot中恢复kv存储(恢复应用状态机)
func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
	var store map[string]string
	// 对snapshot反序列化
	if err := json.Unmarshal(snapshot, &store); err != nil {
		return err
	}
	s.mu.Lock()
	s.kvStore = store
	s.mu.Unlock()
	return nil
}

       上面的代码比较简单,和一般的kv存储相比而言,唯一的不同就是,现在的kv存在多个副本(抗容灾能力),多个副本使用raft协议保证一致性,其大致原理如下:

 

             

      由于raft的实现比较复杂,如果将所有细节都写在同一片文章中会显得非常臃肿,因此我打算将其细分为一下几篇文章,以后周末只要有时间就至少写一篇(这件事情拖了很久了,草稿箱里还存了一大堆)。

 

本系列文章

1、etcd raft模块分析--kv存储引擎       

2、etcd raft模块分析--raft snapshot

3、etcd raft模块分析--raft wal日志

4、etcd raft模块分析--raft node

5、etcd raft模块分析--raft 协议

6、etcd raft模块分析--raft transport

7、etcd raft模块分析--raft storage

8、etcd raft模块分析--raft progress

 

© 著作权归作者所有

共有 人打赏支持
黑客画家
粉丝 111
博文 115
码字总数 371564
作品 0
杭州
高级程序员
加载中

评论(1)

梦朝思夕
梦朝思夕
坐等一系列化的的文章出来
Docker学习系列 之etcd(一)etcd简介

Etcd 按照官方介绍 Etcd is a distributed, consistent key-value store for shared configuration and service discovery etcd 是一个分布式一致性键值存储,用于共享配置和服务发现,专注于...

ponpon_ ⋅ 2017/02/16 ⋅ 0

etcd:从应用场景到实现原理的全方位解读

原文出自: http://www.infoq.com/cn/articles/etcd-interpretation-application-scenario-implement-principle 随着CoreOS和Kubernetes等项目在开源社区日益火热,它们项目中都用到的etcd组件...

ponpon_ ⋅ 2017/02/16 ⋅ 0

etcd:用于服务发现的键值存储系统

etcd是一个高可用的键值存储系统,主要用于共享配置和服务发现。etcd是由CoreOS开发并维护的,灵感来自于 ZooKeeper 和 Doozer,它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强...

夕水溪下 ⋅ 2014/07/29 ⋅ 7

CoreOS 发布 etcd v2.3.0,重点提升稳定性和可靠性

Etcd v2.3.0正式发布了!这次更新不仅伴随着稳定性和可靠性方面的提升,还为我们带来了新的v3版本API的预览版以及新的存储引擎,除此之外还有哪些诱人的特性呢?赶紧来看看吧! 今天,我们很...

oschina ⋅ 2016/03/28 ⋅ 4

etcd安装部署及数据同步MySQL

一、etcd说明及原理 二、etcd安装部署说明 三、etcd操作说明 四、python安装etcd 五、python-etcd使用说明 六、通过脚本获取本地的信息上传到etcd 七、通过脚本将etc的数据同步到mysql 一、e...

ckl893 ⋅ 2017/03/03 ⋅ 0

TiDB 将出席 Percona Live Amsterdam 2016

TiDB 数据库的公司 PingCAP 将出席 Percona Live Amsterdam 2016 日前,专注新型分布式数据库研发的科技公司 PingCAP 确定将受邀参加 “Percona Live Amsterdam 2016”。作为亚洲唯一一家受邀...

TiDB ⋅ 2016/09/28 ⋅ 12

使用 Rust 构建分布式 Key-Value Store

欢迎大家前往腾讯云社区,获取更多腾讯海量技术实践干货哦~ 引子 构建一个分布式 Key-Value Store 并不是一件容易的事情,我们需要考虑很多的问题,首先就是我们的系统到底需要提供什么样的功...

腾讯云社区 ⋅ 2017/11/20 ⋅ 0

etcd 2.0 发布,官方首个主要稳定版

etcd 2.0 发布了,这是官方首个主要的稳定版本,与上一个 0.4.6 版本比较,该版本值得关注的改进有: 内部协议的改进以避免意外的错误配置 命令用于轻松的从集群失败中恢复 命令用于轻松管理...

oschina ⋅ 2015/01/29 ⋅ 7

MySQL · 特性介绍 · 一些流行引擎存储格式简介

概述 本文简要介绍了一些存储引擎存储结构,包括InnoDB, TokuDB, RocksDB, TiDB, CockroachDB, 供大家对比分析 InnoDB InnoDB 底层存储结构为B+树,结构如下 B树的每个节点对应innodb的一个p...

阿里云RDS-数据库内核组 ⋅ 2017/10/05 ⋅ 0

Etcd+Confd实现Nginx配置文件自动管理

一、需求 我们使用Nginx做七层负载均衡,后端是Tomcat。项目采用灰度发布方式,每次项目升级,都要手动先从Nginx下摘掉一组,然后再升级这组,当项目快速迭代时,手动做这些操作显然会增加部...

李振良OK ⋅ 2017/03/24 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

vim基础-编辑模式-命令模式

编辑模式:可以编辑修改文件。编辑模式下 按“esc”键返回一般模式。 按一次“Insert”键 (一般在键盘回格键右边)作用和“i”一样表示“插入”。按两次“Insert”键表示“替换”,作用为:...

ZHENG-JY ⋅ 14分钟前 ⋅ 0

MaxCompute读取分析OSS非结构化数据的实践经验总结

摘要: 本文背景 很多行业的信息系统中,例如金融行业的信息系统,相当多的数据交互工作是通过传统的文本文件进行交互的。此外,很多系统的业务日志和系统日志由于各种原因并没有进入ELK之类...

阿里云云栖社区 ⋅ 18分钟前 ⋅ 0

Linux操作系统有何优势?Linux学习

  当今世界流行的操作系统有3大类,Linux、Mac OS和Windows操作系统,Linux操作系统因其开源、免费、跨平台、良好的界面等特性,深受广大程序员们的青睐!   Linux操作系统被广泛的应用于...

老男孩Linux培训 ⋅ 20分钟前 ⋅ 0

Spring Cloud Spring Boot mybatis分布式微服务云架构 开发Web应用

静态资源访问 在我们开发Web应用的时候,需要引用大量的js、css、图片等静态资源。 默认配置 Spring Boot默认提供静态资源目录位置需置于classpath下,目录名需符合如下规则: /static /pub...

itcloud ⋅ 24分钟前 ⋅ 0

6月19日任务 设置更改root密码、连接mysql、mysql常用命令

13.1 设置更改root密码 1. /usr/local/mysql/bin/mysql -uroot 设置环境变量 : export PATH=$PATH:/usr/local/mysql/bin/ 永久生效: vim /etc/profile 加入 export PATH=$PATH:/usr/local/m......

吕湘颖 ⋅ 26分钟前 ⋅ 0

MaxCompute读取分析OSS非结构化数据的实践经验总结

摘要: 本文背景 很多行业的信息系统中,例如金融行业的信息系统,相当多的数据交互工作是通过传统的文本文件进行交互的。此外,很多系统的业务日志和系统日志由于各种原因并没有进入ELK之类...

猫耳m ⋅ 27分钟前 ⋅ 0

Spring MVC controller,return重定向redirect:

@RequestMapping(value="/save",method=RequestMethod.POST)public String doSave(Course course) {log.debug("Info of Course");log.debug(ReflectionToStringBuilder.toStr......

颖伙虫 ⋅ 35分钟前 ⋅ 0

JavaSE——线程介绍

声明:本栏目所使用的素材都是凯哥学堂VIP学员所写,学员有权匿名,对文章有最终解释权;凯哥学堂旨在促进VIP学员互相学习的基础上公开笔记。 线程: 介绍:管线程叫多任务处理,首先你得知道...

凯哥学堂 ⋅ 38分钟前 ⋅ 0

ORM——使用spring jpa data实现逻辑删除

前言 在业务中是忌讳物理删除数据的,数据的这个对于一个IT公司可以说是最核心的资产,如果删除直接就物理删除,无疑是对核心资产的不重视,可能扯的比较远,本文最主要是想通过spring jpa ...

alexzhu592 ⋅ 44分钟前 ⋅ 0

CDN caching

Incapsula应用感知CDN使用智能分析和频率分析来动态缓存内容,并最大限度地提高效率。确保可直接从RAM获取最常访问的资源,而不依赖于较慢的访问机制。 1、 静态内容缓存 Incapsula缓存静态内...

上树的熊 ⋅ 47分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部