文档章节

Apache Kafka源码剖析:第5篇 业务API处理

强子哥哥
 强子哥哥
发布于 2017/08/13 15:31
字数 433
阅读 52
收藏 1
点赞 0
评论 0

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }

取出请求执行的代码

def run() {
    while (true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          if (req != null)
            req.requestDequeueTimeNanos = endTime
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
          latch.countDown()
          return
        }
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: FatalExitError =>
          latch.countDown()
          Exit.exit(e.statusCode)
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

 

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 551
码字总数 640910
作品 8
南京
架构师
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
今天
0
0
架构设计:系统间通信(31)——其他消息中间件及场景应用(下1)

接上文:《架构设计:系统间通信(30)——Kafka及场景应用(中3)》 5、场景应用——电商平台:浏览记录收集功能 事件/日志收集系统是大中型软件不得不面对的话题。目前第三方业务系统对 事...

yinwenjie
2016/05/19
0
0
Heron:来自Twitter的新一代流处理引擎应用篇

【导语】 本文对比了Heron和常见的流处理项目,包括Storm、Flink、Spark Streaming和Kafka Streams,归纳了系统选型的要点。此外实践了Heron的一个案例,以及讨论了Heron在这一年开发的新特性...

dev_csdn
2017/12/26
0
0
Kafka分区分配计算(分区器Partitions)

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景...

u013256816
2017/12/03
0
0
Java后端工程师学习大纲

之前自己总结过的Java后端工程师技能树,其涵盖的技术点比较全面,并非一朝一夕能够全部覆盖到的。对于一些还没有入门或者刚刚入门的Java后端工程师,如果一下子需要学习如此多的知识,想必很...

JackFace
2016/07/08
567
0
flume源码编译/拦截器分析(一)

flume介绍 由于是第一次进行源码编译与开发,步骤有点复杂,后续再进行简化 Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定制...

九_天
01/11
0
0
其他消息中间件及场景应用(下1)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51354773 目...

yunlielai
04/15
0
0
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1
Apache Ignite剖析

1.概述   Apache Ignite和Apache Arrow很类似,属于大数据范畴中的内存分布式管理系统。在《Apache Arrow 内存数据》中介绍了Arrow的相关内容,它统一了大数据领域各个生态系统的数据格式,...

smartloli
03/11
0
0
消息中间件(Kafka/RabbitMQ)收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能。 这里会持续收录相关知识,包括安装、部署、使用示例、监...

u013256816
2017/01/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Dubbo 源码解读 —— 可支持序列化及自定义扩展

一、概述 从源码中,我们可以看出来。目前,Dubbo 内部提供了 5 种序列化的方式,分别为 fastjson、Hessian2、Kryo、fst 及 Java原生支持的方式 。 针对不同的序列化方式,对比内容如下: 名...

Ryan-瑞恩
7分钟前
0
0
MySQL内存设置—— MySQL server has gone away

set global max_allowed_packet=268435456

一梦心草
16分钟前
0
0
推导式

列表、集合和字典推导式 列表推导式是Python最受喜爱的特性之一。它允许用户方便的从一个集合过滤元素,形成列表,在传递参数的过程中还可以修改元素。形式如下: [expr for val in collect...

火力全開
21分钟前
0
0
maven配置文件settings.xml详解

settings.xml有什么用? 如果在Eclipse中使用过Maven插件,想必会有这个经验:配置settings.xml文件的路径。 settings.xml文件是干什么的,为什么要配置它呢? 从settings.xml的文件名就可以...

浮躁的码农
26分钟前
0
0
MakeCode图形化编程语言学习笔记:micro:bit编程练习题[图]

MakeCode图形化编程语言学习笔记:micro:bit编程练习题[图]: 基础训练题: Q1:摇晃micro:bit编程板,随机出现7个小动物图标中的一个,并且前后相邻两次出现的小动物不重复。 注:七个小动物...

原创小博客
26分钟前
0
0
Redis 压力测试说明

Redis 压力测试说明 redis压力测试 2014-03-24 21:41:07| 分类: 默认分类 | 标签:redis |举报|字号 订阅 这几天对比测试mongodb、redis、pg的性能,主要是在消息队列、消息处理、用户经纬度...

舒文joven
26分钟前
0
0
拉姆达表达式 追加 条件判断 Expression>

public static class PredicateBuilder {   /// <summary>   /// 机关函数应用True时:单个AND有效,多个AND有效;单个OR无效,多个OR无效;混应时写在AND后的OR有效   /// </summary...

Lytf
39分钟前
0
0
【HAVENT原创】Spring Boot + Kafka 消息日志开发

最近因为部门需要将服务程序的各种日志发送给 Kafka 进行分析,所以写一个 Kafka 消息日志操作类,主要用来保存日志到 Kafka 以便查询。 一、pom.xml 增加配置 <!-- HH: 引入 kafka 模块 ...

HAVENT
39分钟前
0
0
7、Git命令解析

1、创建版本库 cd E:mkdir myRepositorypwdls -ah======git init 2、添加文件到仓库 添加git add readme.txt提交git commit -m "i wrote a readme file"【为什么Git添加...

丑陋的皮囊
40分钟前
0
0
ImageMagick批量压缩图片

#!/bin/shfor img in `find ./image -name "*.jpg"`; donewimg=`basename $img` convert -quality 75% $img ./ok/$newimg echo ./ok/$newimgdone...

dworry
40分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部