Apache Kafka源码剖析:第4篇 业务线程池的原理
Apache Kafka源码剖析:第4篇 业务线程池的原理
强子哥哥 发表于4个月前
Apache Kafka源码剖析:第4篇 业务线程池的原理
  • 发表于 4个月前
  • 阅读 9
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

下面,我们来讲解一个请求是如何被业务线程池处理的!

Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。

响应则是通过responseQueues队列来实现的。

每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;

业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。

这个跟netty和thrift如出一辙!

看一下核心概念:

1)requestQueue

 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道
 
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

注意这是全局唯一的1个对象,跟Thrift也是走这条路!

2)responseQueues

 private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道
 
  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

3)numProcessors

IO线程的个数

4)queueSize

请求的最大个数

5)responseListeners

监听器列表

在socketserver的初始化过程中,有注册监听器

 // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

 

因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象

供业务线程使用

 

这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?

之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!

 

Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。

希望读者能够通过这几篇文章理解Kafka的网络端实现!

标签: Kafka
共有 人打赏支持
强子哥哥
粉丝 836
博文 686
码字总数 713911
作品 8
×
强子哥哥
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: