文档章节

kafka控制层-controller启动与选举

T
 Thinking--
发布于 2017/08/27 00:21
字数 974
阅读 81
收藏 0

KafkaServer启动

在startup方法里,会实例化KafkaController

class KafkaServer {
  def startup() {
        .......
        kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
        kafkaController.startup()
        ......
    }
}

KafkaController重要属性

  • activeControllerId:获取controller的主broker的id
  • eventManager: ControllerEventManager实例,负责处理事件

ControllerEventManager

ControllerEventManager实例化一个线程,用来单独处理KafkaController发送的事件。

class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {
   // eventProcessedListener是event的回调函数

  @volatile private var _state: ControllerState = ControllerState.Idle

  // event队列
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread("controller-event-thread")

  def state: ControllerState = _state

  def start(): Unit = thread.start()

  def close(): Unit = thread.shutdown()
   // KafkaController调用put方法,添加事件
  def put(event: ControllerEvent): Unit = queue.put(event)

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
    override def doWork(): Unit = {
      // 从queue取出event
      val controllerEvent = queue.take()
      // 更新状态为event的状态
      _state = controllerEvent.state

      try {
        // event自身包含process方法,定义了如何处理event
        rateAndTimeMetrics(state).time {
          controllerEvent.process()
        }
      } catch {
        case e: Throwable => error(s"Error processing event $controllerEvent", e)
      }
      // 回调
      try eventProcessedListener(controllerEvent)
      catch {
        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
      }
      // 更新状态为Idle
      _state = ControllerState.Idle
    }
  }

}

事件基类

state表示处理event时,controller的状态。

process表示处理event的程序

sealed trait ControllerEvent {
  def state: ControllerState
  def process(): Unit
}

ControllerState

ControllerState目前有10种

object ControllerState {

  case object Idle extends ControllerState {
    def value = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  case object ControllerChange extends ControllerState {
    def value = 1
  }

  case object BrokerChange extends ControllerState {
    def value = 2
    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
  }

  case object TopicChange extends ControllerState {
    def value = 3
  }

  case object TopicDeletion extends ControllerState {
    def value = 4
  }

  case object PartitionReassignment extends ControllerState {
    def value = 5
  }

  case object AutoLeaderBalance extends ControllerState {
    def value = 6
  }

  case object ManualLeaderBalance extends ControllerState {
    def value = 7
  }

  case object ControlledShutdown extends ControllerState {
    def value = 8
  }

  case object IsrChange extends ControllerState {
    def value = 9
  }

Startup事件

KafkaController在启动的时候,会生成Startup事件,添加到EventManage

class KafkaController {
  def startup() = {
    eventManager.put(Startup)
    eventManager.start()
  }
}

Startup定义

  case object Startup extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      // 注册zookeeper的连接事件
      registerSessionExpirationListener()
      // 注册"/controller"节点的数据变化事件(节点存储主controller的id,数据变化意味着主controller发送变化)
      registerControllerChangeListener()
      // 选举,尝试争取controller 
      elect()
    }

  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
  }

class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging {

  @throws[Exception]
  override def handleNewSession(): Unit = {
    // 当重新连接到zookeeper时,发送Reelect事件
    eventManager.put(controller.Reelect)
  }
}

  private def registerControllerChangeListener() = {
    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
  }

  }

