文档章节

Apache Kafka源码剖析:第4篇 业务线程池的原理

强子大叔的码田
 强子大叔的码田
发布于 2017/08/13 04:26
字数 547
阅读 106
收藏 1

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

下面,我们来讲解一个请求是如何被业务线程池处理的!

Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。

响应则是通过responseQueues队列来实现的。

每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;

业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。

这个跟netty和thrift如出一辙!

看一下核心概念:

1)requestQueue

 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道
 
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

注意这是全局唯一的1个对象,跟Thrift也是走这条路!

2)responseQueues

 private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道
 
  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

3)numProcessors

IO线程的个数

4)queueSize

请求的最大个数

5)responseListeners

监听器列表

在socketserver的初始化过程中,有注册监听器

 // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

 

因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象

供业务线程使用

 

这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?

之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!

 

Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。

希望读者能够通过这几篇文章理解Kafka的网络端实现!

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 923
博文 1630
码字总数 1283214
作品 9
南京
架构师
私信 提问
加载中

评论(0)

YYWebImage 源码剖析:线程调度与缓存策略

系列文章: YYCache 源码剖析:一览亮点 YYModel 源码剖析:关注性能 YYAsyncLayer 源码剖析:异步绘制 YYImage 源码剖析:图片处理技巧 YYWebImage 源码剖析:线程调度与缓存策略 引言 在 ...

indulge_in
2018/07/21
0
0
消息队列框架的设计与实现阅读笔记

1 背景介绍 消息队列在互联网领域里得到了广泛的应用,它多应用在异步处理、模块之间的解偶和高并发的消峰等场景,消息队列中表现最好的当属Apache开源项目Kafka,Kafka使用支持高并发的Sca...

osc_z97fe7xu
2019/06/19
2
0
深度剖析Apache Dubbo核心技术内幕

一、前言 在单体应用时,不同业务模块部署在同一个JVM进程内,这时候通过本地调用就可以解决不同业务模块之间的相互引用;但多体应用时,不同业务模块大多部署到不同机器上,这时候一 个高效...

01/09
0
0
Netty源码分析(前言, 概述及目录)

Netty源码分析(完整版) 前言 前段时间公司准备改造redis的客户端, 原生的客户端是阻塞式链接, 并且链接池初始化的链接数并不高, 高并发场景会有获取不到连接的尴尬, 所以考虑了用netty长连接...

osc_rreaoxa0
2018/12/31
6
0
一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发

一、从《Apeche Kafka源码剖析》上搬来的概念和图 Kafka网络采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解Java NIO提供了Reactor模式的API。常见的单线程Jav...

Anur
2018/12/13
4.6K
7

没有更多内容

加载失败,请刷新页面

加载更多

微服务项目搭建,到底要不要聚合工程?

这是一个入门问题,做微服务项目,首先就是要搭建 Project,代码采用什么样的形式来组织,这是我们面临的第一个问题。 要扯清楚这个问题,首先对 Maven 的使用不能含糊,小伙伴们可以在公众号...

osc_y12wmf09
12分钟前
11
0
C语言探索之旅 | 第一部分第六课:变量的世界(三),显示变量内容

作者 谢恩铭,慕课网精英讲师 Oscar老师。 转载请注明出处。 内容简介 用 printf 显示变量内容 用 scanf 提取程序中的输入 总结 第一部分第七课预告 1. 用 printf 显示变量内容 变量相关的内...

osc_zjs1puzi
14分钟前
14
0
php 操作RabbitMQ

基本流程图 如果exchange 没有绑定queue,则消息将会被丢弃 如果创建exchange,queue,并且已经绑定了,则可以直接使用 为了防止脚本出问题 可以配合supervisor 安装 从网站 https://packag...

php开源社区
15分钟前
18
0
Kotlin Coroutines Flow 系列(五) 其他的操作符

八. Flow 其他的操作符 8.1 Transform operators transform 在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别: fun main() = runBlocking { (1...

osc_0l1onu3j
15分钟前
13
0
挽救数据库性能的30条黄金法则

原文: 挽救数据库性能的30条黄金法则 1. 优化查询,应尽量避免全表扫描,应该在用于检索数据和排序数据的字段上建立索引,如where子句用于搜索,order by子句用于排序,所以在这两个子句涉及...

osc_gxvh47u5
16分钟前
16
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部