文档章节

kafka协调者

jacked
 jacked
发布于 2017/07/03 17:31
字数 2099
阅读 17
收藏 0
点赞 0
评论 0

我们先假设初始时世界是混沌的还没有盘古的开天辟地,协调者也是一片荒芜人烟之地,没有保存任何状态,因为消费组的初始状态是Stable,在第一次的Rebalance时,正常的还没有向消费组注册过的消费者会执行状态为Stable而且memberId=UNKNOWN_MEMBER_ID条件分支。在第一次Rebalance之后,每个消费者都分配到了一个成员编号,系统又会进入Stable稳定状态(Stable稳定状态包括两种:一种是没有任何消费者的稳定状态,一种是有消费者的稳定状态)。因为所有消费者在执行一次JoinGroup后并不是说系统就一直保持这种不变的状态,有可能因为这样或那样的事件导致消费者要重新进行JoinGroup,这个时候因为之前JoinGroup过了每个消费者都是有成员编号的,处理方式肯定是不一样的。

所以定义一种事件驱动的状态机就很有必要了,这世界看起来是杂乱无章的,不过只要遵循着状态机的规则(万物生长的理论),任何事件都是有迹可循有路可走有条不紊地进行着。

 

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,
    clientHost: String,sessionTimeoutMs: Int,protocolType: String,
    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {
  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {
    //protocolType对于消费者是consumer,注意这里的协议类型和PartitionAssignor协议不同哦
    //协议类型目前总共就两种消费者和Worker,而协议是PartitionAssignor分配算法
    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
    //如果当前组没有记录该消费者,而该消费者却被分配了成员编号,则重置为未知成员,并让消费者重试
    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
  } else { group.currentState match {
    case Dead =>
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二个消费者在这里了!
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else {
        val member = group.get(memberId)
        updateMemberAndRebalance(group, member, protocols, responseCallback)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始时第一个消费者在这里!
        //如果消费者成员编号是未知的,则向GroupMetadata注册并被记录下来
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else { //3.第二次Rebalance时第一个消费者在这里,此时要分Leader还是普通的消费者了
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols)) {
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,
            generationId = group.generationId,subProtocol = group.protocol,
            leaderId = group.leaderId,errorCode = Errors.NONE.code))
        }
      }
    }
    if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

addMemberAndRebalance和updateMemberAndRebalance会创建或更新MemberMetadata,并且会尝试调用prepareRebalance,消费组中只有一个消费者有机会调用prepareRebalance,并且一旦调用该方法,会将消费组状态更改为PreparingRebalance,就会使得下一个消费者只能从case PreparingRebalance入口进去了,假设第一个消费者是从Stable进入的,它更改了状态为PreparingRebalance,下一个消费者就不会从Stable进来的。不过进入Stable状态还要判断消费者是不是已经有了成员编号,通常是之前已经发生了Rebalance,这种影响也是比较巨大的,每个消费者走的路径跟第一次的Rebalance是完全不同的迷宫地图了。

1)第一次Rebalance如图6-18的上半部分:

  1. 第一个消费者,状态为Stable,没有编号,addMemberAndRebalance,成为Leader,执行prepareRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,没有编号,addMemberAndRebalance(不执行prepareRebalance,因为在状态改变成PreparingRebalance后就不会被执行了);后面的消费者同第二个
  3. 所有消费者都要等协调者收集完所有成员编号在DelayedJoin完成时才会收到JoinGroup响应

 

图6-18 第一次和第二次Rebalance

2)第二次Rebalance,对于之前加入过的消费者都要成员编号如图6-18的下半部分:

  1. 第一个消费者是Leader,状态为Stable,有编号,updateMemberAndRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,有编号,updateMemberAndRebalance;后面的消费者同第二个
  3. 所有消费者也要等待,因为其他消费者发送Join请求在Leader消费者之后。

3)不过如果有消费者在Leader之前发送又有点不一样了如图6-19:

  1. 第一个消费者不是Leader,状态为Stable,有编号,responseCallback,立即收到JoinGroup响应,好幸运啊!
  2. 第二个消费者如果也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为Stable(什么,你们之前的消费者竟然都没更新状态!,因为他们都没有add或update),有编号,updateMemberAndRebalance(还是我第一个调用add或update,看来还是只能我来更新状态),更改状态为PreparingRebalance,创建DelayedJoin
  4. 第四个消费者不是Leader,状态为PreparingRebalance,有编号,updateMemberAndRebalance(前面有领导,不好意思了,不能立即返回JoinGroup给你了,你们这些剩下的消费者都只能和领导一起返回了,算你们倒霉)

 

图6-19 Leader非第一个发送JoinGroup请求

