文档章节

Apache Kafka源码剖析:第5篇 业务API处理

强子1985
 强子1985
发布于 2017/08/13 15:31
字数 433
阅读 62
收藏 1

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }

取出请求执行的代码

def run() {
    while (true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          if (req != null)
            req.requestDequeueTimeNanos = endTime
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          latch.countDown()
          return
        }
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: FatalExitError =>
          latch.countDown()
          Exit.exit(e.statusCode)
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

 

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

 

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 883
博文 1173
码字总数 870976
作品 8
南京
架构师
私信 提问
玩转KafkaIO与Flink

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发...

微笑向暖wx
2018/09/28
0
0
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
2018/07/16
0
0
一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发

一、从《Apeche Kafka源码剖析》上搬来的概念和图 Kafka网络采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解Java NIO提供了Reactor模式的API。常见的单线程Jav...

Anur
2018/12/13
0
7
架构设计:系统间通信(31)——其他消息中间件及场景应用(下1)

接上文:《架构设计:系统间通信(30)——Kafka及场景应用(中3)》 5、场景应用——电商平台:浏览记录收集功能 事件/日志收集系统是大中型软件不得不面对的话题。目前第三方业务系统对 事...

yinwenjie
2016/05/19
0
0
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周五乱弹 —— 姑娘馋的口水都留下来了。

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @且无需多言 :分享Fall Out Boy的单曲《Disloyal Order Of Water Buffaloes》 《Disloyal Order Of Water Buffaloes》- Fall Out Boy 手机党...

小小编辑
46分钟前
17
6
vue 对对象的属性进行修改时,不能渲染页面 vue.$set()

我在vue里的方法里给一个对象添加某个属性时,我console.log出来的是已经更改的object ,但是页面始终没有变化 原因如下: **受现代 JavaScript 的限制 (而且 Object.observe 也已经被废弃),...

Js_Mei
今天
2
0
开始看《Java学习笔记》

虽然书买了很久,但一直没看。这其中也写过一些Java程序,但都是基于IDE的帮助和对C#的理解来写的,感觉不踏实。 林信良的书写得蛮好的,能够帮助打好基础,看得出作者是比较用心的。 第1章概...

max佩恩
昨天
12
0
Redux 三大原则

1.单一数据源 在传统的MVC架构中,我们可以根据需要创建无数个Model,而Model之间可以互相监听、触发事件甚至循环或嵌套触发事件,这些在Redux中都是不被允许的。 因为在Redux的思想里,一个...

wenxingjun
昨天
9
0
跟我学Spring Cloud(Finchley版)-12-微服务容错三板斧

至此,我们已实现服务发现、负载均衡,同时,使用Feign也实现了良好的远程调用——我们的代码是可读、可维护的。理论上,我们现在已经能构建一个不错的分布式应用了,但微服务之间是通过网络...

周立_ITMuch
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部