#Reelect事件 Reelect表明竞选主controller

  case object Reelect extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      val wasActiveBeforeChange = isActive
      // 获取主controller的id
      activeControllerId = getControllerID()
      // 先前是主controller,现在是从controller
      if (wasActiveBeforeChange && !isActive) {
        // 关闭先前zookeeper事件的监听
        onControllerResignation()
      }
      // 尝试竞选主controller
      elect()
    }
  }

  def elect(): Unit = {
    val timestamp = time.milliseconds
    // 节点数据,为brokerId + timestamp
    val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)

    activeControllerId = getControllerID()
    // activeControllerId为-1,表示主controller还没有
    if (activeControllerId != -1) {
      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
      return
    }

    try {
      // 竞争创建临时节点。如果未成功,会抛出ZkNodeExistsException异常
      val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                                                      electString,
                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
                                                      controllerContext.zkUtils.isSecure)
      zkCheckedEphemeral.create()
      info(config.brokerId + " successfully elected as the controller")
      activeControllerId = config.brokerId
      // 注册监听事件,这些都是主controller负责
      onControllerFailover()
    } catch {
      case _: ZkNodeExistsException =>
        activeControllerId = getControllerID
        if (activeControllerId != -1)
          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
        else
          warn("A controller has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
        triggerControllerMove()
    }
  }

  def onControllerFailover() {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    readControllerEpochFromZookeeper()
    incrementControllerEpoch()

    // 分区分配事件
    registerPartitionReassignmentListener()
    // 通知事件
    registerIsrChangeNotificationListener()
    // replica选举事件
    registerPreferredReplicaElectionListener()
    // topic变化事件
    registerTopicChangeListener()
    // topic删除事件
    registerTopicDeletionListener()
    // broker事件
    registerBrokerChangeListener()

    // 初始化context
    initializeControllerContext()
    
    val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // 启动副本状态机
    replicaStateMachine.startup()
    // 启动分片状态机
    partitionStateMachine.startup()

    // 监听partition改变事件
    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    maybeTriggerPartitionReassignment()
    topicDeletionManager.tryTopicDeletion()
    val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
    onPreferredReplicaElection(pendingPreferredReplicaElections)
    info("starting the controller scheduler")
    kafkaScheduler.startup()
    if (config.autoLeaderRebalanceEnable) {
      scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
    }
  }

概括

KafkaController负责集群中的一些事件,比如topic的新增和删除。

KafkaController在kafka集群中,扮演着很重要的角色。所以为了保证controller的高可用,集群的每个节点,都会运行KafkaController服务,通过zookeeper的选举,保证任何时刻只有一个主的controller。

KafkaController在启动的时候,就会尝试竞争主controller。如果竞选成功,就会在zookeeper中注册监听事件。

© 著作权归作者所有

上一篇: sqoop2-数据源
下一篇: kafka存储-segment
T
粉丝 6
博文 49
码字总数 44403
作品 0
武汉
私信 提问
Kafka Partition Leader选主机制

Kafka Partition Leader选主机制 更多干货 分布式实战(干货) spring cloud 实战(干货) mybatis 实战(干货) spring boot 实战(干货) React 入门实战(干货) 构建中小型互联网企业架构...

tantexian
2018/11/14
300
0
深入学习Kafka:Leader Election - Kafka集群Leader选举过程分析

本文所讲的Leader是指集群中的Controller,而不是各个Partition的Leader。 为什么要有Leader? 在Kafka早期版本,对于分区和副本的状态的管理依赖于zookeeper的Watcher和队列:每一个broker都...

墨竹
2017/11/30
0
0
Kafka科普系列 | 原来Kafka中的选举有这么多?

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/89369160 面试官在考查你Kafka知识的时候很可能会故弄玄虚的问你一下:Kaf...

朱小厮
04/18
0
0
apache kafka系列之在zookeeper中存储结构

1.topic注册信息 /brokers/topics/[topic] : 存储某个topic的partitions所有分配信息 2.partition状态信息 /brokers/topics/[topic]/partitions/[0...N] 其中[0..N]表示partition索引号 /bro......

岩之有理
2014/12/24
13.4K
0
深入学习Kafka:KafkaController

scala class KafkaController { private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation......

墨竹
2017/11/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nginx学习笔记

中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。 是连接两个独立应用程序或独立系统的软件。 web请求通过中间件可以直接调用操作系统,也可以经过中间件把请求分发到多...

码农实战
今天
5
0
Spring Security 实战干货:玩转自定义登录

1. 前言 前面的关于 Spring Security 相关的文章只是一个预热。为了接下来更好的实战,如果你错过了请从 Spring Security 实战系列 开始。安全访问的第一步就是认证(Authentication),认证...

码农小胖哥
今天
9
0
JAVA 实现雪花算法生成唯一订单号工具类

import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.Calendar;/** * Default distributed primary key generator. * * <p> * Use snowflake......

huangkejie
昨天
12
0
PhotoShop 色调:RGB/CMYK 颜色模式

一·、 RGB : 三原色:红绿蓝 1.通道:通道中的红绿蓝通道分别对应的是红绿蓝三种原色(RGB)的显示范围 1.差值模式能模拟三种原色叠加之后的效果 2.添加-颜色曲线:调整图像RGB颜色----R色增强...

东方墨天
昨天
11
1
将博客搬至CSDN

将博客搬至CSDN

算法与编程之美
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部