4)如果第一个消费者不是Leader,也没有编号,说明这是一个新增的消费者,流程又不同了如图6-20:

  1. 第一个消费者不是Leader,状态为Stable,没有编号,addMemberAndRebalance,执行prepareRebalance(我是第一个调用add或update的哦,你们都别想跟我抢这个头彩了),更改状态为PreparingRebalance(我不是Leader但我骄傲啊),创建DelayedJoin(我抢到头彩,当然创建DelayedJoin的工作只能由我来完成了)
  2. 第二个消费者也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为PreparingRebalance(有个新来的不懂规矩,他已经把状态改了),有编号,updateMemberAndRebalance(有人已经改了,你老就不用费心思了),凡是没有立即返回响应的,都需要等待,领导也不例外
  4. 第四个消费者不是Leader(废话,只有一个领导,而且领导已经在前面了),不会立即返回响应(你看领导都排队呢)
  5. 虽然DelayedJoin是由没有编号的消费者创建,不过由于DelayedJoin是以消费组为级别的,所以不用担心,上一次选举出来的领导还是领导,协调者最终还是会把members交给领导,不会是给那个没有编号的消费者的,虽然说在他注册的时候已经有编号了,但是大家不认啊。不过领导其实不在意是谁开始触发prepareRebalance的,那个人要负责生成DelayedJoin,而不管是领导自己还是其他人一旦更改状态为PreparingRebalance,后面的消费者都要等待DelayedJoin完成了,而领导者总是要等待的,所以他当然无所谓了,因为他知道最后协调者总是会把members交给他的。

 

图6-20 新增消费组第一个发送JoinGroup请求

根据上面的几种场景总结下来状态机的规则和一些结论如下:

  1. 第一个调用addMemberAndRebalance或者updateMemberAndRebalance的会将状态改为PreparingRebalance,并且负责生成DelayedJoin
  2. 一旦状态进入PreparingRebalance,其他消费者就只能从PreparingRebalance状态入口进入,这里只有两种选择addMemberAndRebalance或者updateMemberAndRebalance,不过他们不会更改状态,也不会生成DelayedJoin
  3. 发生DelayedJoin之后,其他消费者的JoinGroup响应都会被延迟,因为如规则2中,他们只能调用add或update,无法立即调用responseCallback,所以就要和DelayedJoin的那个消费者一起等待
  4. 正常流程时,发生responseCallback的是存在成员编号的消费者在Leader之前发送了JoinGroup,或者新增加的消费者发送了JoinGroup请求之前
  5. 第一次Rebalance时,第一个消费者会创建DelayedJoin,之后的Rebalance,只有新增的消费者才有机会创建(如果他在Leader之前发送的话,如果在Leader之后就没有机会了),而普通消费者总是没有机会创建DelayedJoin的,因为状态为Stable时,他会直接开溜,有人(Leader或者新增加的消费者)创建了DelayedJoin之后,他又在那边怨天尤人只能等待

© 著作权归作者所有

共有 人打赏支持
jacked
粉丝 0
博文 31
码字总数 4850
作品 0
朝阳
​福利丨又一波大数据新书推介,来做第一个吃螃蟹的人吧!

又到了每周发福利的时刻! 延续上期主题,本期给大家带来3本@图灵教育的大数据新书推荐 小编云: 经典读物固然能让收获有所保障 可新书却给人更多阅读的新鲜体验 来吧,一起做第一个吃螃蟹的...

DBAplus社群 ⋅ 2017/12/20 ⋅ 0

福利丨这也许是目前市面上最权威的Kafka学习指南!

Kafka,不是村上春树的《海边的卡夫卡》,不是《变形记》的作者弗兰兹·卡夫卡。程序员世界里的 Kafka,有三分之一的世界 500 强公司都在使用它发布和订阅、存储及实时地处理大规模流数据。 ...

DBAplus社群 ⋅ 01/15 ⋅ 0

kafka精确一次投递和事务消息学习整理

概述 今年6月发布的kafka 0.11.0.0包含两个比较大的特性,exactly once delivery和transactional transactional messaging。之前一直对事务这块比较感兴趣,所以抽空详细学习了一下,感觉收获...

xiaomin0322 ⋅ 05/22 ⋅ 0

使用 OpenWhisk 自建 Serverless 服务

摘要:Apache OpenWhisk(http://openwhisk.org/)是一个由IBM和Adobe驱动的开源项目,它是一个健壮的FaaS平台,可以部署在云或数据中心内。相比其他serverless项目,OpenWhisk是一个健壮的、...

phodal ⋅ 2017/12/14 ⋅ 0

三:分布式事务一致性协议2pc和3pc

一:分布式一致性协议 --->对于一个分布式系统进行架构设计的过程中,往往会在系统的可用性和数据一致性之间进行反复的权衡,于是就产生了一系列的一致性协议。 --->长期探索涌现出一大批经典...

无信不立 ⋅ 2016/02/17 ⋅ 0

分布式两阶段提交和三阶段提交

随着大型网站的各种高并发访问、海量数据处理等场景越来越多,如何实现网站的高可用、易伸缩、可扩展、安全等目标就显得越来越重要。 为了解决这样一系列问题,大型网站的架构也在不断发展。...

tantexian ⋅ 2016/11/27 ⋅ 0

分布式一致性算法2PC和3PC

  为了解决分布式一致性问题,产生了不少经典的分布式一致性算法,本文将介绍其中的2PC和3PC。2PC即Two-Phase Commit,译为二阶段提交协议。3PC即Three-Phase Commit,译为三阶段提交协议。...

莫名2013 ⋅ 01/08 ⋅ 0

分布式事务XA实现数据一致性的协议与原理

我们在实用单点数据库的一般的关系型数据库都有实现事务ACID属性的方法,一般使用诸如UNDO,REDO等。但在使用分布式数据库的时候,如何来保证数据的一致性呢?典型的影响场景就是大家已经听到...

rewiner22 ⋅ 2017/07/08 ⋅ 0

分布式事务:两阶段提交与三阶段提交

在分布式系统中著有 CAP 理论,该理论由加州大学伯克利分校的 Eric Brewer 教授提出,阐述了在一个分布式系统中不可能同时满足 一致性(Consistency)、可用性(Availability),以及 分区容...

zhenchao ⋅ 2016/08/23 ⋅ 2

【分布式事务系列九】聊聊分布式事务

分布式事务 【分布式事务系列一】提出疑问和研究过程 【分布式事务系列二】Spring事务管理器PlatformTransactionManager 【分布式事务系列三】Spring的事务体系 【分布式事务系列四】分布式事...

陶邦仁 ⋅ 2015/12/21 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

内核线程、轻量级进程、用户线程

线程与进程概念 在现代操作系统中,进程支持多线程。 进程是资源管理的最小单元; 线程是程序执行的最小单元。 即线程作为调度和分配的基本单位,进程作为资源分配的基本单位 一个进程的组成...

117 ⋅ 21分钟前 ⋅ 0

elasticsearch2.4.6升级为elasticsearch-5.5.0的经历

将elasticsearch-5.5.0 中的配置 path.data 指向原来的数据路径 即 path.data: /usr/local/src/elasticsearch-2.4.6/data 注意: elasticsearch-5.5.0 需要将jdk版本升级到1.8...

晨猫 ⋅ 22分钟前 ⋅ 1

lvm讲解 磁盘故障小案例

1

oschina130111 ⋅ 26分钟前 ⋅ 0

那些提升开发人员工作效率的在线工具

本文转载自公众号 Hollis 作为一个Java开发人员,经常要和各种各样的工具打交道,除了我们常用的IDE工具以外,其实还有很多工具是我们在日常开发及学习过程中要经常使用到的。 Hollis偏爱使用...

时刻在奔跑 ⋅ 39分钟前 ⋅ 0

restful风格 实现DELETE PUT请求 的web.xml的配置

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframe......

泉天下 ⋅ 44分钟前 ⋅ 0

Shell数组

Shell数组 Shell在编程方面比Windows批处理强大很多,无论是在循环、运算。 bash支持一维数组(不支持多维数组),并且没有限定数组的大小。类似与C语言,数组元素的下标由0开始编号。获取数...

蜗牛奔跑 ⋅ 53分钟前 ⋅ 0

nmap为了开发方便 可以做简单的修改

因为nmap扫描是默认使用的是nse脚本,但是在开发的过程中需要修改后缀(主要是因为后缀为lua才能显示高亮,所以这里用一个取巧的办法) nse_main.lua文件中我们找到如下代码 local t, path = cn...

超级大黑猫 ⋅ 57分钟前 ⋅ 0

springmvc获取axios数据为null情况

场景:前端用了vue没有用ajax与后台通信,用了axios,但是在代码运行过程中发现axios传递到后台的值接受到数据为null。 问题原因:此处的问题在与axios返回给后台的数据为json类型的,后台接...

王子城 ⋅ 59分钟前 ⋅ 0

hadoop技术入门学习之发行版选择

经常会看到这样的问题:零基础学习hadoop难不难?有的人回答说:零基础学习hadoop,没有想象的那么难,也没有想象的那么容易。看到这样的答案不免觉得有些尴尬,这个问题算是白问了,因为这个...

左手的倒影 ⋅ 59分钟前 ⋅ 0

806. Number of Lines To Write String - LeetCode

Question 806. Number of Lines To Write String Solution 思路:注意一点,如果a长度为4,当前行已经用了98个单元,要另起一行。 Java实现: public int[] numberOfLines(int[] widths, Str...

yysue ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部