
Kafka: The Asynchronous Send Process of ProducerRecord (Part 1)

强子哥哥 · 2017/09/09 13:45
new ProducerRecord(topic, index + messageNo, messageStr)

If the record is constructed this way, the call actually resolves to the following constructor:

    /**
     * Create a record to be sent to Kafka
     * 
     * @param topic The topic the record will be appended to
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value);
    }

In other words, we have specified the topic, key, and value; the remaining two parameters (partition and timestamp) default to null.
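
To make the overload resolution concrete, here is a quick sketch (the key and value types are illustrative) showing that the three-argument form is just the five-argument constructor with two nulls:

    String topic = "myonlytopic";
    String key = "k", value = "v";
    // the short form...
    ProducerRecord<String, String> r1 = new ProducerRecord<>(topic, key, value);
    // ...is exactly equivalent to spelling out the null partition and timestamp:
    ProducerRecord<String, String> r2 = new ProducerRecord<>(topic, null, null, key, value);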

---

    /**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * 
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

As you can see, apart from the argument checks, the constructor just assigns fields.
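
Because of those checks, a malformed record fails fast at construction time rather than deep in the send path. A small illustration (hypothetical values):

    try {
        // partition -1 violates the "non-negative or null" rule above
        new ProducerRecord<String, String>("myonlytopic", -1, null, "k", "v");
    } catch (IllegalArgumentException e) {
        // Invalid partition: -1. Partition number should always be non-negative or null.
        System.out.println(e.getMessage());
    }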

Next, we call send() to dispatch the record:

producer.send(new ProducerRecord(topic, index + messageNo, messageStr)).get();

This call lands in the two-argument overload of send(), with callback left as null:

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

We did not pass a callback, so the callback parameter here is null.
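
For reference, the two-argument overload accepts a Callback, whose single onCompletion method makes a lambda possible on Java 8+. A sketch, mirroring the call above:

    producer.send(new ProducerRecord(topic, index + messageNo, messageStr),
            (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();   // the send failed
                else
                    System.out.println("acked at offset " + metadata.offset());
            });

Back in the debugger, stepping through send() confirms that the interceptor list is also null: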

Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.send(), line=435 bci=0
435            ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

main[1] print this.interceptors
 this.interceptors = null
main[1] next
> 
Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.send(), line=436 bci=20
436            return doSend(interceptedRecord, callback);

Execution finally goes through the doSend() function.

At send time, the metadata for this topic may not have been fetched yet, so the producer first checks its local cache:

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        metadata.add(topic);
        Cluster cluster = metadata.fetch();
        Integer partitionsCount = cluster.partitionCountForTopic(topic);

If the partition count is not found locally, a metadata fetch is triggered:


Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(), line=533 bci=68
533            long remainingWaitMs = maxWaitMs;

main[1] next
> 
Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(), line=540 bci=71
540                log.trace("Requesting metadata update for topic {}.", topic);

main[1] next
> 
Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(), line=541 bci=82
541                int version = metadata.requestUpdate();
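
Note the idiom in the line we just stepped to: requestUpdate() returns a version number, and (as the next steps will show) awaitUpdate(version, ...) blocks until that version has advanced. A self-contained sketch of this versioned-wait pattern (my own class for illustration, not Kafka's actual Metadata):

    public class VersionedMetadata {
        private int version = 0;
        private boolean needUpdate = false;

        // Sending thread: mark the metadata stale and return the current
        // version, so the caller can later wait for a strictly newer one.
        public synchronized int requestUpdate() {
            this.needUpdate = true;
            return this.version;
        }

        // I/O thread: a fresh metadata response arrived; bump the version
        // and wake up anyone blocked in awaitUpdate().
        public synchronized void update() {
            this.needUpdate = false;
            this.version++;
            notifyAll();
        }

        // Sending thread: block until the version has advanced past
        // lastVersion, or give up after maxWaitMs.
        public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + maxWaitMs;
            while (this.version <= lastVersion) {
                long remainingMs = deadline - System.currentTimeMillis();
                if (remainingMs <= 0)
                    throw new RuntimeException("Timed out waiting for metadata update");
                wait(remainingMs);
            }
        }
    }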

So how does the metadata update request actually get triggered?

We can see it in the console debug log as we step over awaitUpdate():

Step completed: "thread=main", org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(), line=544 bci=98
544                    metadata.awaitUpdate(version, remainingWaitMs);

main[1] next
> 2017-09-09 12:47:42,957 [DEBUG] Initialize connection to node -2 for sending metadata request - [org.apache.kafka.clients.NetworkClient.644] 
2017-09-09 12:47:42,963 [DEBUG] Initiating connection to node -2 at 10.30.9.107:6667. - [org.apache.kafka.clients.NetworkClient.496] 
2017-09-09 12:47:43,093 [DEBUG] Added sensor with name node--2.bytes-sent - [org.apache.kafka.common.metrics.Metrics.296] 
2017-09-09 12:47:43,095 [DEBUG] Added sensor with name node--2.bytes-received - [org.apache.kafka.common.metrics.Metrics.296] 
2017-09-09 12:47:43,100 [DEBUG] Added sensor with name node--2.latency - [org.apache.kafka.common.metrics.Metrics.296] 
2017-09-09 12:47:43,102 [DEBUG] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2 - [org.apache.kafka.common.network.Selector.327] 
2017-09-09 12:47:43,104 [DEBUG] Completed connection to node -2 - [org.apache.kafka.clients.NetworkClient.476] 
2017-09-09 12:47:43,140 [DEBUG] Sending metadata request {topics=[myonlytopic]} to node -2 - [org.apache.kafka.clients.NetworkClient.640] 
2017-09-09 12:47:43,190 [DEBUG] Updated cluster metadata version 2 to Cluster(id = LNcOtZlOQnC1Lw4It-01bA, nodes = [izuf6c93k7a8ptt40stvvxz.hadoop:6667 (id: 1003 rack: /default-rack), izuf6c93k7a7ptt41stvvyz.hadoop:6667 (id: 1002 rack: /default-rack), izuf6c93k7a7ptt403tvvwz.hadoop:6667 (id: 1001 rack: /default-rack)], partitions = [Partition(topic = myonlytopic, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,])]) - [org.apache.kafka.clients.Metadata.241] 

So let's go investigate how this metadata request gets serviced.

We know that Sender is a Runnable, so first look at how this class runs once started:

stop in org.apache.kafka.clients.producer.internals.Sender.run

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

As you can see, it is an infinite loop: run(now) is called over and over until close() flips the running flag.
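
This is the standard shape of a long-running I/O thread: a volatile flag ends the loop, and an exception from one iteration is logged rather than allowed to kill the thread. A minimal runnable sketch of the same shape (the names are mine, not Kafka's):

    public class IoLoopDemo implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                try {
                    runOnce(System.currentTimeMillis());
                } catch (Exception e) {
                    // one bad iteration must not kill the I/O thread
                    System.err.println("Uncaught error in I/O thread: " + e);
                }
            }
        }

        // One iteration: in Sender this is where the network is polled
        // and ready record batches are drained and sent.
        private void runOnce(long nowMs) { /* ... */ }

        public void close() {
            running = false;
        }

        public static void main(String[] args) throws Exception {
            IoLoopDemo loop = new IoLoopDemo();
            Thread ioThread = new Thread(loop, "demo-network-thread");
            ioThread.setDaemon(true); // KafkaProducer likewise runs Sender on a daemon thread
            ioThread.start();
            Thread.sleep(100);
            loop.close();
            ioThread.join();
        }
    }

To watch this loop drive the metadata update, set two more breakpoints: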

stop in org.apache.kafka.clients.NetworkClient.leastLoadedNode

stop in org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate

The real connection is initiated by:

    /**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

Then we return to the poll loop to watch the request progress.

The spot where the response is finally handled can be pinned down with one more breakpoint:

stop in org.apache.kafka.clients.NetworkClient.handleConnections

Inside here the metadata is obtained, and the cluster view is then updated.

--- Back to the send path: computing the partition

    /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

This ultimately calls:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
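
In short: keyed records are placed by hashing the key bytes (murmur2) modulo the partition count, while keyless records round-robin over the currently available partitions. A quick demo of the keyed branch (assuming kafka-clients on the classpath for Utils; the 3 partitions here are illustrative, whereas myonlytopic above has only 1):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class PartitionDemo {
        public static void main(String[] args) {
            int numPartitions = 3;
            for (String key : new String[] {"user-1", "user-2", "user-1"}) {
                byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
                // the same formula as the keyed branch above
                int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                System.out.println(key + " -> partition " + partition);
            }
            // "user-1" prints the same partition both times: keyed records are sticky.
        }
    }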

Now let's look at the complete doSend() flow:

    /**
     * Implementation of asynchronously send a record to a topic.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // ... the catch/finally blocks that handle errors are elided here ...

Focus on the last part: accumulator.append(...) returns a Future.

This shows that the send is, at its core, an asynchronous process.
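
A usage sketch of what that means for the caller (assuming a configured KafkaProducer<String, String> named producer):

    // send() returns as soon as the record is appended to the accumulator;
    // only get() blocks, waiting for the broker's acknowledgement.
    Future<RecordMetadata> future =
            producer.send(new ProducerRecord<>("myonlytopic", "key", "value"));
    // ... meanwhile the Sender I/O thread ships the batch in the background ...
    RecordMetadata md = future.get(); // blocks until acked
    System.out.printf("written to %s-%d at offset %d%n",
            md.topic(), md.partition(), md.offset());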

Next up: analyzing this append process, in the next installment!

 
