Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

原创
2018/09/14 09:36
阅读数 939

##NioSelector和KafkaSelector有什么区别?

先说结论,KafkaSelector(org.apache.kafka.common.network.selector)是对NioSelector(java.nio.channels.Selector)的进一步封装。回想一下NioSelector,它参与了IO中的哪些过程?

1、创建一个通道,并将通道注册到NioSelector上,我们可以得到一个SelectionKey 2、轮询NioSelector中的ready集合,拿到对应的SelectionKey,并根据这个SelectionKey所关注的事件去执行对应的操作

实际上,KafkaSelector也是在调用NioSelector去执行这些操作,待补充……

##一、创建连接

KafkaSelector创建连接,和普通的NioSelector并没有什么不同,首先创建一个通道,并将其设置为非阻塞式的长连接,设置完毕后,执行连接操作。

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);// 非阻塞模式
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);// 设置为长连接
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setSendBufferSize(sendBufferSize);// 设置SO_SNDBUF 大小
        }
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setReceiveBufferSize(receiveBufferSize);// 设置 SO_RCVBUF 大小
        }
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);// 因为是非阻塞模式,所以方法可能会在连接正式连接之前返回
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
       

创建完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来创建KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在创建完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID做进一步绑定。

        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 创建KafkaChannel
        key.attach(channel);// 将channel绑定到key上
        this.channels.put(id, channel);// 将 nodeId 和 Channel绑定

这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而我们也可以根据节点ID来拿到KafkaChannel,同样可以根据SelectionKey来拿到KafkaChannel,这就意味着,我们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,都可以通过这些引用关系拿到彼此,从而进行相关操作。

##二、预发送 实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操作,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。我们这里根据节点ID,从我们刚才的channels中,取出KafkaChannel。

 public void send(Send send) {
        // 看看send要发的这个nodeId在不在
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件
            channel.setSend(send);
        } catch (CancelledKeyException e) {

            // 失败了加一条node_id的失败记录
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

##三、进行IO操作 来到了我们比较熟悉的轮询环节,从NioSelector中取出所有SelectionKey进行轮询。

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 处理I/O的核心方法
            pollSelectionKeys(immediatelyConnectedKeys, true);
}
 
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
		Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // 创建连接时(connect)将kafkaChannel注册到key上,就是为了在这里获取
            KafkaChannel channel = channel(key);

……………………


#####1、判断一下key 连接好了没有,因为我们用的是非阻塞连接,所以到了轮询阶段,还没有完成连接是正常的。

				if (isImmediatelyConnected || key.isConnectable()) {
                    // finishConnect方法会先检测socketChannel是否建立完成,建立后,会取消对OP_CONNECT事件关注,//TODO 并开始关注OP_READ事件
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());// 将当前channel id 添加到已连接的集合中
                        this.sensors.connectionCreated.record();
                    } else {
                        continue;// 代表连接未完成,则跳过对此Channel的后续处理
                    }
                }

#####2、身份验证(略过) #####3、判断KafkaChannel有没有准备好,有没有关注OP_READ,能不能读之类的,并进行读操作。 这里有一个判断,就是判断当前的KafkaChannel是不是在StagedReceives里。我们往后看看,在从网络上读取数据时,我们会将KafkaChannel扔进StagedReceives里,也就是说,如果这个KafkaChannel已经在StagedReceives里了,那么代表它已经在读数据了。

			  if (channel.ready() // 连接的三次握手完成,并且 todo 权限验证通过
                    && key.isReadable() // key是否关注了read事件
                    && !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,因为在读的时候,会把这个channel扔进stagedReceives里面
                    NetworkReceive networkReceive;

                    /**
                     * 实际上这里就是分多次去一个channel取数据,直到取完,并将其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中
                     */
                    while ((networkReceive = channel.read()) != null) {
                        addToStagedReceives(channel, networkReceive);
                    }
                }

#####4、判断KafkaChannel有没有准备好,有没有关注OP_WRITE,并进行写操作

			   if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    // 这里会将KafkaChannel的send字段发送出去,
                    // 如果未完成发送,或者没发完,则返回null
                    // 发送成功则返回send对象
                    if (send != null) {
                        this.completedSends.add(send);// 添加到completedSends集合
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

##四、关闭空闲连接 在每一次IO操作完毕后,KafkaSelector都会调用一个方法,去关闭掉那些没怎么用的连接,实际上它就是一个基于时间戳的断连机制。 KafkaSelector中维护了一个哈希表,

LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);

在每次进行IO操作时,将Key:节点ID,Value:当前时间戳扔进哈希表里面,在IO操作进行完毕时,检查一下,最大的那个节点,它的最后一次IO时间+connectionsMaxIdleNanos(创建KafkaSelector时指定),是否超过了当前的时间。 如果是,这个连接就会被关掉。

比如说connectionsMaxIdleNanos被指定成了1分钟,那么如果这个有序哈希表的最后一个节点的时间是一分钟之前,那么这个节点ID的通道将会被关掉。

 private void maybeCloseOldestConnection() {
        if (currentTimeNanos > nextIdleCloseCheckTime) {
            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet()
                                                                              .iterator()
                                                                              .next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
                if (currentTimeNanos > nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled()) {
                        log.trace("About to close the idle connection from " + connectionId
                            + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
                    }

                    disconnected.add(connectionId);
                    close(connectionId);
                }
            }
        }
    }

参考: 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1

展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部