文档章节

Apache Kafka源码剖析:第4篇 业务线程池的原理

强子1985
 强子1985
发布于 2017/08/13 04:26
字数 547
阅读 30
收藏 0

下面,我们来讲解一个请求是如何被业务线程池处理的!

Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。

响应则是通过responseQueues队列来实现的。

每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;

业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。

这个跟netty和thrift如出一辙!

看一下核心概念:

1)requestQueue

 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道
 
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

注意这是全局唯一的1个对象,跟Thrift也是走这条路!

2)responseQueues

 private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道
 
  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

3)numProcessors

IO线程的个数

4)queueSize

请求的最大个数

5)responseListeners

监听器列表

在socketserver的初始化过程中,有注册监听器

 // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

 

因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象

供业务线程使用

 

这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?

之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!

 

Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。

希望读者能够通过这几篇文章理解Kafka的网络端实现!

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 875
博文 1090
码字总数 799329
作品 8
南京
架构师
私信 提问
YYWebImage 源码剖析:线程调度与缓存策略

系列文章: YYCache 源码剖析:一览亮点 YYModel 源码剖析:关注性能 YYAsyncLayer 源码剖析:异步绘制 YYImage 源码剖析:图片处理技巧 YYWebImage 源码剖析:线程调度与缓存策略 引言 在 ...

indulge_in
07/21
0
0
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
07/16
0
0
Java系列文章(全)

JVM JVM系列:类装载器的体系结构 JVM系列:Class文件检验器 JVM系列:安全管理器 JVM系列:策略文件 Java垃圾回收机制 深入剖析Classloader(一)--类的主动使用与被动使用 深入剖析Classloader(二...

www19
2017/07/04
0
0
为什么建议 Netty 的 I/O 线程与业务线程分离

点击上方“芋道源码”,选择“置顶公众号” 技术文章第一时间送达! 来源:Netty 之家 问题背景: 某互联网同学咨询一个Netty使用问题:最近在研究公司内部的RPC框架,发现底层通信框架使用的...

芋道源码
10/09
0
0
Thrift源码剖析

由于工作的关系,需要定位一个 bug 是否和 Thrift 有关, 所以用了一下午的时间研读了 Thrift-0.9.0 代码,虽然发现这个 bug 和 thrift 无关。 但是读源码还是有所收获,所以整理成这篇文章,...

Mr_Tea
2016/07/12
66
0

没有更多内容

加载失败,请刷新页面

加载更多

开源 serverless 产品原理剖析(二) - Fission

背景 本文是开源 serverless 产品原理剖析系列文章的第二篇,关于 serverless 背景知识的介绍可参考文章开源 serverless 产品原理剖析(一) - Kubeless,这里不再赘述。 Fission 简介 Fiss...

阿里云官方博客
2分钟前
0
0
Android面试整理(附答案)

面试,无非都是问上面这些问题(挺多的 - -!),聘请中高级的安卓开发会往深的去问,并且会问一延伸二。以下我先提出几点重点,是面试官基本必问的问题,请一定要去了解! 基础知识 – 四大组...

终端研发部
7分钟前
1
0
Vue 改变数组触发视图更新

Vue 改变数组触发视图更新 以下方法调用会改变原始数组 push(), pop(), shift(), unshift(), splice(), sort(), reverse()push()push() 方法可向数组的末尾添加一个或多个元素,并返回新的...

不负好时光
12分钟前
0
0
计算机系统要素 C5

本章值得一提的是组织计算机的结构。Hack 的指令和数据是分开存储的,因此它的 CPU 有两个 input: IN inM[16], // M value input (M = contents of RAM[A]) instruction[16],...

lionets
28分钟前
2
0
SpringSecurity404需要注意的地方

在使用@RequestMapping的时候路径的值如果写为("auth"),虽然用的时候前面加不加"/"没有区别,但是在配置了SpringSecurity的http.authorizeRequests().antMatchers()时就必须要注意了! 🌰1...

百萬馬力
32分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部