分布式任务编排之数据分片与路由

原创
01/23 11:32
阅读数 613

前段时间研究的temporal最近要上线了,为了能Hold住后续的线上问题,年前打算把核心的实现给看一下,后续出问题也好有思路排查,今天是第一篇集群原理之发现

集群机制

temporal在实现上并没有引入zk/etcd等第三方中间件,而是基于Uber/ringpop和数据库实现了整个集群的高可用,然后我们先介绍下这块的关键知识点

ringpop

ringpop是uber设计的基于hash环和swim协议的构建的用于构建分布式高可用系统的库, 其中基于swim gossip协议来实现分布式系统的通信、复制、容错、协调等高可用机制,并通过hash环来实现分布式系统中数据分布, 后面有时间再回来看吧

心跳机制

不同于其他的分布式系统,temporal的心跳机制是基于数据库(temporal有不同的持久化方式, 这里主要是说的基于mysql的持久化存储)来实现,简单的来说就是定时的去更新数据库里面的那条数据,然后再获取数据的时候,根据时间来进行过滤

为了保障集群的稳定性,temporal里面的心跳时间分为了三个部分:10s的心跳周期、5s抖动、5s时钟偏移,所以当前的temporal对网络上的要求特别高,在社区看到作者的回复100ms的延迟都可能会导致问题,所以更多的时候,可以采用local cluster的方式进行部署

Temporal的工程实践

这里我们先重点介绍下temporal里面的集群中的节点发现、ringpop、锁机制、数据路由四个机制,剩下的我们后期再看

节点发现

前面提到过Temporal的节点发现是基于数据库做的,所以这里其实本质上就是构建数据库的查询。请注意这里的时间是本机的当前时间,这就说如果出现时钟偏移、回退的情况,则集群可能会返回错误的节点,进而导致集群的不可用,所以一定要尽可能保障集群节点之间的时间偏移在20s内

    now := time.Now().UTC()
filter := &sqlplugin.ClusterMembershipFilter{
HostIDEquals: request.HostIDEquals,
RoleEquals: request.RoleEquals,
RecordExpiryAfter: now,
SessionStartedAfter: request.SessionStartedAfter,
MaxRecordCount: request.PageSize,
}
// 省略其他过滤条件

ringpop

在temporal中ringpop主要是用于当前集群的任务分区,即同时由不同的server控制不同区间任务的执行,由于数据都在数据库中,所以并不需要进行数据复制

has环的构建: Fingerprint32是一个当前ringpop中自带的hash函数, 而后面的replicaPoints当前为100,即每个成员添加后,会生成100个索引节点加入到红黑树中

func newHashRing() *hashring.HashRing {
return hashring.New(farm.Fingerprint32, replicaPoints)
}

节点成员变更: 因为不需要用到复制、通信等分布式场景所以在temporal中, 当节点有变更后每次都会生成对应的hashring用于存储当前最新的节点信息, 后续路由的时候就可以使用这个信息进行对于任务的路由了

    // 构建新的hash环
ring := newHashRing()
for _, addr := range addrs {
host := NewHostInfo(addr, r.getLabelsMap())
//TODOXP 添加到对应的hash环里面
ring.AddMembers(host)
}

r.membersMap = newMembersMap
r.lastRefreshTime = time.Now().UTC()
r.ringValue.Store(ring)

有了这个对应的hashring后,接下来就需要去分配每个节点应该负责哪些分片了

生成分片:根据配置的分片数量来生成对应的分片ID, 

    // Submit tasks to the channel.
for shardID := int32(1); shardID <= c.config.NumberOfShards; shardID++ {
shardActionCh <- shardID
if c.isShuttingDown() {
return
}
}

创建引擎:根据自己分配到的分片ID创建对应的任务引擎

if info.Identity() == c.GetHostInfo().Identity() {
// TODOXP 如果检测当前的槽位, 获取对应的engine
_, err1 := c.GetEngineForShard(shardID)
if err1 != nil {
c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
c.logger.Error("Unable to create history shard engine", tag.Error(err1), tag.OperationFailed, tag.ShardID(shardID))
}
}

这个引擎是干啥的呢?其实主要是用于接收和响应worker的任务执行数据保存到持久化存储,这样就类似于zj/etcd的leader写入机制

数据路由

有了上面的铺垫其实就简单了,只需要根据命名空间和工作流ID,生成对应的hash,然后在通过ringpop中的hash环来解析对应的区间其实就知道该给那个server了,然后在讲请求转发给对应的Server就可以了

func WorkflowIDToHistoryShard(namespaceID, workflowID string, numberOfShards int32) int32 {
idBytes := []byte(namespaceID + "_" + workflowID)
hash := farm.Fingerprint32(idBytes)
return int32(hash%uint32(numberOfShards)) + 1 // ShardID starts with 1
}

吐槽

这周没太多时间去看这块的东西,早晨起来将之前休息时碎片读的东西整理了下,已经上午11点了, 先这样吧, 这块资料比较少,代码质量也比较差,主要是学习下工程实践以及分布式、容错、高可用机制即可,估计是由于项目比较早,代码看起来乱乱的。不过有问题在社区里都能得到解答, 版本更新也比较快,反正也没有更好的选择了,还是拥抱它吧。

云原生学习笔记地址:  https://www.yuque.com/baxiaoshi/tyado3微信号:baxiaoshi2020 公共号: 图解源码


本文分享自微信公众号 - 图解源码(sreguide)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部