文档章节

KafkaProducer源码解析

DJZhu
 DJZhu
发布于 2017/03/03 12:38
字数 1275
阅读 22
收藏 0

先来看看一段简单的KafkaProducer应用的代码:

import org.apache.kafka.clients.producer.*;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class SimpleProducer {
    @Test
    public void testProduce(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "node87:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "3000");
        KafkaProducer<String , String> producer = new KafkaProducer<String, String>(props);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        for(int i = 0; i < 5; i++) {
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("test1234", i + "", 
                simpleDateFormat.format(new Date()) + "---" + i);
            producer.send(record ,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if(e != null)
                                e.printStackTrace();
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    });
        }
        producer.close();
    }
}

代码很简单。我们先来看看KafkaProducer的继承关系,

kafkaProducer继承关系

Closeable接口表征了KafkaProducer的使用是会占用系统资源的,在停止使用的时候不要忘记调用close()方法回收资源。Producer接口则定义了一个生产者的主要方法,包括

方法

其中send()的两个方法是异步发送消息;partitionsFor()则用来获取指定topic的partition信息,它为每一个partition创建一个PartionInfo对象一起放在List中返回。metrics是监控相关的方法,本文不做涉及。 KafkaProducer包含了以下全局属性,可以通过它们看出KafkaProducer的一些基础特性

public class KafkaProducer<K,V> implements Producer<K,V> {
    private final Partitioner partitioner;  //分区指定策略
    private final int maxRequestSize; //max.request.size属性指定,定义了一个请求包(Record)的最大字节数
    private final long metadataFetchTimeoutMs;  //metadata.fetch.timeout.ms 首次往一个partition发送消息的时候需要获取其metadata,这里指定这个获取过程的最长时间
    private final long totalMemorySize; //buffer.memory 缓存包大小
    private final Metadata metadata;  //保存topic的一些关键信息
    private final RecordAccumulator accumulator;  //Record累加器队列,每次发送Record时都需要调用此对象的append方法来将Record压入一个MemoryRecords实例,当发送太频繁以至超过了totalMemorySize时append方法会被阻塞(可通过block.on.buffer.full来选择是否阻塞)
    private final Sender sender;  //Sender类实现了Runnable接口,是负责在子线程中处理发送请求的类
    private final Metrics metrics;  //监控
    private final Thread ioThread;  //io线程
    private final CompressionType compressionType;  //压缩类型,支持GZIP,SNAPPY,LZ4压缩方式,或者不压缩
    private final Sensor errors;  //跟监控有关,记录错误
    private final Time time;  //计时器
    private final Serializer<K> keySerializer; //key序列化器
    private final Serializer<V> valueSerializer;  //value序列化器
    private final ProducerConfig producerConfig;  //配置
    private static final AtomicInteger producerAutoId = new AtomicInteger(1); //priducerId

    //...
}

一共五个构造器支持通过Map和 Properties的方式配置Producer的属性,并且可以指定使用自己实现的序列化方法(这一块我暂时还没有想到有什么特别好的应用场景,通常的做法都是直接发送字符串或者ProtoBuf协议封装的字符串,还没有体会到自定义序列化函数的好处在哪里,有经验的朋友可以留言交流下)

在实例代码中调用的 KafkaProducer<String , String> producer = new KafkaProducer<String, String>(props); 构造器底层做的事情基本上就是实例化上面的全局参数,我们不赘述细节,这里只提两点。

一个是

List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags),
                                         this.metadata,
                                         clientId,
                                         config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                                         config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                                         config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                         config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
this.sender = new Sender(client,
                         this.metadata,
                         this.accumulator,
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                         config.getInt(ProducerConfig.RETRIES_CONFIG),
                         config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                         this.metrics,
                         new SystemTime(),
                         clientId);

代码通过解析bootstrap.servers.config获取IP/端口后,实例化了一个NetworkClient对象,这个对象的底层是通过维持一个Socket来进行TCP通信。并把这个NetworkClient对象交给sender管理具体的发送任务,想要了解Kafka底层通信机制的朋友可以继续深挖,这里不做展开。

