Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaChannel

原创
2018/09/14 09:37
阅读数 5.7K

一、SocketChannel和KafkaChannel有什么区别?

上篇文章说道KafkaSelector在创建一个连接的时候和普通的nioSelector并没有什么不同,它是基于nioSelector的封装。我们知道创建连接的一系列操作都是由Channel去完成,而KafkaChannel实际上就是对它的进一步封装:      KafkaChannel不仅封装了SocketChannel,还封装了Kafka自己的认证器Authenticator,和读写相关的NetworkReceive、Send。NetworkReceive和Send的底层都是通过ByteBuffer来实现的。

二、KafkaChannel的创建

实际上基本等同于KafkaSelector的创建:

按照普通的方式创建完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来创建KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在创建完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID做进一步绑定。

		SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 创建KafkaChannel
        key.attach(channel);// 将channel绑定到key上
        this.channels.put(id, channel);// 将 nodeId 和 Channel绑定

这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而我们也可以根据节点ID来拿到KafkaChannel,同样可以根据SelectionKey来拿到KafkaChannel,这就意味着,我们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,都可以通过这些引用关系拿到彼此,从而进行相关操作

三、预发送

实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操作,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。我们这里根据节点ID,从我们刚才的channels中,取出KafkaChannel。

public void send(Send send) {
        // 看看send要发的这个nodeId在不在
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件
            channel.setSend(send);
        } catch (CancelledKeyException e) {

            // 失败了加一条node_id的失败记录
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

这个KafkaChannel的setSend方法实际上非常简单,就是将要发送的send对象的引用交给KafkaChannel中的send。并且使这个channel的SelectionKey去关注OP_WRITE事件。

this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

四、nio中的io操作

在上篇文章里,我们知道KafkaSelector也是通过轮询器去进行IO操作,看一下原始的nioSelector是如何进行io操作的:

public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
        // 打开服务端 Socket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 打开 Selector
        Selector selector = Selector.open();

        // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 将 channel 注册到 selector 中.
        // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
        // 注册到 Selector 中.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
                keyIterator.remove();

                if (key.isAcceptable()) {
                    // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                    // 代表客户端的连接
                    // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                    // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
                    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    long bytesRead = clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " + bytesRead);
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel = (SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}

五、kafkaChannel 如何进行io操作?

####1、读操作 首先,进行是否可以开始读操作的判断。1、channel.ready(),这里做了两个判断,一个是Kafka的认证器是否认证通过,另一个则是是否握手成功。2、key.isReadable(),selectionKey是否关注了OP_READ。3、!hasStagedReceive(channel),判断该channel是否在hasStagedReceive这个map里面,如果该channel正在读,那么它会在这个map里面,直到读取完成。

			// channel是否已经准备好从连接中读取任何可读数据
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() // 连接的三次握手完成,并且 todo 权限验证通过
                && key.isReadable() // key是否关注了read事件
                && !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,因为在读的时候,会把这个channel扔进stagedReceives里面
                NetworkReceive networkReceive;

                /**
                 * 实际上这里就是分多次去一个channel取数据,直到取完,并将其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中
                 */
                 while ((networkReceive = channel.read()) != null) {
                    // 将多次接收的数据放进stagedReceives下channel的Deque里面
                    addToStagedReceives(channel, networkReceive);
                }
            }

剩下的channel.read()就比较简单了,KafkaChannel里面封装了一个NetworkReceives,而NetworkReceives主要就是对ByteBuffer的封装。

我们将该NioChannel传入,调用channel.read(size)方法,这个size,其实就是一个ByteBuffer,它是kafka协议中用来判断包体有多长的包头。

第一步,先判断byteBuffer(size)中是否还有剩余空间

第二步,从nioChannel中将数据读到byteBuffer中

第三步,判断byteBuffer是不是装满了

第四步,如果装满了,证明size这个bytebuffer已经拿到了包体的长度,调用readInt获取其capacity,再用这个capacity去申请一个用于接收包体的byteBuffer(buffer)。

第五步,正式地将channel中的数据中读取到byteBuffer(buffer)

 public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0) {
                throw new EOFException();
            }
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0) {
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                }
                if (maxSize != UNLIMITED && receiveSize > maxSize) {
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                }

                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }

        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) {
                throw new EOFException();
            }
            read += bytesRead;
        }

        return read;
    }

读取完成之后,再做一下校验:就会返回了,也就是上面while ((networkReceive = channel.read()) != null)拿到的这个networkReceives,里面装着包头和包体。这里Kafka有一个小操作,就是将kafkaChannel内的networkReceive的引用赋值给外面的这个networkReceive后,会将kafkaChannel内的networkReceive的引用置为空。

/**
     * 接收数据,将数据保存在 NetworkReceive
     */
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;

        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }

        receive(receive);// 这个方法就是上面说了一大堆第一步第二步第三步的那个方法。
        if (receive.complete()) {
            receive.payload()
                   .rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

####2、写操作 写操作要比读操作更加简单,上面有一个预发送操作,就是将要send的对象Send

			   /**
                 * 发送时其实也有一次没发送完的情况,每发送完的话,就不会出现在completedSends里面
                 */
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                // 如果channel已经ready 并且 我们有数据来准备好写sockets
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    // 这里会将KafkaChannel的send字段发送出去,
                    // 如果未完成发送,或者没发完,则返回null
                    // 发送成功则返回send对象
                    if (send != null) {
                        this.completedSends.add(send);// 添加到completedSends集合
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

主要的发送方法就是channel.write();

 public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

而write方法中最核心的方法则是send(send),这个send对象也是一个byteBuffer对象。底层中的底层还是调用了channel.write(byteBuffer方法)

 @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0) {
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        }
        remaining -= written;
        // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
        // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
        // GatheringByteChannel or ScatteringByteChannel.

        // 这是一个临时工作区,当发送时,接收数据的接口一直被BlockingChannel使用着。
        // 一旦BlockingChannel 被移除,我们就可以开始我们的发送操作,接收通过 transportLayer 来工作而不是 GatheringByteChannel 或 ScatteringByteChannel
        if (channel instanceof TransportLayer) {
            pending = ((TransportLayer) channel).hasPendingWrites();
        }

        return written;
    }

参考: Java NIO 的前生今世 之四 NIO Selector 详解 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1

展开阅读全文
打赏
0
3 收藏
分享
加载中
您好,请问在什么情况下会发生这种情况呢:if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
2020/02/13 09:36
回复
举报
更多评论
打赏
1 评论
3 收藏
0
分享
返回顶部
顶部