文档章节

kafka-网络层KafkaChannel

T
 Thinking--
发布于 2017/07/25 13:45
字数 1519
阅读 21
收藏 0
点赞 0
评论 0

KafkaChannel介绍

KafkaChannel负责基于socket的连接,认证,数据读取发送。它包含TransportLayer和Authenticator两个部分。TransportLayer负责数据交互,Authenticator负责安全验证。

框架图

输入图片说明

ChannelBuilders

ChannelBuilders提供了实例化ChannelBuilder的工厂方法,clientChannelBuilder和serverChannelBuilder

public class ChannelBuilders {
    // 这里构造为私有方法,表明这个类只提供类方法
    private ChannelBuilders() { }
    
    // 实例化客户端使用的ChannelBuilder
    public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol,
            JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName,
            String clientSaslMechanism, boolean saslHandshakeRequestEnable) {
        return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName,                         
clientSaslMechanism, saslHandshakeRequestEnable, null);
    }

    // 实例化服务端使用的ChannelBuilder
    public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
                               SecurityProtocol securityProtocol, AbstractConfig config,
                                CredentialCache credentialCache) {
        return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,  true, credentialCache);
    }

    private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode,
                                         JaasContext.Type contextType, AbstractConfig config,
                                         ListenerName listenerName, String clientSaslMechanism,
                                         boolean saslHandshakeRequestEnable,  CredentialCache credentialCache) {
        .......
        ChannelBuilder channelBuilder;
        // 根据Protocol,选择不同的channelBuidler
        switch (securityProtocol) {
            case SSL:
                // 基于ssl
                requireNonNullMode(mode, securityProtocol);
                channelBuilder = new SslChannelBuilder(mode);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // 基于sasl
                requireNonNullMode(mode, securityProtocol);
                JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
                channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol,
                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
                break;
            case PLAINTEXT:
            case TRACE:
                // 没有任何加密
                channelBuilder = new PlaintextChannelBuilder();
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }

        channelBuilder.configure(configs);
        return channelBuilder;
    }

PlaintextChannelBuilder类

ChannelBuidler是接口,实现其接口的有PlaintextChannelBuilder, SaslChannelBuilder,SslChannelBuilder。其中PlaintextChannelBuilder最为简单,所以这里以它为例。 ChannelBuidler中最主要的方法是buildChannel,它会创建transportLayer和authenticator,来实例化KafkaChannel。

public class PlaintextChannelBuilder implements ChannelBuilder {
    
    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
        try {
            // 实例化TransportLayer
            PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
            // 实例化Authenticator
            Authenticator authenticator = new DefaultAuthenticator();
            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
            // 返回KafkaChannel
            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
        } catch (Exception e) {
            log.warn("Failed to create channel due to ", e);
            throw new KafkaException(e);
        }
    }
}

Selector回顾

先回到Selector的pollSelectionKeys方法,它表明了KafkaChannel方法是何时被调用

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            KafkaChannel channel = channel(key);
            if (isImmediatelyConnected || key.isConnectable()) {
                    // 调用channel的finishConnect方法,处理连接
                    if (channel.finishConnect()) {
                        ......
                    } else
                        continue;
                }

                
                if (channel.isConnected() && !channel.ready())
                    // 然后调用channel的prepare方法,做准备工作(比如ssl连接的握手过程)
                    channel.prepare();

               
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    // 当channel准备工作完成,调用channel的read方法,读取请求
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
            }
            

        .......
    }

KafkaChannel

KafkaChannel负责连接,数据读取,发送

public class KafkaChannel {
    // 首先完成连接
    public boolean finishConnect() throws IOException {
        boolean connected = transportLayer.finishConnect();
        if (connected)
            state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
        return connected;
    }

    public boolean isConnected() {
        return transportLayer.isConnected();
    }

    public void prepare() throws IOException {
        //然后握手
        if (!transportLayer.ready())
            transportLayer.handshake();
        // 认证
        if (transportLayer.ready() && !authenticator.complete())
            authenticator.authenticate();
        if (ready())
            // 如果都完成,更新状态
            state = ChannelState.READY;
    }
    
    public boolean ready() {
        // 当transportLayer和authenticator都完成,channel才认为状态准备好了
        return transportLayer.ready() && authenticator.complete();
    }
    
    // channel的读取请求
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;

        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
        // 读取请求
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

    private long receive(NetworkReceive receive) throws IOException {
        // 调用NetworkReceive的readFrom方法
        return receive.readFrom(transportLayer);
    }
    