另一个是:

this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                         this.totalMemorySize,
                                         config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                         retryBackoffMs,
                                         config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                         metrics,
                                         time,
                                         metricTags);

这里实例化了一个RecordAccumulator对象,它处理的是Kafka的发送队列。下面具体看看发送任务是怎么执行的。

@Override
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
    try {
        // 发送前首先确定已经获取了metadata,
        waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);

    /*尝试序列化key,value*/
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);

        //获取record获取将要发往的partion, 如果在record中没有指定,则根据key随意生成一个,如果key也没有则使用partitioner内部的一个计数器依次发给每一个partirion
        int partition = partitioner.partition(serializedRecord, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        TopicPartition tp = new TopicPartition(record.topic(), partition);
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

        //将数据发往accumulator缓存起来等待发送
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            //唤醒sender,调用Client进行实际的发送工作
            this.sender.wakeup();
        }
        return result.future;
        // Handling exceptions and record the errors;
        // For API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        throw new KafkaException(e);
    } catch (KafkaException e) {
        this.errors.record();
        throw e;
    }
}

以上。

© 著作权归作者所有

DJZhu
粉丝 3
博文 26
码字总数 30529
作品 0
广州
程序员
私信 提问
Kafka分区分配计算(分区器Partitions)

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景...

u013256816
2017/12/03
0
0
kafka 源码分析三 Producer

原文出处:刘正阳 Producer Producer是生产者的接口定义 常用的方法有 public Future send(ProducerRecord record);public Future send(ProducerRecord record, Callback callback);public ......

刘正阳
2018/05/20
0
0
kafka源码分析3 : Producer

原文出处:刘正阳 Producer Producer是生产者的接口定义 常用的方法有 public Future send(ProducerRecord record);public Future send(ProducerRecord record, Callback callback);public ......

刘正阳
2018/05/21
0
0
Kafka源码分析Producer读取Metadata的数据结构及Metadata两种更新机制介绍

问题导读: 1. 什么是多线程异步发送模型? 2. Metadata的线程安全性如何实现? 3. Metadata的数据结构是什么? 4. producer如何读取Metadata? 5. Sender的如何创建? 6. Sender poll()如何更...

问津已非少年
2016/09/29
146
0
5、Hive的自定义UDF函数

1、pom.xml引入依赖及打包 org.apache.hivehive-exec1.1.0org.apache.hadoophadoop-common2.6.0 org.apache.maven.pluginsmaven-compiler-pluginUTF-81.81.8true 2、对单个字段,或者多个字段......

刘付kin
2016/12/01
29
0

没有更多内容

加载失败,请刷新页面

加载更多

Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

作者|白松 目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次...

数澜科技
今天
4
0
Xss过滤器(Java)

问题 最近旧的系统,遇到Xss安全问题。这个系统采用用的是spring mvc的maven工程。 解决 maven依赖配置 <properties><easapi.version>2.2.0.0</easapi.version></properties><dependenci......

亚林瓜子
今天
10
0
Navicat 快捷键

操作 结果 ctrl+q 打开查询窗口 ctrl+/ 注释sql语句 ctrl+shift +/ 解除注释 ctrl+r 运行查询窗口的sql语句 ctrl+shift+r 只运行选中的sql语句 F6 打开一个mysql命令行窗口 ctrl+l 删除一行 ...

低至一折起
今天
9
0
Set 和 Map

Set 1:基本概念 类数组对象, 内部元素唯一 let set = new Set([1, 2, 3, 2, 1]); console.log(set); // Set(3){ 1, 2, 3 } [...set]; // [1, 2, 3] 接收数组或迭代器对象 ...

凌兮洛
今天
4
0
PyTorch入门笔记一

张量 引入pytorch,生成一个随机的5x3张量 >>> from __future__ import print_function>>> import torch>>> x = torch.rand(5, 3)>>> print(x)tensor([[0.5555, 0.7301, 0.5655],......

仪山湖
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部