Today I read the source code for the message-sending part of Kafka (version 0.8.2.1). For message sending, Kafka's partitioning strategy is as follows:
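1 Kafka's partitioning strategy
1.1 If a partition is specified, the message is sent to that partition.
1.2 If no partition is specified but a key is, a partition is chosen from the hash of the key.
If the key is a fixed value, every message goes to the same single partition, so do not set the key to a constant. If you do need a fixed key, consider modifying the Kafka source so that data can still be spread evenly across the partitions.
1.3 If neither a key nor a partition is specified, messages are sent to the partitions in round-robin order.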
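The three rules can be illustrated with a small Java sketch. This is a simplified illustration, not the 0.8.2.1 client's own partitioner: the class name `PartitionChooserSketch` is hypothetical, and `Arrays.hashCode` is an assumed stand-in for whatever hash the real client applies to the serialized key bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the three partition-selection rules; not Kafka's own partitioner.
public class PartitionChooserSketch {
    // Counter shared by all records that carry neither a partition nor a key.
    private final AtomicInteger counter = new AtomicInteger(0);

    public int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            // 1.1: an explicitly specified partition is used as-is (after a range check).
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition " + partition);
            return partition;
        }
        if (key != null) {
            // 1.2: hash the key; Arrays.hashCode is an assumed stand-in for the client's hash.
            // The same key always yields the same partition.
            return toPositive(Arrays.hashCode(key)) % numPartitions;
        }
        // 1.3: no partition and no key, so rotate through the partitions.
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    // Clears the sign bit so the modulo is never negative
    // (Math.abs(Integer.MIN_VALUE) is still negative).
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionChooserSketch chooser = new PartitionChooserSketch();
        byte[] fixedKey = "order-42".getBytes(StandardCharsets.UTF_8);

        System.out.println(chooser.choose(2, null, 4)); // 2
        System.out.println(chooser.choose(null, fixedKey, 4) == chooser.choose(null, fixedKey, 4)); // true
        StringBuilder rr = new StringBuilder();
        for (int i = 0; i < 4; i++) rr.append(chooser.choose(null, null, 4)).append(' ');
        System.out.println(rr.toString().trim()); // 0 1 2 3
    }
}
```

The second line of output illustrates the caveat in 1.2: a constant key always maps to the same partition, so all of its records pile up there.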
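2 Message sending is asynchronous; the process is as follows.
Three objects are involved:
2.1 RecordAccumulator
It maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches object,
i.e. each partition has its own ArrayDeque of RecordBatch.
Calling KafkaProducer.send eventually places the record into the RecordBatch of its partition in this accumulator.
If that RecordBatch is now full, or a new RecordBatch had to be created, the sending object, Sender, is woken up.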
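The per-partition deque structure and the wake-up condition can be illustrated with a stripped-down accumulator. Assumptions: `AccumulatorSketch` and its `Batch` class are hypothetical, a batch counts as "full" when it holds a fixed number of records rather than a byte size, and partitions are identified by a plain `String` instead of `TopicPartition`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the accumulator's shape: one deque of batches per partition.
public class AccumulatorSketch {
    // Simplified batch: "full" means a fixed record count, not a byte size.
    static final class Batch {
        final List<String> records = new ArrayList<>();
        final int capacity;

        Batch(int capacity) { this.capacity = capacity; }

        boolean isFull() { return records.size() >= capacity; }

        boolean tryAppend(String record) {
            if (isFull()) return false;
            records.add(record);
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchCapacity;

    public AccumulatorSketch(int batchCapacity) { this.batchCapacity = batchCapacity; }

    /** Appends a record and returns true when the sender thread should be woken up. */
    public boolean append(String partition, String record) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (deque) { // ArrayDeque itself is not thread-safe
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend(record)) {
                return last.isFull(); // wake up only if this append filled the batch
            }
            Batch fresh = new Batch(batchCapacity);
            fresh.tryAppend(record);
            deque.addLast(fresh);
            return true; // a new batch was created
        }
    }

    public int batchCount(String partition) {
        Deque<Batch> deque = batches.get(partition);
        if (deque == null) return 0;
        synchronized (deque) { return deque.size(); }
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch(3);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            System.out.println(r + " -> wake sender: " + acc.append("topic-0", r));
        }
        System.out.println("batches for topic-0: " + acc.batchCount("topic-0"));
    }
}
```

With a capacity of 3, appending a, b, c, d wakes the sender on a (new batch), c (batch full) and d (new batch), but not on b, and leaves two batches queued for the partition.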
2.2 Sender
Its class comment reads: "The background thread that handles the sending of produce requests to the Kafka cluster".
Sender uses the KafkaClient to write the data buffered in RecordAccumulator to the server in batches.
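The background-thread pattern can be sketched as follows. It is a minimal illustration under stated assumptions, not the Sender class: `SenderLoopSketch`, `runOnce`, and the 100 ms wait are hypothetical, and a plain `Object.wait` stands in for the blocking network poll. It shows how a `wakeup()` call from a producer thread, the trigger described in 2.1, cuts the wait short, and how `close()` ends the loop.

```java
// Hypothetical sketch of a background sender thread with a wake-up signal.
public class SenderLoopSketch implements Runnable {
    private volatile boolean running = true;
    private final Object lock = new Object();
    private boolean wakeupRequested = false; // guarded by lock

    @Override
    public void run() {
        // Keep looping until close() is called; each pass plays the role of run(long now).
        while (running) {
            runOnce(System.currentTimeMillis());
        }
    }

    private void runOnce(long now) {
        // Real work (ready check, drain, send) would go here.
        // Then block until woken up or until an assumed 100 ms poll timeout passes.
        synchronized (lock) {
            if (!wakeupRequested) {
                try {
                    lock.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    running = false;
                }
            }
            wakeupRequested = false;
        }
    }

    /** Called by producer threads, e.g. after a batch fills up or a new batch is created. */
    public void wakeup() {
        synchronized (lock) {
            wakeupRequested = true;
            lock.notifyAll();
        }
    }

    /** Stops the loop and wakes the thread so it notices promptly. */
    public void close() {
        running = false;
        wakeup();
    }

    public static void main(String[] args) throws InterruptedException {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread thread = new Thread(sender, "sender-sketch");
        thread.start();
        sender.wakeup(); // e.g. a batch became ready
        Thread.sleep(50);
        sender.close();
        thread.join();
        System.out.println("sender stopped: " + !thread.isAlive());
    }
}
```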
Sender implements the thread's run() method; the actual work happens in run(long now), whose logic is as follows:
2.2.1 First, determine which nodes to send data to, based on the readiness conditions of their partitions
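As a hedged approximation of such readiness conditions, the sketch below treats a partition's oldest batch as sendable when it is full or has waited at least the linger time, unless it is still backing off after a failed attempt; any other conditions the real check may use are ignored. It also computes how soon a not-yet-sendable batch could become sendable, which appears to be the role of `result.nextReadyCheckDelayMs` in the `pollTimeout` code of 2.2.4. `ReadyCheckSketch`, `PartitionState`, and the parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a readiness check over the oldest batch of each partition.
public class ReadyCheckSketch {
    public static final class PartitionState {
        final int leaderNode;      // node id of the partition leader
        final boolean full;        // batch is full (or more batches are queued behind it)
        final long lastAttemptMs;  // creation time until the first send attempt
        final int attempts;        // number of previous send attempts

        public PartitionState(int leaderNode, boolean full, long lastAttemptMs, int attempts) {
            this.leaderNode = leaderNode;
            this.full = full;
            this.lastAttemptMs = lastAttemptMs;
            this.attempts = attempts;
        }
    }

    public static final class Result {
        public final Set<Integer> readyNodes;
        public final long nextReadyCheckDelayMs;

        Result(Set<Integer> readyNodes, long nextReadyCheckDelayMs) {
            this.readyNodes = readyNodes;
            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
        }
    }

    public static Result ready(List<PartitionState> partitions, long lingerMs, long retryBackoffMs, long now) {
        Set<Integer> readyNodes = new HashSet<>();
        long nextDelay = Long.MAX_VALUE;
        for (PartitionState p : partitions) {
            // A batch that failed recently must wait out the retry backoff.
            boolean backingOff = p.attempts > 0 && p.lastAttemptMs + retryBackoffMs > now;
            long waitedMs = now - p.lastAttemptMs;
            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
            boolean expired = waitedMs >= timeToWaitMs;
            if ((p.full || expired) && !backingOff) {
                readyNodes.add(p.leaderNode);
            } else {
                // Not sendable yet: remember how soon it might become sendable.
                nextDelay = Math.min(nextDelay, Math.max(timeToWaitMs - waitedMs, 0));
            }
        }
        return new Result(readyNodes, nextDelay);
    }

    public static void main(String[] args) {
        List<PartitionState> states = new ArrayList<>();
        states.add(new PartitionState(1, true, 995, 0));  // full -> ready
        states.add(new PartitionState(2, false, 990, 0)); // linger (10 ms) expired -> ready
        states.add(new PartitionState(3, false, 996, 0)); // 6 ms of linger left
        states.add(new PartitionState(4, true, 950, 1));  // backing off, 50 ms left
        Result r = ready(states, 10, 100, 1000);
        System.out.println(r.readyNodes + " next check in " + r.nextReadyCheckDelayMs + " ms");
    }
}
```

With a 10 ms linger and 100 ms backoff at time 1000, nodes 1 and 2 are reported ready and the next check is due in 6 ms.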
2.2.2 Remove the Kafka nodes that cannot be sent to at the moment
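One way to picture this step: filter the ready set against the connection state while remembering the shortest wait among the removed nodes, which is what `notReadyTimeout` in the `pollTimeout` code of 2.2.4 appears to hold. In this sketch `NodeFilterSketch`, `ConnectionState`, `isReady`, and `connectionDelay` are hypothetical stand-ins for the network client.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical sketch: drop nodes whose connection cannot take a request yet.
public class NodeFilterSketch {
    /** Hypothetical view of the network layer. */
    public interface ConnectionState {
        boolean isReady(int node, long now);      // connected and able to take a request?
        long connectionDelay(int node, long now); // ms until it is worth checking again
    }

    /** Removes not-ready nodes in place and returns the smallest wait among them. */
    public static long removeNotReady(Set<Integer> readyNodes, ConnectionState conn, long now) {
        long notReadyTimeout = Long.MAX_VALUE;
        Iterator<Integer> it = readyNodes.iterator();
        while (it.hasNext()) {
            int node = it.next();
            if (!conn.isReady(node, now)) {
                it.remove();
                notReadyTimeout = Math.min(notReadyTimeout, conn.connectionDelay(node, now));
            }
        }
        return notReadyTimeout;
    }

    public static void main(String[] args) {
        Set<Integer> nodes = new HashSet<>(Arrays.asList(1, 2, 3));
        ConnectionState conn = new ConnectionState() {
            public boolean isReady(int node, long now) { return node != 2; } // node 2 is still connecting
            public long connectionDelay(int node, long now) { return 50L; }
        };
        long timeout = removeNotReady(nodes, conn, 0L);
        System.out.println("sendable nodes: " + nodes + ", notReadyTimeout: " + timeout);
    }
}
```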
2.2.3 Build the list of data to send
Loop over the partitions for which this node is the leader.
For each partition, take its RecordBatch and add it to this node's List<RecordBatch>.
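Step 2.2.3 is essentially a regrouping from "per partition" to "per leader node". A minimal sketch, assuming a hypothetical `leaderOf` map from partition to node id, batches represented as plain strings, and (as a simplification) only the oldest batch of each partition taken per round; `DrainSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: regroup queued batches from "per partition" to "per leader node".
public class DrainSketch {
    public static Map<Integer, List<String>> drain(Set<Integer> readyNodes,
                                                   Map<String, Integer> leaderOf,
                                                   Map<String, Deque<String>> batches) {
        Map<Integer, List<String>> perNode = new HashMap<>();
        for (int node : readyNodes) {
            List<String> ready = new ArrayList<>();
            // Loop over the partitions whose leader is this node.
            for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
                if (e.getValue() != node) continue;
                Deque<String> deque = batches.get(e.getKey());
                if (deque == null) continue;
                String oldest = deque.pollFirst(); // assumption: one batch per partition per round
                if (oldest != null) ready.add(oldest);
            }
            perNode.put(node, ready);
        }
        return perNode;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaderOf = new HashMap<>();
        leaderOf.put("t-0", 1);
        leaderOf.put("t-1", 1);
        leaderOf.put("t-2", 2);
        Map<String, Deque<String>> batches = new HashMap<>();
        batches.put("t-0", new ArrayDeque<>(Arrays.asList("t0-batchA", "t0-batchB")));
        batches.put("t-1", new ArrayDeque<>(Arrays.asList("t1-batchA")));
        batches.put("t-2", new ArrayDeque<>(Arrays.asList("t2-batchA")));

        Map<Integer, List<String>> perNode = drain(Collections.singleton(1), leaderOf, batches);
        System.out.println(perNode);          // node 1 gets t0-batchA and t1-batchA
        System.out.println(batches.get("t-0")); // [t0-batchB] is left for a later round
    }
}
```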
2.2.4 Assemble the request objects and send them to the Kafka nodes
Compute pollTimeout and send the request objects to the individual Kafka nodes:
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
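The code above sends `requests` without showing how they are built. Conceptually, each node's `List<RecordBatch>` from 2.2.3 becomes one request keyed by partition. The sketch below assumes a hypothetical `ProduceRequestSketch` class carrying an acknowledgement setting, a timeout, and a partition-to-payload map; `RequestAssemblySketch`, `createRequests`, and all field names are illustrative, not the client's classes or the wire format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turn each node's drained batches into one request object.
public class RequestAssemblySketch {
    public static final class Batch {
        final String partition;
        final byte[] payload;

        public Batch(String partition, byte[] payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    public static final class ProduceRequestSketch {
        public final int destination;              // node id the request is sent to
        public final short acks;                   // assumed acknowledgement setting
        public final int timeoutMs;                // assumed server-side timeout
        public final Map<String, byte[]> payloads; // partition -> batch payload

        ProduceRequestSketch(int destination, short acks, int timeoutMs, Map<String, byte[]> payloads) {
            this.destination = destination;
            this.acks = acks;
            this.timeoutMs = timeoutMs;
            this.payloads = payloads;
        }
    }

    public static List<ProduceRequestSketch> createRequests(Map<Integer, List<Batch>> perNode,
                                                            short acks, int timeoutMs) {
        List<ProduceRequestSketch> requests = new ArrayList<>();
        for (Map.Entry<Integer, List<Batch>> e : perNode.entrySet()) {
            if (e.getValue().isEmpty()) continue; // nothing drained for this node
            Map<String, byte[]> payloads = new HashMap<>();
            for (Batch b : e.getValue()) payloads.put(b.partition, b.payload);
            requests.add(new ProduceRequestSketch(e.getKey(), acks, timeoutMs, payloads));
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<Integer, List<Batch>> perNode = new HashMap<>();
        perNode.put(1, Arrays.asList(new Batch("t-0", new byte[] {1}), new Batch("t-1", new byte[] {2})));
        perNode.put(2, new ArrayList<>());
        List<ProduceRequestSketch> requests = createRequests(perNode, (short) 1, 30000);
        for (ProduceRequestSketch r : requests)
            System.out.println("node " + r.destination + ": partitions " + r.payloads.keySet());
    }
}
```

Skipping nodes with nothing drained keeps empty requests off the wire; the acks and timeout values in `main` are arbitrary examples.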
2.2.5 Process the returned responses
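// Handle each response returned by poll(): disconnections and normal responses take different paths
for (ClientResponse response : responses) {
    if (response.wasDisconnected())
        handleDisconnect(response, now);
    else
        handleResponse(response, now);
}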
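`handleDisconnect` and `handleResponse` are not shown here. A common pattern, sketched under assumptions, is to complete each batch of the request: on success or a non-retriable error, finish it; on a retriable error with retries left (a disconnect counts as one here), put it back at the head of its partition's queue. `CompletionSketch`, `BatchSketch`, `ErrorKind`, and the `retries` limit are hypothetical names for this illustration, and the `handleDisconnect` below only mirrors the name used above.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of completing batches after a response or a disconnect.
public class CompletionSketch {
    public enum ErrorKind { NONE, RETRIABLE, FATAL }

    public static final class BatchSketch {
        final String partition;
        int attempts = 0;
        String outcome = "pending";

        public BatchSketch(String partition) { this.partition = partition; }
    }

    private final Map<String, Deque<BatchSketch>> queues = new HashMap<>();
    private final int retries; // assumed maximum number of retries per batch

    public CompletionSketch(int retries) { this.retries = retries; }

    public void completeBatch(BatchSketch batch, ErrorKind error) {
        if (error == ErrorKind.RETRIABLE && batch.attempts < retries) {
            batch.attempts++;
            // Put it back at the head so it is resent before newer batches of that partition.
            queues.computeIfAbsent(batch.partition, p -> new ArrayDeque<>()).addFirst(batch);
            batch.outcome = "requeued";
        } else {
            batch.outcome = (error == ErrorKind.NONE) ? "succeeded" : "failed";
        }
    }

    // A lost connection is treated as a retriable error for every batch in the request.
    public void handleDisconnect(List<BatchSketch> inFlight) {
        for (BatchSketch b : inFlight) completeBatch(b, ErrorKind.RETRIABLE);
    }

    public int queued(String partition) {
        Deque<BatchSketch> q = queues.get(partition);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        CompletionSketch c = new CompletionSketch(1);
        BatchSketch b = new BatchSketch("t-0");
        c.handleDisconnect(Collections.singletonList(b));
        System.out.println(b.outcome + ", queued=" + c.queued("t-0")); // requeued, queued=1
        // Pretend the retry was sent and failed again: the single retry is used up.
        c.completeBatch(b, ErrorKind.RETRIABLE);
        System.out.println(b.outcome); // failed
    }
}
```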
2.3 KafkaClient
Its implementation class is NetworkClient, which exchanges data with the server over sockets.
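3 Kafka producer configuration
batch.size: size of the buffer that holds one batch of records (class: MemoryRecords); 16384 bytes.
buffer.memory (BUFFER_MEMORY): total memory the client uses to buffer all records waiting to be sent (class: BufferPool); 32 * 1024 * 1024L, i.e. 32 MB.
linger.ms (LINGER_MS): send delay. If set to 1 second (1000 ms), records are first collected in the client buffer and sent after waiting up to 1 second (earlier if a batch fills up first); the default of 0 means send immediately.
compression.type: compression codec for the data; supports none, gzip, snappy and lz4; the default is none.
In theory, setting linger.ms increases message throughput, because more records are batched into each request, at the cost of extra latency for individual messages.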
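A minimal configuration sketch using plain `java.util.Properties`, so it compiles without the Kafka jar. The keys `batch.size`, `buffer.memory`, `linger.ms` and `compression.type` are the producer settings discussed above; `bootstrap.servers` and its address are placeholders, the chosen values are examples rather than recommendations, and a real producer needs further settings (for example serializers) that are omitted here. `ProducerConfigSketch` is a hypothetical name. It also shows the arithmetic: 32 * 1024 * 1024 = 33554432 bytes of buffer leaves room for at most 33554432 / 16384 = 2048 batches of the default size.

```java
import java.util.Properties;

// Hypothetical sketch: producer settings as plain Properties (no Kafka dependency).
public class ProducerConfigSketch {
    static final int DEFAULT_BATCH_SIZE = 16384;                 // bytes per batch
    static final long DEFAULT_BUFFER_MEMORY = 32 * 1024 * 1024L; // 33554432 bytes

    static Properties throughputOrientedConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("batch.size", String.valueOf(DEFAULT_BATCH_SIZE));
        props.put("buffer.memory", String.valueOf(DEFAULT_BUFFER_MEMORY));
        props.put("linger.ms", "5");          // example: wait up to 5 ms to fill a batch
        props.put("compression.type", "lz4"); // one of none, gzip, snappy, lz4
        return props;
    }

    // Upper bound on how many batches of a given size fit into the whole buffer.
    static long batchesThatFit(long bufferMemory, int batchSize) {
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        Properties props = throughputOrientedConfig();
        props.forEach((k, v) -> System.out.println(k + " = " + v));
        System.out.println(batchesThatFit(DEFAULT_BUFFER_MEMORY, DEFAULT_BATCH_SIZE)); // 2048
    }
}
```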