    // 设置send,但是并不着急发送,等待transportLayer写事件就绪
    public void setSend(Send send) {
        if (this.send != null)
            // 只能一次发送一个Send
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        // 监听写事件
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    // 如果没有发送完,返回null。如果发送完,返回send。并且更新this.send为null
    public Send write() throws IOException {
        Send result = null;
        // 调用send发送
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        // 调用Send的writreTo方法
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }

NetworkReceive

NetworkReceive表示一个请求。数据格式为

| size | data |

size 表示data的长度,为4个字节的int类型 data则为请求的数据,长度为size

public class NetworkReceive implements Receive {
     // channel的id,表示这个请求是属于哪个channel
    private final String source;
    // 只有4个字节,读取请求的size
    private final ByteBuffer size;
    // 请求数据的最大长度
    private final int maxSize;
    // 请求数据
    private ByteBuffer buffer;

    public NetworkReceive(int maxSize, String source) {
        this.source = source;
        // 这里只分配4个字节
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }
    
    
    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        // 检查是否已经完成读取size
        if (size.hasRemaining()) {
            // 读取数据的前4个字节,表示请求数据的大小
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                // 如果读取完成
                size.rewind();
                // 获取请求数据的大小receiveSize
                int receiveSize = size.getInt();
                // 检查数据大小的合理
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // 根据receiveSize,分配buffer
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        // buffer已经分配了,表明size读取完
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }
    
    // 返回请求数据
    public ByteBuffer payload() {
        return this.buffer;
    }
    
    // 当size和buffer都读取玩,则返回true
    public boolean complete() {
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

NetworkSend

NetworkSend只是继承ByteBufferSend,增加了两个类方法

public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        //为buffer添加sizeBuffer,然后初始化父类ByteBufferSend
        super(destination, sizeDelimit(buffer));
    }
    
    // 为buffer添加一个size的sizeBuffer,组成ByteBuffer数组
    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }
    
    // 实例化4个字节的ByteBuffer,使用int初始化
    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }

}

public class ByteBufferSend implements Send {
    // 发送地址
    private final String destination;
    // 响应数据的总大小
    private final int size;
    protected final ByteBuffer[] buffers;
    // remaining表示buffer中未写完的数据长度
    private int remaining;
    // 表示是否channel中还有数据未发送
    private boolean pending = false;
    
    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        // 计算所有buffer的总大小
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        this.size = remaining;
    }

    @Override
    public boolean completed() {
         // 数据首先会从buffer中写入到channel,然后channel再把数据写入到真实的socket中
        return remaining <= 0 && !pending;
    }

    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        // 写入到channel中
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        // 更新remaining
        remaining -= written;
        // 检查pending状态
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}

PlaintextTransportLayer

上面NetworkReceive和NetworkSend调用了TransportLayer的方法, channel.write和channel.read。 TransportLayer是接口,PlaintextTransportLayer是实现TransportLayer的类之一,因为它比较简单,所以这里以它为例。

public class PlaintextTransportLayer implements TransportLayer {

    private final SelectionKey key;
    private final SocketChannel socketChannel;

    public PlaintextTransportLayer(SelectionKey key) throws IOException {
        this.key = key;
        this.socketChannel = (SocketChannel) key.channel();
    }
    //调用socketChannel的read方法
    public long read(ByteBuffer[] dsts) throws IOException {
        return socketChannel.read(dsts);
    }
    //调用socketChannel的write方法
    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }
}

概括

类之间的关系。ChannelBuilders实例化ChannelBuilder,ChannelBuilder实例化TransportLayer和Authenticator, 然后实例化ChannelBuidler。ChannelBuidler然后实例化KafkaChannel,KafkaChannel使用NetworkSend表示发送数据,NetworkReceive表示接收数据。

© 著作权归作者所有

共有 人打赏支持
T
粉丝 5
博文 49
码字总数 44403
作品 0
武汉
Kafka 源码分析2 : Network相关

原文出处:刘正阳 背景 我们直接跑到最底层,看看kafka的网络层处理是怎么处理的。因为Java的NIO还是偏底层,不能直接用来做应用开发,所以一般都使用像netty的框架或者按照自己的需要封装一...

刘正阳
05/20
0
0
Using Kafka with Flume

这个文档是 Cloudera Distribution of Apache Kafka 1.3.x. 其他版本的文档在Cloudera Documentation. Using Kafka with Flume 在CDH 5.2.0 及更高的版本中, Flume 包含一个Kafka source an......

晨磊
2015/08/29
852
0
flume_kafkaChannel_kafkaSink

agent.sources = source 抽取类型为目录 agent.sources.source.type = spooldir 抽取的文件目录 agent.sources.source.spoolDir = /root/tmp/flume/data 添加一个存储绝对路径文件名的头 ag...

tanj123
04/17
0
0
Kafka 源码分析1 : 基础搭建和项目结构介绍

原文出处:刘正阳 背景 从kafka也算有两年了,用它做了不少项目,但是之前对它的认识也仅仅停留在一些从其他地方听到的概念和官方文档的documentation上在遇到一些问题时往往不知道其原理只能...

