Apache Kafka源码剖析:第2篇 Kafka网络引擎: 核心字段&初始化
Apache Kafka源码剖析:第2篇 Kafka网络引擎: 核心字段&初始化
强子哥哥 发表于3个月前
Apache Kafka源码剖析:第2篇 Kafka网络引擎: 核心字段&初始化
  • 发表于 3个月前
  • 阅读 73
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 学生专属云服务套餐 10元起购>>>   

上一节,我们通过Thrift的案例讲解了Reactor的网络服务器处理模式,

其实,从redis,mina,netty,thrift[下面简称为RMNT]的源码来看,万变不离其宗,

或者说套路都是一样一样的,这没办法,纯技术的原理本来就是一样的。

下面开始正式进入Kafka的网络引擎的世界!

-----------------------------------------------------------------------------------------

Kafka的网络层采用多线程,多个Selector的设计,这跟RMNT的思路差不多(Redis特殊一些,后面不再强调这1点)。

核心类是SocketServer,包含1个Acceptor用于接受新连接,每个Acceptor对应多个Processor线程,每个Processor线程拥有自己独立的Selector.主要用于从连接中读取请求和写回响应。

每个Processor拥有自己的Selector,这样才可以将某个socket限制在自己的处理范围内,直到这个Socket的生命周期结束。
而且不但读取请求是这个Processor的Selector线程处理,写回响应也是它处理,
一言以蔽之:我会对你负责到底的!!! :)
其实想想也正常,总不可能同1个 socket一会被Selector A处理,一会被B处理,这就乱套了,还是稳定起来好!

每个Acceptor也对应多个Handle线程,一般业内称之为业务处理线程池

所以如果你去别的单位面试,

问你Netty如何处理耗时业务的,你不说要新开一个业务线程池,相信我,面试官内心会把你鄙视一顿的 :)

千万别说Netty的业务处理跟IO线程池在一个线程处理的,绝对要丢分!

此话一出,基本Netty这一项就不用问下去了

---这里要注意的是:业务线程池的结果是要返回给IO线程池的,也就是Processor线程组,

这2种线程之间通过RequestChannel进行通信

在Thrift中,是通过Runnable封装FrameBuffer来实现的
protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
  }

FrameBuffer封装的就是业务逻辑完整的一个请求体,
你就理解为一个完整意义的HTTP请求体一样的

技术背景:TCP的字节流协议特性!!!不多说了

 

SocketServer的核心字段

源码位置

find ./ -name SocketServer.*
./core/src/main/scala/kafka/network/SocketServer.scala

1)endpoints:

EndPoint集合。一般服务器有多个网卡,这就可以配置多个IP,Kafka可以同时监听多个端口,

一个endpoint就定义了host,port,网络协议等信息,

每个Endpoint对应1个Acceptor对象

这个其实有点类似于ActiveMQ的概念,ActiveMQ支持多个协议,每个协议开启了一个TCP协议的监听端口,
所以一个ActiveMQ进程其实占用了很多个listening port.

2)numProcessorThreads & totalProcessorThreads

numProcessorThreads 的意思是 每个endpoint的Processor线程的个数

那么后面1个呢?因为有多个endpoint,所以就是endpoint的个数* numProcessorThreads

3)maxQueuedRequests: 缓存的最大请求个数

想一想,在Thrift中,最多可以缓存多少个? :)

这个其实是通过ProcessorThread对Socket进行读取后得到请求,塞到这个队列里进行缓冲

4)maxConnectionsPerIp: 每个IP上能创建的最大连接数

正常来说,不会有限制吧,难道要限制 client不连过来吗???

5)maxConnectionsPerIpOverrides: 略

6)requestChannel: 队列

kafka里的一个逻辑完整请求封装对应的队列,想想http的请求体对应的是HttpRequest

Thrift对应的是

在Thrift中,是通过Runnable封装FrameBuffer来实现的
protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
  }

FrameBuffer封装的就是业务逻辑完整的一个请求体,
你就理解为一个完整意义的HTTP请求体一样的

技术背景:TCP的字节流协议特性!!!不多说了

不解释!

7)Acceptors:

Acceptor对象集合,每个Endpoint对应一个这样的对象,不解释!

8)Processors:

IO线程的集合,不解释!

===介绍完了核心字段,下面看SocketServer的初始化流程===

首先,老规矩,构造Linux的debug环境。

1)启动Kafka server
2)查看启动命令
/root/myAllFiles/jdk1.8.0_111/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:

3)构造debug命令
jdb -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -classpath /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/libs/*:  kafka.Kafka config/server.properties

第3步如果报错,读者可以自己体会然后修正 :)

好,通过jdb跑起来之后,我们的目标是大体了解SocketServer的执行过程,具体每个组件的实现会在后面详细介绍。

SocketServer会在启动时遍历EndPoint,启动对应的各种线程 :)

安装scala插件 见 http://www.cnblogs.com/xiyuan2016/p/6626825.html
http://scala-ide.org/download/prev-stable.html

------开始尝试debug,来热热身------

先来个断点

stop in kafka.network.SocketServer:54

然后可以开始debug了,必要的话,请自己加上源码的文件夹即可。

如图所示:

================下面正式debug==================

小贴士:

属性文件的对应关系,请参考:
kafka.server.KafkaConfig.scala
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
  //默认就1个
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
  private val numProcessorThreads = config.numNetworkThreads//配置文件中是3,val NumNetworkThreadsProp = "num.network.threads"
  private val maxQueuedRequests = config.queuedMaxRequests//默认500, val QueuedMaxRequestsProp = "queued.max.requests"
  private val totalProcessorThreads = numProcessorThreads * endpoints.size//3*1

  private val maxConnectionsPerIp = config.maxConnectionsPerIp//默认值Int.MaxValue->2147483647 val MaxConnectionsPerIpProp = "max.connections.per.ip"
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

接下来初始化RequestChannel对象

  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)

跟进去

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道
  for(i <- 0 until numProcessors)//初始化,用了LinkedBlockingQueue
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

回到SocketServer.scala

  private val processors = new Array[Processor](totalProcessorThreads)//准备构造IO线程池

  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()//Acceptor
  private var connectionQuotas: ConnectionQuotas = _
  // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

意思就是说当有业务响应准备好时,需要wakeup当前io线程的Selector.

接下来,执行startup方法,这是核心

Step completed: "thread=main", kafka.network.SocketServer.startup(), line=74 bci=0

我们来看看做了哪些事情

/**
   * Start the socket server
   */
  def startup() {
    this.synchronized {
      //限额
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      //一些TCP的参数
      val sendBufferSize = config.socketSendBufferBytes//配置文件:102400 val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
      val recvBufferSize = config.socketReceiveBufferBytes//配置文件:102400 val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
      val brokerId = config.brokerId//这个就不用说了

      var processorBeginIndex = 0
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads//每个endpoint都启动这么多个线程

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)//初始化Processor线程

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)//初始化Acceptor线程
        acceptors.put(endpoint, acceptor)
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

关闭就比较简单了

  /**
   * Shutdown the socket server
   */
  def shutdown() = {//关闭操作
    info("Shutting down")
    this.synchronized {
      acceptors.values.foreach(_.shutdown)//关闭acceptor
      processors.foreach(_.shutdown)//关闭processor
    }
    info("Shutdown completed")
  }

 

AbstractServerThread

看下面2个

/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

/**
 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selector
 */
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

知道,Acceptor和Processor都是继承了AbstractServerThread这个类

/**
 * A base class with some helper variables and methods
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {

它实现了Runnable接口的抽象类,分别为acceptor和Processor线程提供了具体的startup/shutdown功能!

小贴士:
停下来,回顾一下Netty的玩法,是不是很熟悉?
/**
 * A base class with some helper variables and methods
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {

  private val startupLatch = new CountDownLatch(1)//标识是否已经启动完毕

  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
  // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
  // latch and then replace it in `startupComplete()`.
  @volatile private var shutdownLatch = new CountDownLatch(0)//标记是否关闭完毕

  private val alive = new AtomicBoolean(true)//是否存活

  def wakeup(): Unit

  /**
   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
   */
  def shutdown(): Unit = {
    alive.set(false)
    wakeup()
    shutdownLatch.await()
  }

  /**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

  /**
   * Record that the thread shutdown is complete
   */
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  /**
   * Is the server still running?
   */
  protected def isRunning: Boolean = alive.get

  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   */
  def close(selector: KSelector, connectionId: String): Unit = {//关闭socket,减少连接数统计
    val channel = selector.channel(connectionId)
    if (channel != null) {
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      if (address != null)
        connectionQuotas.dec(address)
      selector.close(connectionId)
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   */
  def close(channel: SocketChannel): Unit = {
    if (channel != null) {
      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
      connectionQuotas.dec(channel.socket.getInetAddress)
      swallowError(channel.socket().close())
      swallowError(channel.close())
    }
  }
}

关于更详细的Acceptor和Processor的细节,后面再说!

标签: Kafka
共有 人打赏支持
强子哥哥
粉丝 825
博文 691
码字总数 711776
作品 7
×
强子哥哥
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: