Kafka.network包源码解读

原创
2013/01/15 18:41
阅读数 2.8K

最近阅读了kafka network包的源码,主要是想了解下kafka底层通信的一些细节,这部分都是用NIO实现的,并且用的是最基本的NIO实现模板,代码阅读起来也比较简单。抛开zookeeper这部分的通信不看,我们就看最基本的producerconsumer之间的基于NIO的通信模块。在network中主要包含以下类:

我们挑选几个最主要的类说明,先从SocketServer的描述看起:

/**
 * An NIO socket server. The thread model is
 *   1 Acceptor thread that handles new connections
 *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
 */
SocketServer 中采用 processors 数组保存 processor
Private val processors = new Array[Processor](numProcessorThreads)

AbstractServerThread继承了runnable,其中采用闭锁控制开始和结束,主要作用是为了实现同步。同时打开selector,为后续的继承者使用。

protected val selector = Selector.open();
  protected val logger = Logger.getLogger(getClass())
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(false)
这个类是后续讲到的两个类的基类,并且闭锁的应用是整个同步作用实现的关键,我们看一组 stratup 的闭锁操作,其中 Unit scala 语法中你可以把他认为是 void ,也就是方法的返回值为空:
/**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete() = {
    alive.set(true)
    startupLatch.countDown
  }
Acceptor继承了AbstractServerThread,虽然叫Acceptor,但是它并没有单独拿出来使用,而是直接被socketServer引用,这点在命名和使用上与一般的通信框架不同:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {

这个类中主要实现了ServerSocketChannel的相关工作:

val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(new InetSocketAddress(port))
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    logger.info("Awaiting connections on port " + port)
    startupComplete()

其内部操作和NIO一样:

/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
    
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    if (logger.isDebugEnabled()) {
      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
    }

    processor.accept(socketChannel)
  }

Procesor类继承了abstractServerThread,其实主要是在Acceptor类中的accept方法中,又新启一个线程来处理读写操作:

private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
                               val time: Time,
                               val stats: SocketServerStats,
                               val maxRequestSize: Int) extends AbstractServerThread

所以整个kafka中使用的NIO的模型可以归结为下图:

socketServer中引用Acceptor处理多个client过来的connector,并为每个connection创建出一个processor去单独处理,每个processor中均引用独立的selector。

整体来说,这样的设计和我们在用NIO写传统的通信没有什么区别,只是这里在同步上稍微做了点儿文章。更详细的网络操作还是请看mina系列的分析。

展开阅读全文
打赏
3
9 收藏
分享
加载中
更多评论
打赏
0 评论
9 收藏
3
分享
返回顶部
顶部