Apache Kafka源码剖析:第5篇 业务API处理
Apache Kafka源码剖析:第5篇 业务API处理
强子哥哥 发表于4个月前
Apache Kafka源码剖析:第5篇 业务API处理
  • 发表于 4个月前
  • 阅读 41
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }

取出请求执行的代码

def run() {
    while (true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          if (req != null)
            req.requestDequeueTimeNanos = endTime
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          latch.countDown()
          return
        }
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: FatalExitError =>
          latch.countDown()
          Exit.exit(e.statusCode)
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

 

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

 

标签: Kafka
共有 人打赏支持
强子哥哥
粉丝 835
博文 686
码字总数 711916
作品 7
评论 (0)
×
强子哥哥
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: