文档章节

kafka producer 中partition 使用方式

川明君
 川明君
发布于 2013/08/15 12:18
字数 453
阅读 3.5W
收藏 7

「深度学习福利」大神带你进阶工程师,立即查看>>>

     为了更好的实现负载均衡和消息的顺序性,kafka的producer在分发消息时可以通过分发策略发送给指定的partition。实现分发的程序是需要制定消息的key值,而kafka通过key进行策略分发。

     为了更好的弄清楚相关原理,我们从kafka本身提供的分发函数分析:
     源代码如下:    

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
 
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

       上述类对key进行了模版封装,因此key 可以提供Int,String等类型。
       其中numPartitions是来自ZKBrokerPartitionInfo生成的数据,具体代码是:

val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
      (Tips:从上面可以看到,我们可以更多的扩展分区信息,多多利用zookeeper提供的信息,比如sortedBrokerPartitions等等)


        很多时候我们可能要自己实现一个分区函数,具体的使用方式就是:
       

private Properties props = new Properties();

...

 props.put("partitioner.class","***/***/TestPartition");//一定要写对路径和partitioner.class

      具体的实现代码就是改装自DefaultPartitioner的java实现方式,一并贴上:

public class TestPartition implements Partitioner<String>{

	public int partition(String key,int numPartitions)
	{
		//System.out.println("Fuck!!!!");
		System.out.print("partitions number is "+numPartitions+"   ");
		if (key == null) {
			Random random = new Random();
			System.out.println("key is null ");
			return random.nextInt(numPartitions);
		}
		else {
			int result = Math.abs(key.hashCode())%numPartitions; //很奇怪,
                     //hashCode 会生成负数,奇葩,所以加绝对值
			System.out.println("key is "+ key+ " partitions is "+ result);
			return result;
		}
	}
} 
    而发送消息使用方式:

List<String> messages = new java.util.ArrayList<String>();
      String messageString = "test-message"+Integer.toString(messageNo);
      messages.add(messageString);
      //producer.send(new ProducerData<String, String>(topic,"test_key", messageStr));
      ProducerData<String, String> data = new ProducerData<String, String>(topic, messageString, messages);
      producer.send(data);
    kafka官方文档中直接使用
ProducerData<String, String> data = new ProducerData<String, String>(topic, “xxx”, "XXX");
      producer.send(data);
    但是我没有实现,第三个参数用String会报错。

   




    

 

         

 


 


川明君
粉丝 7
博文 6
码字总数 2630
作品 0
朝阳
程序员
私信 提问
加载中
此博客有 2 条评论,请先登录后再查看。
浅入浅出Android(003):使用TextView类构造文本控件

基础: TextView是无法供编辑的。 当我们新建一个项目MyTextView时候,默认的布局(/res/layout/activity_main.xml)中已经有了一个TextView: <TextView 运行效果如下: 修改其文本内容...

樂天
2014/03/22
708
1
CDH5: 使用parcels配置lzo

一、Parcel 部署步骤 1 下载: 首先需要下载 Parcel。下载完成后,Parcel 将驻留在 Cloudera Manager 主机的本地目录中。 2 分配: Parcel 下载后,将分配到群集中的所有主机上并解压缩。 3 激...

cloud-coder
2014/07/01
6.9K
1
浏览器中的scheme解释器--SchemeScript

一个用javascript实现的scheme解释器,可以运行在浏览器中或node.js中。 刚刚看到编译原理与实践第二章,一时兴起,想写个以前就想写的scheme的解释器。昨天晚上开始写,到刚才为止,接近一天...

zoowii
2012/11/01
1.2K
0
高效率的nio框架--nio java raptor

设计初衷是提供方便易用,且高效率的nio框架,一部分实现上参考了mina。还包括线程池,编解码,内存池等机制,以便于开发高性能tcp程序。 文档后续会慢慢的补上。 整体实现上尽量少的使用锁,...

齐楠
2012/12/12
3.3K
0
HTML动态嵌入Flash--embedSWF

embedSWF 是一个轻量的HTML动态嵌入Flash的方案。gzip之前,仅3kb大小!比swfobject要小很多。 简洁的API,所有object和embed标记支持的参数都通过一个options传递。甚至支持在options中传递...

JonyZhang
2013/05/01
5K
2

没有更多内容

加载失败,请刷新页面

加载更多

Model S被18轮重卡撞烂 乘客在车辆保护下幸存

日前,国外一位名为quarm813的网友在社交媒体分享了“Model S救他和女儿性命”的经历。 据该用户描述,当地时间7月31日,他驾驶Model S在高速公路快车道上行驶时,一辆18轮重卡突然实线并线闯...

osc_fipgtxy8
9分钟前
0
0
Redis-cluster5.x集群搭建

1.下载redis5.0.2 wget http://download.redis.io/releases/redis-5.0.2.tar.gz #官网下载 tar xzf redis-5.0.2.tar.gz #解压cd redis-5.0.2 yum install gcc #需要gcc来编......

osc_zzg7fpke
11分钟前
0
0
CGB2004-京淘项目Day12

1.还原系统配置 1.1 释放Linux资源 1.1.1 停止数据库主从服务 1.1.2 关闭数据库服务 说明:关闭数据库服务器. 1.1.3 关闭tomcat/mycat服务器 1.1.4关闭nginx服务器 1.2 修改代码中的配置 1.2....

osc_3361hjxk
12分钟前
0
0
【北京迅为】初识i.MX6ULL终结者开发板

目录 一、 开发板初体验 1. 初识i.MX6ULL终结者开发板 一、 开发板初体验 i.MX6ULL终结者开发板是北京迅为电子推出的一款Cortex-A7架构的开发板。采用核心板+底板的方式,如下图所示: 经典蓝...

osc_0esgtdby
12分钟前
0
0
如何利用基于PXI的下一代ATE系统测试平台进行军事/航天/卫星电子设备测试

前言 自动测试设备(ATE)系统用于在生产产品或产品使用过程中测试电子组件,子组件或完整系统的功能和性能,以确保他们可操作性。对设备、电路板、子组件或系统的测试要求从简单到复杂,设计...

osc_mxz6aybo
13分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部