kafka协调者
博客专区 > jacked 的博客 > 博客详情
kafka协调者
jacked 发表于8个月前
kafka协调者
  • 发表于 8个月前
  • 阅读 11
  • 收藏 0
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

摘要: 消费组状态机

我们先假设初始时世界是混沌的还没有盘古的开天辟地,协调者也是一片荒芜人烟之地,没有保存任何状态,因为消费组的初始状态是Stable,在第一次的Rebalance时,正常的还没有向消费组注册过的消费者会执行状态为Stable而且memberId=UNKNOWN_MEMBER_ID条件分支。在第一次Rebalance之后,每个消费者都分配到了一个成员编号,系统又会进入Stable稳定状态(Stable稳定状态包括两种:一种是没有任何消费者的稳定状态,一种是有消费者的稳定状态)。因为所有消费者在执行一次JoinGroup后并不是说系统就一直保持这种不变的状态,有可能因为这样或那样的事件导致消费者要重新进行JoinGroup,这个时候因为之前JoinGroup过了每个消费者都是有成员编号的,处理方式肯定是不一样的。

所以定义一种事件驱动的状态机就很有必要了,这世界看起来是杂乱无章的,不过只要遵循着状态机的规则(万物生长的理论),任何事件都是有迹可循有路可走有条不紊地进行着。

 

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,
    clientHost: String,sessionTimeoutMs: Int,protocolType: String,
    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {
  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {
    //protocolType对于消费者是consumer,注意这里的协议类型和PartitionAssignor协议不同哦
    //协议类型目前总共就两种消费者和Worker,而协议是PartitionAssignor分配算法
    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
    //如果当前组没有记录该消费者,而该消费者却被分配了成员编号,则重置为未知成员,并让消费者重试
    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
  } else { group.currentState match {
    case Dead =>
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二个消费者在这里了!
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else {
        val member = group.get(memberId)
        updateMemberAndRebalance(group, member, protocols, responseCallback)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始时第一个消费者在这里!
        //如果消费者成员编号是未知的,则向GroupMetadata注册并被记录下来
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else { //3.第二次Rebalance时第一个消费者在这里,此时要分Leader还是普通的消费者了
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols)) {
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,
            generationId = group.generationId,subProtocol = group.protocol,
            leaderId = group.leaderId,errorCode = Errors.NONE.code))
        }
      }
    }
    if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

addMemberAndRebalance和updateMemberAndRebalance会创建或更新MemberMetadata,并且会尝试调用prepareRebalance,消费组中只有一个消费者有机会调用prepareRebalance,并且一旦调用该方法,会将消费组状态更改为PreparingRebalance,就会使得下一个消费者只能从case PreparingRebalance入口进去了,假设第一个消费者是从Stable进入的,它更改了状态为PreparingRebalance,下一个消费者就不会从Stable进来的。不过进入Stable状态还要判断消费者是不是已经有了成员编号,通常是之前已经发生了Rebalance,这种影响也是比较巨大的,每个消费者走的路径跟第一次的Rebalance是完全不同的迷宫地图了。

1)第一次Rebalance如图6-18的上半部分:

  1. 第一个消费者,状态为Stable,没有编号,addMemberAndRebalance,成为Leader,执行prepareRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,没有编号,addMemberAndRebalance(不执行prepareRebalance,因为在状态改变成PreparingRebalance后就不会被执行了);后面的消费者同第二个
  3. 所有消费者都要等协调者收集完所有成员编号在DelayedJoin完成时才会收到JoinGroup响应

 

图6-18 第一次和第二次Rebalance

2)第二次Rebalance,对于之前加入过的消费者都要成员编号如图6-18的下半部分:

  1. 第一个消费者是Leader,状态为Stable,有编号,updateMemberAndRebalance,更改状态为PreparingRebalance,创建DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,有编号,updateMemberAndRebalance;后面的消费者同第二个
  3. 所有消费者也要等待,因为其他消费者发送Join请求在Leader消费者之后。

