文档章节

kafka-网络层框架

T
 Thinking--
发布于 2017/07/20 13:48
字数 1334
阅读 22
收藏 0

kafka-网络层介绍

kafka的请求都是通过socket进行通信的,网络层就是负责接收请求,并且发送响应的。kafka网络层使用了java的nio异步框架,大大提高了性能。

框架图

输入图片说明

Acceptor只监听新的连接,然后通过新的连接轮询发送给Processor。

Processor负责与连接的数据交互,并且将请求转发给RequestHandler处理。

RequestHandler负责处理Processor转发的请求。

KafkaSelector是对java nio的Selector封装,负责读取客户的请求和发送响应。

网络层初始化-SocketServer类

SocketServer负责上述类的初始化

def startup() {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
       .......
      var processorBeginIndex = 0
      // config.listeners表示监听地址
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        // numProcessorThreads表示Processor的数目
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        // 初始化Processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
        // 初始化Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        // 启动acceptor线程
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        // 等待Acceptor完成开始动作
        acceptor.awaitStartup()
        processorBeginIndex = processorEndIndex
      }

AbstractServerThread类

AbstractServerThread是Runnable的封装,提供startup和shutdown的过程。它是Acceptor和Processor的基类。下面介绍startup过程,shutdown过程类似。

abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
    // 计数器,表明是否startup完成
    private val startupLatch = new CountDownLatch(1)
    @volatile private var shutdownLatch = new CountDownLatch(0)
  // 等待线程startup阶段完成,由外界调用
  def awaitStartup(): Unit = startupLatch.await

  // 通知startup完成,由线程自身调用
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

Acceptor类

Acceptor的初始化过程

  // 将java nio的Selector重命名为NSelector
  import java.nio.channels.{Selector => NSelector}
  // 实例化NSelector
  private val nioSelector = NSelector.open()
  // 实例化server channel
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   
  // 启动Processor 
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }

openServerSocket方法

private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    // 实例化serverChannel
    val serverChannel = ServerSocketChannel.open()
    // 设置非阻塞
    serverChannel.configureBlocking(false)
    // bind地址
    serverChannel.socket.bind(socketAddress)
.......

Acceptor本质是一个线程,下面是它的run方法

def run() {
    // 注册监听ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    .........
    // currentProcessor表示processor的索引号
    var currentProcessor = 0
    while (isRunning) {
        // 等待500ms,返回就绪的通道
        val ready = nioSelector.select(500)
        if (ready > 0) {
            // 取出就绪的keys
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            // 遍历keys
            while (iter.hasNext && isRunning) {
                if (key.isAcceptable)
                    // 接收新的连接,发送给索引为currentProcessor的processor
                    accept(key, processors(currentProcessor))
                 // 更新currentProcessor,这里体现了轮询的算法
                currentProcessor = (currentProcessor + 1) % processors.length
                ........

下面是accept方法,它只是接收的新的连接,然后发送给processor

def accept(key: SelectionKey, processor: Processor) {
    .........
    // 接收新的连接
    val socketChannel = serverSocketChannel.accept()
    // 设置非阻塞
    socketChannel.configureBlocking(false)
    // 禁用了Nagle 算法,保证数据能立马发送出去
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
   
    // 调用processor的accept方法
    processor.accept(socketChannel)
    .........

Processor类

Processor 属性

// newConnections是acceptor线程和processor通信的queue,
//  acceptor会向队列添加新的连接,processor会从队列取出连接,然后进行处理。
newConnections = new ConcurrentLinkedQueue[SocketChannel]()

//kafka的Selector
selector = new KSelector(......)

下面是Processor的accept方法,在上面由acceptor调用

def accept(socketChannel: SocketChannel) {
   // acceptor将新的连接,加入到newConnections里面
   newConnections.add(socketChannel)
   // 唤醒selector,使processor能即时处理新连接
   wakeup()

接下来看看Processor的run方法

  override def run() {
    startupComplete()
    while (isRunning) {
        ..........
        // 处理newConnections队列里面的新连接
        configureNewConnections()
        // 处理新的响应
        processNewResponses()
        
        poll()
        // 处理已经读取完的请求
        processCompletedReceives()
        // 处理已经发送的请求
        processCompletedSends()
        //处理关闭的连接
        processDisconnected()
        .........
    }

    .........
  }

configureNewConnections方法,处理新连接

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      // 从newConnections队列里面取出新连接
      val channel = newConnections.poll()
      // 获取连接信息
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      // 构件连接id
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // 调用selecotr注册新连接
      selector.register(connectionId, channel)
      ........

下面是processNewResponses方法

  private def processNewResponses() {
    // 从requestChannel取出连接
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
        // 根据响应的Action作不同的处理
        curr.responseAction match {
            case RequestChannel.NoOpAction =>
                val channelId = curr.request.connectionId
                if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                    // 如果连接存在并且没有准备关闭,程序会继续等待读取数据。
                    // unmute方法,实质是注册新连接的读事件
                    selector.unmute(channelId)
           case RequestChannel.SendAction =>
                val responseSend = curr.responseSend.getOrElse(
                          throw new IllegalStateException(s"responseSend must be defined for SendAction,         response: $curr"))
                // 发送响应
                sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
                //关闭连接
                close(selector, curr.request.connectionId)
        }
    ..........

poll方法,实质就是selector.poll的封装,后面篇幅会讲到Selector的实现细节

private def poll() {
    ......
    selector.poll(300)
    ......
 }

processCompletedReceives方法

private def processCompletedReceives() {
    // 从selector遍历completedReceives
    selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // 构建Request
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
    // 发送request到requestChannel
    requestChannel.sendRequest(req)
    // 取消这个连接的读事件,不继续读取请求
    selector.mute(receive.source)

processCompletedSends方法

  private def processCompletedSends() {
    // 从selector遍历completedSends
    selector.completedSends.asScala.foreach { send =>
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      // 增加这个连接的读事件,继续读取请求
      selector.unmute(send.destination)
    }
  }

processDisconnected方法

  private def processDisconnected() {
    // 从selector遍历disconnected
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }

概括

SocketServer: 负责框架初始化。实例Acceptor和Processor

Acceptor: 使用java nio框架的Selector,绑定监端口,负责接收新连接,并且通过基于线程间的队列,把新连接轮询发送给Processor

Processor:接收acceptor的新连接,使用KSelector负责读取连接的请求,然后把请求发送给

requestChannel处理。然后从requestChannel获取响应后,将响应基于KSelector发送出去

© 著作权归作者所有

共有 人打赏支持
T
粉丝 6
博文 49
码字总数 44403
作品 0
武汉
Kafka源码分析-序列3 -Producer -Java NIO(Reactor VS Peactor)

上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层。同很多Java项目一样,Kafka client的网络层也是用的Java NIO,然后在上面做了一层封装...

tantexian
2017/11/07
0
0
Kafka 源码分析2 : Network相关

原文出处:刘正阳 背景 我们直接跑到最底层,看看kafka的网络层处理是怎么处理的。因为Java的NIO还是偏底层,不能直接用来做应用开发,所以一般都使用像netty的框架或者按照自己的需要封装一...

刘正阳
05/20
0
0
Kafka 源码分析1 : 基础搭建和项目结构介绍

原文出处:刘正阳 背景 从kafka也算有两年了,用它做了不少项目,但是之前对它的认识也仅仅停留在一些从其他地方听到的概念和官方文档的documentation上在遇到一些问题时往往不知道其原理只能...

刘正阳
05/16
0
0
PHP微服务框架 PHP-MSF 发布 3.0.4 版本

PHP-MSF企业级微服务框架3.0.4发布 PHP微服务框架即“Micro Service Framework For PHP”,是Camera360社区服务器端团队基于Swoole自主研发现代化的PHP协程服务框架,简称msf或者php-msf,是...

wsdzadaq
2017/11/02
1K
7
Spring Kafka 1.1.7 和 1.2.3 发布

Spring Kafka 1.1.7 和 1.2.3 已发布,Spring Kafka 是 Spring 官方提供的一个 Spring 集成框架的扩展,用来为使用 Spring 框架的应用程序提供 Kafka 框架的集成。 点此了解更多: https://...

淡漠悠然
2017/09/21
0
1

没有更多内容

加载失败,请刷新页面

加载更多

DES/3DES(TripleDES)加密、解密测试数据

以下结果经 PHP+openssl及VB.NET验证,ECB模式。 PHP 7.0.10 (cli) (built: Aug 18 2016 09:48:53) ( ZTS ) OpenSSL Library Version: OpenSSL 1.0.1t 3 May 2016 VB.net 2003 ****** DES(S......

SamXIAO
31分钟前
1
2
Java11的新特性

Java语言特性系列 Java5的新特性 Java6的新特性 Java7的新特性 Java8的新特性 Java9的新特性 Java10的新特性 Java11的新特性 Java12的新特性 序 本文主要讲述一下Java11的新特性 版本号 java...

go4it
32分钟前
3
0
Maven常用命令及相关笔记

Maven常用命令 dos指令 4. 编译源代码: mvn compile 6. 运行测试: mvn test 8. 打包: mvn package 9. 在本地Repository中安装jar: mvn install 10. 清除产生的项目: mvn clean 4. 运行项...

颖伙虫
39分钟前
1
0
swagger2.2.2 与 spring cloud feign冲突 导致服务请求报空

swagger2.2.2 与 spring cloud feign冲突 Java代码 Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'com.choosefine.web.console.ar......

泉天下
42分钟前
1
0
设计模式之 明确责任 观察者模式 状态模式 责任链模式

观察者模式是任务分发的一种模式。 如果认为我们设计的系统的各个模块(或子系统)的最终目的是完成共同任务,那么这个任务如何分配到多个模块的就是我们遇到的第一个问题。简单设计场合我们...

backbye
46分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部