文档章节

kafka协调者

jacked
 jacked
发布于 2017/07/03 17:31
字数 2099
阅读 23
收藏 0

我们先假设初始时世界是混沌的还没有盘古的开天辟地,协调者也是一片荒芜人烟之地,没有保存任何状态,因为消费组的初始状态是Stable,在第一次的Rebalance时,正常的还没有向消费组注册过的消费者会执行状态为Stable而且memberId=UNKNOWN_MEMBER_ID条件分支。在第一次Rebalance之后,每个消费者都分配到了一个成员编号,系统又会进入Stable稳定状态(Stable稳定状态包括两种:一种是没有任何消费者的稳定状态,一种是有消费者的稳定状态)。因为所有消费者在执行一次JoinGroup后并不是说系统就一直保持这种不变的状态,有可能因为这样或那样的事件导致消费者要重新进行JoinGroup,这个时候因为之前JoinGroup过了每个消费者都是有成员编号的,处理方式肯定是不一样的。

所以定义一种事件驱动的状态机就很有必要了,这世界看起来是杂乱无章的,不过只要遵循着状态机的规则(万物生长的理论),任何事件都是有迹可循有路可走有条不紊地进行着。

 

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,
    clientHost: String,sessionTimeoutMs: Int,protocolType: String,
    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {
  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {
    //protocolType对于消费者是consumer,注意这里的协议类型和PartitionAssignor协议不同哦
    //协议类型目前总共就两种消费者和Worker,而协议是PartitionAssignor分配算法
    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
    //如果当前组没有记录该消费者,而该消费者却被分配了成员编号,则重置为未知成员,并让消费者重试
    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
  } else { group.currentState match {
    case Dead =>
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二个消费者在这里了!
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else {
        val member = group.get(memberId)
        updateMemberAndRebalance(group, member, protocols, responseCallback)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始时第一个消费者在这里!
        //如果消费者成员编号是未知的,则向GroupMetadata注册并被记录下来
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else { //3.第二次Rebalance时第一个消费者在这里,此时要分Leader还是普通的消费者了
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols)) {
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,
            generationId = group.generationId,subProtocol = group.protocol,
            leaderId = group.leaderId,errorCode = Errors.NONE.code))
        }
      }
    }
    if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

addMemberAndRebalance和updateMemberAndRebalance会创建或更新MemberMetadata,并且会尝试调用prepareRebalance,消费组中只有一个消费者有机会调用prepareRebalance,并且一旦调用该方法,会将消费组状态更改为PreparingRebalance,就会使得下一个消费者只能从case PreparingRebalance入口进去了,假设第一个消费者是从Stable进入的,它更改了状态为PreparingRebalance,下一个消费者就不会从Stable进来的。不过进入Stable状态还要判断消费者是不是已经有了成员编号,通常是之前已经发生了Rebalance,这种影响也是比较巨大的,每个消费者走的路径跟第一次的Rebalance是完全不同的迷宫地图了。

1)第一次Rebalance如图6-18的上半部分:

  1. 第一个消费者,状态为Stable,没有编号,addMemberAndRebalance,成为Leader,执行prepareRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,没有编号,addMemberAndRebalance(不执行prepareRebalance,因为在状态改变成PreparingRebalance后就不会被执行了);后面的消费者同第二个
  3. 所有消费者都要等协调者收集完所有成员编号在DelayedJoin完成时才会收到JoinGroup响应

 

图6-18 第一次和第二次Rebalance

2)第二次Rebalance,对于之前加入过的消费者都要成员编号如图6-18的下半部分:

  1. 第一个消费者是Leader,状态为Stable,有编号,updateMemberAndRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,有编号,updateMemberAndRebalance;后面的消费者同第二个
  3. 所有消费者也要等待,因为其他消费者发送Join请求在Leader消费者之后。

3)不过如果有消费者在Leader之前发送又有点不一样了如图6-19:

  1. 第一个消费者不是Leader,状态为Stable,有编号,responseCallback,立即收到JoinGroup响应,好幸运啊!
  2. 第二个消费者如果也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为Stable(什么,你们之前的消费者竟然都没更新状态!,因为他们都没有add或update),有编号,updateMemberAndRebalance(还是我第一个调用add或update,看来还是只能我来更新状态),更改状态为PreparingRebalance,创建DelayedJoin
  4. 第四个消费者不是Leader,状态为PreparingRebalance,有编号,updateMemberAndRebalance(前面有领导,不好意思了,不能立即返回JoinGroup给你了,你们这些剩下的消费者都只能和领导一起返回了,算你们倒霉)

 

图6-19 Leader非第一个发送JoinGroup请求

4)如果第一个消费者不是Leader,也没有编号,说明这是一个新增的消费者,流程又不同了如图6-20:

  1. 第一个消费者不是Leader,状态为Stable,没有编号,addMemberAndRebalance,执行prepareRebalance(我是第一个调用add或update的哦,你们都别想跟我抢这个头彩了),更改状态为PreparingRebalance(我不是Leader但我骄傲啊),创建DelayedJoin(我抢到头彩,当然创建DelayedJoin的工作只能由我来完成了)
  2. 第二个消费者也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为PreparingRebalance(有个新来的不懂规矩,他已经把状态改了),有编号,updateMemberAndRebalance(有人已经改了,你老就不用费心思了),凡是没有立即返回响应的,都需要等待,领导也不例外
  4. 第四个消费者不是Leader(废话,只有一个领导,而且领导已经在前面了),不会立即返回响应(你看领导都排队呢)
  5. 虽然DelayedJoin是由没有编号的消费者创建,不过由于DelayedJoin是以消费组为级别的,所以不用担心,上一次选举出来的领导还是领导,协调者最终还是会把members交给领导,不会是给那个没有编号的消费者的,虽然说在他注册的时候已经有编号了,但是大家不认啊。不过领导其实不在意是谁开始触发prepareRebalance的,那个人要负责生成DelayedJoin,而不管是领导自己还是其他人一旦更改状态为PreparingRebalance,后面的消费者都要等待DelayedJoin完成了,而领导者总是要等待的,所以他当然无所谓了,因为他知道最后协调者总是会把members交给他的。

 

