文档章节

Apache Kafka源码剖析:第5篇 业务API处理

强子大叔的码田
 强子大叔的码田
发布于 2017/08/13 15:31
字数 433
阅读 174
收藏 2

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }

取出请求执行的代码

def run() {
    while (true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          if (req != null)
            req.requestDequeueTimeNanos = endTime
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          latch.countDown()
          return
        }
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: FatalExitError =>
          latch.countDown()
          Exit.exit(e.statusCode)
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

 

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

 

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 923
博文 1630
码字总数 1283214
作品 9
南京
架构师
私信 提问
Apache Beam实战指南 | 手把手教你玩转KafkaIO与Flink

https://mp.weixin.qq.com/s?biz=MzU1NDA4NjU2MA==&mid=2247492538&idx=2&sn=9a2bd9fe2d7fd681c10ebd368ef81c9c&chksm=fbea5a75cc9dd3636c148ebe6e296621d0c07132938a62f0b3643f34af414b3fd8......

osc_azsn5lm2
2018/09/05
25
0
玩转KafkaIO与Flink

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发...

微笑向暖wx
2018/09/28
297
0
王家林大咖清华大学新书Spark第二版已上市:前浪致 Spark + AI 后浪

王家林大咖清华大学新书Spark第二版已上市:前浪 致 Spark + AI 后浪 大咖心声 新书图片 新书介绍 编辑推荐 内容简介 作者简介 新书目录 第二版前言 第一版前言 新书案例讲解 第二版网购链接...

段智华
05/25
0
0
一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发

一、从《Apeche Kafka源码剖析》上搬来的概念和图 Kafka网络采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解Java NIO提供了Reactor模式的API。常见的单线程Jav...

Anur
2018/12/13
4.6K
7
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
2018/07/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Windows 10 中安装 Anaconda 3

首先通过下面链接地址下载 Anaconda 的个人版本。 https://www.anaconda.com/products/individual 从上面下载的地址中,选择你需要的版本,目前 Windows 应该基本上都是 64 位的了。 在你下载...

honeymoose
30分钟前
19
0
如何禁用textarea的resizable属性? - How do I disable the resizable property of a textarea?

问题: I want to disable the resizable property of a textarea . 我想禁用textarea的resizable属性。 Currently, I can resize a textarea by clicking on the bottom right corner of t......

技术盛宴
30分钟前
25
0
即时通信E聊SDK简介(1)

2.简介: E聊SDK是一套适用于PC端, 移动端的即时通讯解决方案,源代码开放。E聊整合了即时通讯的基础能力,使用E聊,您可以让您的应用快速接入即时聊天的功能。E聊现已适配PC Web, 移动Web, ...

E聊
42分钟前
9
0
多个知乎账号一起登陆,同时管理运营的神器!

随着互联网生态的优化,从2016年开始,信息内容产业相当有关注度,其和粉丝互动起来很方便、流量大到惊人、可长远发展等等优势,迅速聚集了无数企业和个人,为了有非常全面的播放数据,大家通...

易媒助手
45分钟前
22
0
403禁止vs 401未经授权的HTTP响应 - 403 Forbidden vs 401 Unauthorized HTTP responses

问题: For a web page that exists, but for which a user does not have sufficient privileges (they are not logged in or do not belong to the proper user group), what is the prope......

fyin1314
今天
19
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部