文档章节

Apache Kafka源码剖析:第4篇 业务线程池的原理

强子哥哥
 强子哥哥
发布于 2017/08/13 04:26
字数 547
阅读 19
收藏 0

下面,我们来讲解一个请求是如何被业务线程池处理的!

Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。

响应则是通过responseQueues队列来实现的。

每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;

业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。

这个跟netty和thrift如出一辙!

看一下核心概念:

1)requestQueue

 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道
 
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

注意这是全局唯一的1个对象,跟Thrift也是走这条路!

2)responseQueues

 private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道
 
  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

3)numProcessors

IO线程的个数

4)queueSize

请求的最大个数

5)responseListeners

监听器列表

在socketserver的初始化过程中,有注册监听器

 // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

 

因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象

供业务线程使用

 

这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?

之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!

 

Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。

希望读者能够通过这几篇文章理解Kafka的网络端实现!

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 900
码字总数 615641
作品 8
南京
架构师
YYWebImage 源码剖析:线程调度与缓存策略

系列文章: YYCache 源码剖析:一览亮点 YYModel 源码剖析:关注性能 YYAsyncLayer 源码剖析:异步绘制 YYImage 源码剖析:图片处理技巧 YYWebImage 源码剖析:线程调度与缓存策略 引言 在 ...

indulge_in
07/21
0
0
消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型 前面写的两篇RocketMQ源码研究笔记系列: ...

癫狂侠
07/16
0
0
Java系列文章(全)

JVM JVM系列:类装载器的体系结构 JVM系列:Class文件检验器 JVM系列:安全管理器 JVM系列:策略文件 Java垃圾回收机制 深入剖析Classloader(一)--类的主动使用与被动使用 深入剖析Classloader(二...

www19
2017/07/04
0
0
架构设计:系统间通信(30)——Kafka及场景应用(中3)

接上文:《架构设计:系统间通信(29)——Kafka及场景应用(中2)》 4-5、Kafka原理:消费者 作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能——只要消息能被快速消...

yinwenjie
2016/05/08
0
0
Thrift源码剖析

由于工作的关系,需要定位一个 bug 是否和 Thrift 有关, 所以用了一下午的时间研读了 Thrift-0.9.0 代码,虽然发现这个 bug 和 thrift 无关。 但是读源码还是有所收获,所以整理成这篇文章,...

Mr_Tea
2016/07/12
66
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

maven坐标和依赖

一、maven坐标详解 <groupId>com.fgt.club</groupId><artifactId>club-common-service-facade</artifactId><version>3.0.0</version><packaging>jar</packaging> maven的坐标元素说......

老韭菜
今天
1
0
springmvc-servlet.xml配置表功能解释

问:<?xml version="1.0" encoding="UTF-8" ?> 答: xml version="1.0"表示是此xml文件的版本是1.0 encoding="UTF-8"表示此文件的编码方式是UTF-8 问:<!DOCTYPE beans PUBLIC "-//SPRING//......

隐士族隐逸
今天
1
0
基于TP5的微信的公众号获取登录用户信息

之前讲过微信的公众号自动登录的菜单配置,这次记录一下在TP5项目中获取自动登录的用户信息并存到数据库的操作 基本的流程为:微信设置自动登录的菜单—>访问的URL指定的函数里获取用户信息—...

月夜中徘徊
今天
0
0
youTrack

package jetbrains.teamsys.license.runtime; 计算lis package jetbrains.ring.license.reader; 验证lis 安装后先不要生成lis,要把相关文件进行替换 ring-license-checker-1.0.41.jar char......

max佩恩
今天
1
0
12.17 Nginx负载均衡

Nginx负载均衡 下面的dig看到可以返回2个IP,就是解析出来的IP,这样我们可以做负载均衡。 dig www.qq.com 1.vim /usr/local/nginx/conf/vhost/fuzai.conf 2.添加如下配置 upstream qq //定义...

芬野de博客
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部