3)不过如果有消费者在Leader之前发送又有点不一样了如图6-19:

  1. 第一个消费者不是Leader,状态为Stable,有编号,responseCallback,立即收到JoinGroup响应,好幸运啊!
  2. 第二个消费者如果也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为Stable(什么,你们之前的消费者竟然都没更新状态!,因为他们都没有add或update),有编号,updateMemberAndRebalance(还是我第一个调用add或update,看来还是只能我来更新状态),更改状态为PreparingRebalance,创建DelayedJoin
  4. 第四个消费者不是Leader,状态为PreparingRebalance,有编号,updateMemberAndRebalance(前面有领导,不好意思了,不能立即返回JoinGroup给你了,你们这些剩下的消费者都只能和领导一起返回了,算你们倒霉)

 

图6-19 Leader非第一个发送JoinGroup请求

4)如果第一个消费者不是Leader,也没有编号,说明这是一个新增的消费者,流程又不同了如图6-20:

  1. 第一个消费者不是Leader,状态为Stable,没有编号,addMemberAndRebalance,执行prepareRebalance(我是第一个调用add或update的哦,你们都别想跟我抢这个头彩了),更改状态为PreparingRebalance(我不是Leader但我骄傲啊),创建DelayedJoin(我抢到头彩,当然创建DelayedJoin的工作只能由我来完成了)
  2. 第二个消费者也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为PreparingRebalance(有个新来的不懂规矩,他已经把状态改了),有编号,updateMemberAndRebalance(有人已经改了,你老就不用费心思了),凡是没有立即返回响应的,都需要等待,领导也不例外
  4. 第四个消费者不是Leader(废话,只有一个领导,而且领导已经在前面了),不会立即返回响应(你看领导都排队呢)
  5. 虽然DelayedJoin是由没有编号的消费者创建,不过由于DelayedJoin是以消费组为级别的,所以不用担心,上一次选举出来的领导还是领导,协调者最终还是会把members交给领导,不会是给那个没有编号的消费者的,虽然说在他注册的时候已经有编号了,但是大家不认啊。不过领导其实不在意是谁开始触发prepareRebalance的,那个人要负责生成DelayedJoin,而不管是领导自己还是其他人一旦更改状态为PreparingRebalance,后面的消费者都要等待DelayedJoin完成了,而领导者总是要等待的,所以他当然无所谓了,因为他知道最后协调者总是会把members交给他的。

 

图6-20 新增消费组第一个发送JoinGroup请求

根据上面的几种场景总结下来状态机的规则和一些结论如下:

  1. 第一个调用addMemberAndRebalance或者updateMemberAndRebalance的会将状态改为PreparingRebalance,并且负责生成DelayedJoin
  2. 一旦状态进入PreparingRebalance,其他消费者就只能从PreparingRebalance状态入口进入,这里只有两种选择addMemberAndRebalance或者updateMemberAndRebalance,不过他们不会更改状态,也不会生成DelayedJoin
  3. 发生DelayedJoin之后,其他消费者的JoinGroup响应都会被延迟,因为如规则2中,他们只能调用add或update,无法立即调用responseCallback,所以就要和DelayedJoin的那个消费者一起等待
  4. 正常流程时,发生responseCallback的是存在成员编号的消费者在Leader之前发送了JoinGroup,或者新增加的消费者发送了JoinGroup请求之前
  5. 第一次Rebalance时,第一个消费者会创建DelayedJoin,之后的Rebalance,只有新增的消费者才有机会创建(如果他在Leader之前发送的话,如果在Leader之后就没有机会了),而普通消费者总是没有机会创建DelayedJoin的,因为状态为Stable时,他会直接开溜,有人(Leader或者新增加的消费者)创建了DelayedJoin之后,他又在那边怨天尤人只能等待
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 0
博文 27
码字总数 4274
×
jacked
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: