文档章节

Apache Kafka源码剖析:第3篇 Acceptor&Processor细节

强子哥哥
 强子哥哥
发布于 2017/08/13 03:22
字数 1076
阅读 33
收藏 0
点赞 0
评论 0

这一节,主要聊Acceptor。

主要功能是:接收请求,创建socket连接,并且分配给Processor处理。

/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  private val nioSelector = NSelector.open()//注册监听socket的Selector对象!!!
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)//监听套接字!!!

  this.synchronized {//启动其管辖的Processor线程
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }

接下来,它的run方法是Acceptor的核心逻辑,我们看看具体实现:

 /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)//注册ACCEPT事件
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)//最多等待500毫秒的时间,看是否有socket过来!
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length//看来也是采用轮询的方案
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }

小贴士:

我们类比下Thrift的方案

 /**
   * A round robin load balancer for choosing selector threads for new
   * connections.
   */
  protected static class SelectorThreadLoadBalancer {
    private final Collection<? extends SelectorThread> threads;
    private Iterator<? extends SelectorThread> nextThreadIterator;

    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
      if (threads.isEmpty()) {
        throw new IllegalArgumentException("At least one selector thread is required");
      }
      this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
      nextThreadIterator = this.threads.iterator();
    }

    public SelectorThread nextThread() {
      // Choose a selector thread (round robin)
      if (!nextThreadIterator.hasNext()) {
        nextThreadIterator = threads.iterator();
      }
      return nextThreadIterator.next();
    }
  }
}
一旦到了最后,就回绕到第1个

可见,殊途同归,不解释!

接下来,重点就是accept函数

  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {

为了顺利进来,我们先打个断点如下:

stop in kafka.network.SocketServer$Acceptor.accept
stop in kafka.network.SocketServer$Acceptor.run
stop at kafka.network.SocketServer:335
stop at kafka.network.SocketServer:265
/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()//调用accept函数获取 socket句柄
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))

      processor.accept(socketChannel)//交给Processor处理,这个已经是通过轮询选中的
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }

我们看Processor怎么处理的

/**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

这个newConnections是个什么?

private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

是1个队列,恩,类比下Thrift怎么玩的

看到了吧,套路都一样。。。

那么, 这个新的连接是怎么被Processor处理的呢?

看代码

奥秘就在这里,我们再看看Thrift的玩法

真的没啥可说的,就这么回事吧

好,回到Kafka,我们知道Processor主要完成读取请求和写回响应。

Processor不参与具体的业务逻辑操作。

 

通过acceptor.accept创建的socket,通过processor.accept传给processor处理,

/**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        selector.register(connectionId, channel)//注册读事件
      } catch {
        // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
        // throwables will be caught in processor and logged as uncaught exceptions.
        case NonFatal(e) =>
          val remoteAddress = channel.getRemoteAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          error(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

到这里,就注册了读事件,然后看Processor怎么处理读事件的!

 private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val openChannel = selector.channel(receive.source)
        // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)

        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)//发给业务线程池,是通过requestChannel
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

可见,把请求放入了队列,跟Thrift一模一样的

接下来,看这个队列如何被业务线程获取拿任务处理的!

在此之前,有1个细节

这个跟Thrift完全是一模一样啊

手法如出一辙。

回到kafka的代码,既然请求已经放到一个队列里了,那么就看业务线程如何处理了,下一节讲这个

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 856
博文 551
码字总数 647493
作品 8
南京
架构师
kafka系列文章索引

apache kafka在数据处理中特别是日志和消息的处理上会有很多出色的表现,这里写个索引,关于kafka的文章暂时就更新到这里,最近利用空闲时间在对kafka做一些功能性增强,并java化,虽然现在已...

Gaischen ⋅ 2013/03/25 ⋅ 7

kafka系列文章索引(结束)

apache kafka在数据处理中特别是日志和消息的处理上会有很多出色的表现,这里写个索引,关于kafka的文章暂时就更新到这里,最近利用空闲时间在对 kafka做一些功能性增强,并java化,虽然现在...

老先生二号 ⋅ 2017/05/28 ⋅ 0

源码圈 365 胖友的书单整理

🙂🙂🙂关注微信公众号:【芋道源码】有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问...

芋道源码掘金Java群217878901 ⋅ 2017/09/21 ⋅ 0

apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe ⋅ 2015/09/06 ⋅ 1

Java后端工程师学习大纲

之前自己总结过的Java后端工程师技能树,其涵盖的技术点比较全面,并非一朝一夕能够全部覆盖到的。对于一些还没有入门或者刚刚入门的Java后端工程师,如果一下子需要学习如此多的知识,想必很...

JackFace ⋅ 2016/07/08 ⋅ 0

分布式消息系统 Kafka 简介

Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。 ...

xrzs ⋅ 2014/08/19 ⋅ 0

Kafka文章索引(入门)

目录索引: 1)apache kafka消息服务 2)kafka在zookeeper中存储结构 3)kafka log4j配置 4)kafka replication设计机制 5)apache kafka监控系列-监控指标 6)kafka.common.ConsumerRebala...

阿莱倪士 ⋅ 2014/11/27 ⋅ 0

kafka 源码调研系列1 特色

kafka 相关调研很多,其中以FrankHui大神(http://my.oschina.net/ielts0909)的kafka系列文章非常精彩,悲催的是,前期调研时候没有看到,老老实实的看完了Apache kafka官方文档(http://ka...

川明君 ⋅ 2013/07/17 ⋅ 2

优秀Java书单整理

书籍列表 《Effective Java 中文版》 豆瓣评分:9.1【1235 人评价】 推荐理由:本书介绍了在Java编程中78条极具实用价值的经验规则,这些经验规则涵盖了大多数开发人员每天所面临的问题的解决...

yunlielai ⋅ 01/09 ⋅ 0

Mina源码阅读笔记(一)-整体解读

今天的这一节,将从整体上对mina的源代码进行把握,网上已经有好多关于mina源码的阅读笔记,但好多都是列举了一下每个接口或者类的方法。我倒是想从mina源码的结构和功能上对这个框架进行剖析...

Gaischen ⋅ 2012/11/19 ⋅ 19

没有更多内容

加载失败,请刷新页面

加载更多

下一页

知乎Java数据结构

作者:匿名用户 链接:https://www.zhihu.com/question/35947829/answer/66113038 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 感觉知乎上嘲讽题主简...

颖伙虫 ⋅ 今天 ⋅ 0

Confluence 6 恢复一个站点有关使用站点导出为备份的说明

推荐使用生产备份策略。我们推荐你针对你的生产环境中使用的 Confluence 参考 Production Backup Strategy 页面中的内容进行备份和恢复(这个需要你备份你的数据库和 home 目录)。XML 导出备...

honeymose ⋅ 今天 ⋅ 0

JavaScript零基础入门——(九)JavaScript的函数

JavaScript零基础入门——(九)JavaScript的函数 欢迎回到我们的JavaScript零基础入门,上一节课我们了解了有关JS中数组的相关知识点,不知道大家有没有自己去敲一敲,消化一下?这一节课,...

JandenMa ⋅ 今天 ⋅ 0

火狐浏览器各版本下载及插件httprequest

各版本下载地址:http://ftp.mozilla.org/pub/mozilla.org//firefox/releases/ httprequest插件截至57版本可用

xiaoge2016 ⋅ 今天 ⋅ 0

Docker系列教程28-实战:使用Docker Compose运行ELK

原文:http://www.itmuch.com/docker/28-docker-compose-in-action-elk/,转载请说明出处。 ElasticSearch【存储】 Logtash【日志聚合器】 Kibana【界面】 答案: version: '2'services: ...

周立_ITMuch ⋅ 今天 ⋅ 0

使用快嘉sdkg极速搭建接口模拟系统

在具体项目研发过程中,一旦前后端双方约定好接口,前端和app同事就会希望后台同事可以尽快提供可供对接的接口方便调试,而对后台同事来说定好接口还仅是个开始、设计流程,实现业务逻辑,编...

fastjrun ⋅ 今天 ⋅ 0

PXE/KickStart 无人值守安装

导言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装。 常规的办法有什么? 光盘安装系统 ===> 一...

kangvcar ⋅ 昨天 ⋅ 0

使用Puppeteer撸一个爬虫

Puppeteer是什么 puppeteer是谷歌chrome团队官方开发的一个无界面(Headless)chrome工具。Chrome Headless将成为web应用自动化测试的行业标杆。所以我们很有必要来了解一下它。所谓的无头浏...

小草先森 ⋅ 昨天 ⋅ 0

Java Done Right

* 表示难度较大或理论性较强。 ** 表示难度更大或理论性更强。 【Java语言本身】 基础语法,面向对象,顺序编程,并发编程,网络编程,泛型,注解,lambda(Java8),module(Java9),var(...

风华神使 ⋅ 昨天 ⋅ 0

Linux系统日志

linux 系统日志 /var/log/messages /etc/logrotate.conf 日志切割配置文件 https://my.oschina.net/u/2000675/blog/908189 logrotate 使用详解 dmesg 命令 /var/log/dmesg 日志 last命令,调......

Linux学习笔记 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部