刘正阳
05/16
0
0
Kafka源码分析-序列3 -Producer -Java NIO(Reactor VS Peactor)

上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层。同很多Java项目一样,Kafka client的网络层也是用的Java NIO,然后在上面做了一层封装...

tantexian
2017/11/07
0
0
架构设计:系统间通信(29)——Kafka及场景应用(中2)

接上文:《架构设计:系统间通信(28)——Kafka及场景应用(中1)》 4-3、复制功能 我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的...

yinwenjie
2016/05/06
0
0
天猫“双11”成交额实时统计技术详解

阿里巴巴资深技术专家莫问在2017年12月20日云栖大会北京峰会上做了题为“Apache Flink技术进阶”的主题演讲。Apache Flink作为流式计算引擎,支持了“双十一对的”实时计算,已经被国内外的公...

younger123
2017/12/29
0
0
闫燕飞:Kafka的高性能揭秘及优化

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载。 大家下午好,我是来自腾讯云基础架构部ckafka团队的高级工程师闫燕飞。今天在这里首先...

腾讯云加社区
05/03
0
0
PHP微服务框架 PHP-MSF 发布 3.0.4 版本

PHP-MSF企业级微服务框架3.0.4发布 PHP微服务框架即“Micro Service Framework For PHP”,是Camera360社区服务器端团队基于Swoole自主研发现代化的PHP协程服务框架,简称msf或者php-msf,是...

wsdzadaq
2017/11/02
1K
7
携程实时用户数据采集与分析系统

一、携程实时用户数据采集系统设计实践 随着移动互联网的兴起,特别是近年来,智能手机、pad等移动设备凭借便捷、高效的特点风靡全球,同时各类APP的快速发展进一步降低了移动互联网的接入门...

大数据之路
2014/08/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

前端基础

1. get请求传参长度的误区 误区:我们经常说get请求参数的大小存在限制,而post请求的参数大小是无限制的。 实际上HTTP 协议从未规定 GET/POST 的请求长度限制是多少。对get请求参数的限制是...

wenxingjun
15分钟前
0
0
Android 复制和粘贴功能

做了一回搬运工,原文地址:https://blog.csdn.net/kennethyo/article/details/76602765 Android 复制和粘贴功能,需要调用系统服务ClipboardManager来实现。 ClipboardManager mClipboardM...

她叫我小渝
今天
0
0
拦截SQLSERVER的SSL加密通道替换传输过程中的用户名密码实现运维审计(一)

工作准备 •一台SQLSERVER 2005/SQLSERVER 2008服务 •SQLSERVER jdbc驱动程序 •Java开发环境eclipse + jdk1.8 •java反编译工具JD-Core 反编译JDBC分析SQLSERVER客户端与服务器通信原理 SQ...

紅顏為君笑
今天
6
0
jQuery零基础入门——(六)修改DOM结构

《jQuery零基础入门》系列博文是在廖雪峰老师的博文基础上,可能补充了个人的理解和日常遇到的点,用我的理解表述出来,主干出处来自廖雪峰老师的技术分享。 在《零基础入门JavaScript》的时...

JandenMa
今天
0
0
linux mint 1.9 qq 安装

转: https://www.jianshu.com/p/cdc3d03c144d 1. 下载 qq 轻聊版,可在百度搜索后下载 QQ7.9Light.exe 2. 去wine的官网(https://wiki.winehq.org/Ubuntu) 安装 wine . 提醒网页可以切换成中...

Canaan_
今天
0
0
PHP后台运行命令并管理运行程序

php后台运行命令并管理后台运行程序 class ProcessModel{ private $pid; private $command; private $resultToFile = ''; public function __construct($cl=false){......

colin_86
今天
1
0
数据结构与算法4

在此程序中,HighArray类中的find()方法用数据项的值作为参数传递,它的返回值决定是否找到此数据项。 insert()方法向数组下一个空位置放置一个新的数据项。一个名为nElems的字段跟踪记录着...

沉迷于编程的小菜菜
今天
1
1
fiddler安装和基本使用以及代理设置

项目需求 由于开发过程中客户端和服务器数据交互非常频繁,有时候服务端需要知道客户端调用接口传了哪些参数过来,这个时候就需要一个工具可以监听这些接口请求参数,已经接口的响应的数据,这种...

银装素裹
今天
0
0
Python分析《我不是药神》豆瓣评论

读取 Mongo 中的短评数据,进行中文分词 对分词结果取 Top50 生成词云 生成词云效果 看来网上关于 我不是药神 vs 达拉斯 的争论很热啊。关于词频统计就这些,代码中也会完成一些其它的分析任...

猫咪编程
今天
0
0
虚拟机怎么安装vmware tools

https://blog.csdn.net/tjcwt2011/article/details/72638977

AndyZhouX
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部