文档章节

kafka producer 中partition 使用方式

川明君
 川明君
发布于 2013/08/15 12:18
字数 453
阅读 30987
收藏 7

     为了更好的实现负载均衡和消息的顺序性,kafka的producer在分发消息时可以通过分发策略发送给指定的partition。实现分发的程序是需要制定消息的key值,而kafka通过key进行策略分发。

     为了更好的弄清楚相关原理,我们从kafka本身提供的分发函数分析:
     源代码如下:    

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
 
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

       上述类对key进行了模版封装,因此key 可以提供Int,String等类型。
       其中numPartitions是来自ZKBrokerPartitionInfo生成的数据,具体代码是:

val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
      (Tips:从上面可以看到,我们可以更多的扩展分区信息,多多利用zookeeper提供的信息,比如sortedBrokerPartitions等等)


        很多时候我们可能要自己实现一个分区函数,具体的使用方式就是:
       

private Properties props = new Properties();

...

 props.put("partitioner.class","***/***/TestPartition");//一定要写对路径和partitioner.class

      具体的实现代码就是改装自DefaultPartitioner的java实现方式,一并贴上:

public class TestPartition implements Partitioner<String>{

	public int partition(String key,int numPartitions)
	{
		//System.out.println("Fuck!!!!");
		System.out.print("partitions number is "+numPartitions+"   ");
		if (key == null) {
			Random random = new Random();
			System.out.println("key is null ");
			return random.nextInt(numPartitions);
		}
		else {
			int result = Math.abs(key.hashCode())%numPartitions; //很奇怪,
                     //hashCode 会生成负数,奇葩,所以加绝对值
			System.out.println("key is "+ key+ " partitions is "+ result);
			return result;
		}
	}
} 
    而发送消息使用方式:

List<String> messages = new java.util.ArrayList<String>();
      String messageString = "test-message"+Integer.toString(messageNo);
      messages.add(messageString);
      //producer.send(new ProducerData<String, String>(topic,"test_key", messageStr));
      ProducerData<String, String> data = new ProducerData<String, String>(topic, messageString, messages);
      producer.send(data);
    kafka官方文档中直接使用
ProducerData<String, String> data = new ProducerData<String, String>(topic, “xxx”, "XXX");
      producer.send(data);
    但是我没有实现,第三个参数用String会报错。

   




    

 

         

 


 


© 著作权归作者所有

共有 人打赏支持
川明君
粉丝 7
博文 6
码字总数 2606
作品 0
朝阳
程序员
私信 提问
加载中

评论(2)

龙门客栈三星主厨-周淮安
龙门客栈三星主厨-周淮安

引用来自“sxlfguoliang”的评论

发送消息的时候使用这个可以么:
    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
        topic, key,content);
    
    producer.send(message);
可以的,就是这样的
sxlfguoliang
sxlfguoliang
发送消息的时候使用这个可以么:
    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
        topic, key,content);
    
    producer.send(message);
apache kafka消息服务

apache kafka中国社区QQ群:162272557 apache kafka参考 http://kafka.apache.org/documentation.html 消息队列分类: 点对点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中取...

老先生二号
2017/08/06
0
0
【Kafka】消息订阅框架Kafka

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/81409409 1、Kafka的产生背景 假设现在有两个模块,分别是A和B,其中A模块产生...

魏晓蕾
01/10
0
0
如何为Kafka集群选择合适的Topics/Partitions数量

这是许多kafka使用者经常会问到的一个问题。本文的目的是介绍与本问题相关的一些重要决策因素,并提供一些简单的计算公式。 越多的分区可以提供更高的吞吐量 首先我们需要明白以下事实:在k...

濡沫
2018/08/13
0
0
[转]使用PHP处理Kafka消息

Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用...

daos
2018/01/03
0
0
Kafka分区分配计算(分区器Partitions)

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景...

u013256816
2017/12/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

使用keepalived实现nginx的高可用

概述 是这样子的,我想让家中所有的应用服务都从nginx中出去,让nginx处于访问的最边缘地带,为了让nginx可靠性加强,所以nginx就得实现高可用,分别是下面两台机器要做nginx的集群 10.10.10...

bboysoulcn
今天
1
0
Mysql索引机制B+Tree

1、问题引入 有一个用户表,为了查询的效率,需要基于id去构建索引。构建索引我们需要考虑两个方面的问题,1个是查询的效率,1个是索引数据的存储问题。该表的记录需要支持百万、千万、甚至上...

万山红遍
今天
40
0
RDD

1.概念: RDD是spark整个体系中最基础核心的概念,RDD(Resilient Distributed DataSet)即弹性分布式数据集 弹性: RDD支持横向多分区,纵向操作内存不足写入磁盘,hdfs等,实现数据在内存和...

仟昭
今天
1
0
springboot整合mycat

动态数据源项目整合 Maven依赖信息 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> <relat......

须臾之余
今天
2
0
深入解析Vue 和微信小程序的区别、比较

写了vue项目和小程序,发现二者有许多相同之处,在此想总结一下二者的共同点和区别。 一、生命周期 先贴两张图: vue生命周期 小程序生命周期 相比之下,小程序的钩子函数要简单得多。 vue的...

前端攻城小牛
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部