文档章节

key为null时Kafka会将消息发送给哪个分区?

m
 miscellanea
发布于 2015/08/24 19:46
字数 817
阅读 2046
收藏 2

当你编写kafka Producer时, 会生成KeyedMessage对象。

KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property  "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。

从字面上来讲,这句话没有问题, 但是这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内总是使用唯一的分区。 默认情况下每十分钟才可能重新选择一个新的分区。 但是相信大部分的程序员和我一样, 都理解成每个消息都会随机选择一个分区。
可以查看相关的代码:

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId          case None => 
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId        }
      } else
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition  }

如果key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 如果没有,随机选择一个分区,否则就用缓存的分区。

LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,
最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改成了定期选择一个分区, 这是为了减少服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。但是我查看0.8.2的代码还没看到改动。

所以,如果有可能,还是为KeyedMessage设置一个key值吧。

当你编写kafka Producer时, 会生成KeyedMessage对象。

KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property  "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。


本文转载自:http://ju.outofmemory.cn/entry/112202

共有 人打赏支持
m
粉丝 6
博文 86
码字总数 22525
作品 0
海淀
私信 提问
kafka java编程

自定义系列化方式Encoder kafka自带的序列化方式 DefaultEncoder默认的这个Encoder事实上不做任何处理,接收到什么byte[]就返回什么byte[]: class DefaultEncoder(props: VerifiablePropert...

张欢19933
2016/01/25
442
0
kafka 文档 (三)producer(生产者)和高级消费者

API Producer API 此处只简介一个procedure的例子 生产类是用来创建新消息的主题和可选的分区。 如果使用Java你需要包括几个包和支持类: import kafka.javaapi.producer.Producer; import k...

q5725827
2014/05/29
0
0
Kafka分区机制介绍与示例

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹, 以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log...

HZCoder
2016/03/01
11
0
Kafka实战解惑

Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点: 消息的发布、订阅均具有高吞吐量:据...

量子世界的我
2016/11/16
121
0
Kafka分区分配计算(分区器Partitions)

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景...

u013256816
2017/12/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

矿难让显卡压了那么多货咋办?NV如是说

在苏州 GTC 开幕的几天前,英伟达刚刚遭遇了一次股价的腰斩。 近来加密货币的热度渐低,受到挖矿热潮照顾许多的英伟达「矿机」销量受到打击,甚至出现了严重的库存危机,加上近来刚刚发的 RT...

linux-tao
12分钟前
1
0
Python学习日记5|BeautifulSoup中find和find_all的用法

Python学习日记5|BeautifulSoup中find和find_all的用法 是蓝先生 关注 2016.04.20 11:26* 字数 930 阅读 37205评论 11喜欢 10 今天是4.20号。 前天晚上看到蒋方舟的一句话: 不要左顾右盼。慢...

linjin200
12分钟前
1
0
浅析Vue.js 中的条件渲染指令

1 应用于单个元素 Vue.js 中的条件渲染指令可以根据表达式的值,来决定在 DOM 中是渲染还是销毁元素或组件。 html: <div id="app"><p v-if="type===1">拌面</p><p v-else-if="type==...

开元中国2015
13分钟前
1
0
聊天机器人最难理解的 10 个词汇

简评:现在,越来越多的「聊天机器人」凭借着人工智能能与人类对话,甚至编写新闻。人们该如何判断对方是一个血肉之躯,还是一个可笑的算法?又该如何判断一个小说故事是由一台机器编写的,而...

极光推送
19分钟前
1
0
开源 java CMS - FreeCMS2.8会员我的评论

项目地址:http://www.freeteam.cn/ 我的评论 从左侧管理菜单点击我的评论进入。在这里可以查看当前登录会员的所有评论记录。 删除评论 选择评论然后点击删除按钮可以完成删除操作。 为了防止...

freeteam
42分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部