文档章节

Apache Kafka源码剖析:第5篇 业务API处理

强子1985
 强子1985
发布于 2017/08/13 15:31
字数 433
阅读 60
收藏 1

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }

取出请求执行的代码

def run() {
    while (true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          if (req != null)
            req.requestDequeueTimeNanos = endTime
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          latch.countDown()
          return
        }
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: FatalExitError =>
          latch.countDown()
          Exit.exit(e.statusCode)
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

 

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

 

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 872
博文 1061
码字总数 757086
作品 8
南京
架构师
私信 提问
玩转KafkaIO与Flink

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发...

微笑向暖wx
09/28
0
0
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
07/16
0
0
Java后端工程师学习大纲

之前自己总结过的Java后端工程师技能树,其涵盖的技术点比较全面,并非一朝一夕能够全部覆盖到的。对于一些还没有入门或者刚刚入门的Java后端工程师,如果一下子需要学习如此多的知识,想必很...

JackFace
2016/07/08
567
0
消息中间件(Kafka/RabbitMQ)收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能。 这里会持续收录相关知识,包括安装、部署、使用示例、监...

u013256816
2017/01/26
0
0
源码圈 365 胖友的书单整理

🙂🙂🙂关注微信公众号:【芋道源码】有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问...

芋道源码掘金Java群217878901
2017/09/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

DataFrames中的reindex用法

from pandas import DataFrame frame = DataFrame(np.arange(9).reshape((3,3)),index=['a','c','d'],columns=['Ohio','Texas','California'] states = ['Texas','Utah','California'] frame......

卖小孩的小火柴
9分钟前
0
0
拜托!面试请不要再问我Spring Cloud底层原理

毫无疑问,Spring Cloud是目前微服务架构领域的翘楚,无数的书籍博客都在讲解这个技术。不过大多数讲解还停留在对Spring Cloud功能使用的层面,其底层的很多原理,很多人可能并不知晓。因此本...

James-
9分钟前
1
0
Shiro框架

提供了认证,授权,加密,会话管理等功能 在spring配置文件中配置shiro,需要配置的有shiro的过滤器工厂,在里面我们可以配置什么页面需要认证,什么认证不需要认证,认证成功后跳转的路径,认证失败...

tinder_boy
12分钟前
0
0
有关定时任务的表达式--cron 详细解

Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式: Seconds Minutes Hours DayofMonth Month DayofWeek Year或 Seconds Minu...

kuchawyz
14分钟前
1
0
下一代大数据处理引擎,阿里云实时计算独享模式重磅发布

11月14日,阿里云重磅发布了实时计算独享模式,即用户独享一部分物理资源,这部分资源在网络/磁盘/CPU/内存等资源上跟其他用户完全独立,是实时计算在原有共享模式基础上的重大升级。 (观看...

阿里云云栖社区
15分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部