图6-20 新增消费组第一个发送JoinGroup请求

根据上面的几种场景总结下来状态机的规则和一些结论如下:

  1. 第一个调用addMemberAndRebalance或者updateMemberAndRebalance的会将状态改为PreparingRebalance,并且负责生成DelayedJoin
  2. 一旦状态进入PreparingRebalance,其他消费者就只能从PreparingRebalance状态入口进入,这里只有两种选择addMemberAndRebalance或者updateMemberAndRebalance,不过他们不会更改状态,也不会生成DelayedJoin
  3. 发生DelayedJoin之后,其他消费者的JoinGroup响应都会被延迟,因为如规则2中,他们只能调用add或update,无法立即调用responseCallback,所以就要和DelayedJoin的那个消费者一起等待
  4. 正常流程时,发生responseCallback的是存在成员编号的消费者在Leader之前发送了JoinGroup,或者新增加的消费者发送了JoinGroup请求之前
  5. 第一次Rebalance时,第一个消费者会创建DelayedJoin,之后的Rebalance,只有新增的消费者才有机会创建(如果他在Leader之前发送的话,如果在Leader之后就没有机会了),而普通消费者总是没有机会创建DelayedJoin的,因为状态为Stable时,他会直接开溜,有人(Leader或者新增加的消费者)创建了DelayedJoin之后,他又在那边怨天尤人只能等待

本文转载自:http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/#第六章_协调者

共有 人打赏支持
jacked
粉丝 0
博文 33
码字总数 5379
作品 0
朝阳
【干货】Kafka 事务特性分析

特性背景 消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。华为云DMS率先提供Kafka 1.1.0的专...

中间件小哥
08/14
0
0
​福利丨又一波大数据新书推介,来做第一个吃螃蟹的人吧!

又到了每周发福利的时刻! 延续上期主题,本期给大家带来3本@图灵教育的大数据新书推荐 小编云: 经典读物固然能让收获有所保障 可新书却给人更多阅读的新鲜体验 来吧,一起做第一个吃螃蟹的...

DBAplus社群
2017/12/20
0
0
福利丨这也许是目前市面上最权威的Kafka学习指南!

Kafka,不是村上春树的《海边的卡夫卡》,不是《变形记》的作者弗兰兹·卡夫卡。程序员世界里的 Kafka,有三分之一的世界 500 强公司都在使用它发布和订阅、存储及实时地处理大规模流数据。 ...

DBAplus社群
01/15
0
0
kafka精确一次投递和事务消息学习整理

概述 今年6月发布的kafka 0.11.0.0包含两个比较大的特性,exactly once delivery和transactional transactional messaging。之前一直对事务这块比较感兴趣,所以抽空详细学习了一下,感觉收获...

xiaomin0322
05/22
0
0
使用 OpenWhisk 自建 Serverless 服务

摘要:Apache OpenWhisk(http://openwhisk.org/)是一个由IBM和Adobe驱动的开源项目,它是一个健壮的FaaS平台,可以部署在云或数据中心内。相比其他serverless项目,OpenWhisk是一个健壮的、...

phodal
2017/12/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算卷积神经网络浮点数运算量

前言 本文主要是介绍了,给定一个卷积神经网络的配置之后,如何大概估算它的浮点数运算量。 相关代码:CalFlops,基于MXNet框架的 Scala 接口实现的一个计算MXNet网络模型运算量的demo。 正文...

Ldpe2G
今天
1
0
Sql语言与MySql数据库

1. 数据库简介 1. 数据库,就是存储数据的仓库,只能通过sql语言来访问,数据库也是一个文件系统。通常,MySQL、Oracle等数据库,也被称为关系型数据库,其保存的不仅仅只是数据,还包括数据...

江左煤郎
今天
1
0
IDEA 取消自动import .*

打开设置 > Editor > Code Style > Java > Scheme Default > Imports ① 将 Class count to use import with "*" 改为 99 (导入同一个包的类超过这个数值自动变为 * ) ② 将 Names count ......

乔老哥
今天
3
0
PostGIS学习笔记(开篇)

PostGIS事实上算是笔者开始写博客的第一篇内容。而事实上那篇博文的内容并不丰富,笔者对PostGIS的了解仍然不多,然而17年在OSGeo课程学习时对PostGIS又有了进一步了解,并逐步发现它的强大。...

胖胖雕
今天
3
0
【Centos】在nginx服务器中配置php和mysql

接上一章《【Centos】利用Vultr服务器和namesilo布网》(https://my.oschina.net/u/3776619/blog/2051986),在Centos中配置好nginx,并在iptables中开启了80端口,和为了远程mysql操作方便开...

yongh701
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部