文档章节

kafka分区策略

 张欢19933
发布于 2017/03/14 10:16
字数 990
阅读 87
收藏 0
public class kafkaProducer  extends Thread{

    private String topic;  

    public kafkaProducer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override  
    public void run() {  
        Producer producer = createProducer();  
        int i=0;  
        while(true){
            i++;
            String  string = "hello"+i;
            producer.send(new KeyedMessage<Integer, String>(topic,string));  
            if(i==100){
                break;
            }
            try {  
                TimeUnit.SECONDS.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    private Producer createProducer() {  
        Properties properties = new Properties();  
        properties.put("serializer.class", StringEncoder.class.getName());  
        properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092");
        return new Producer<Integer, String>(new ProducerConfig(properties));  
     }  


    public static void main(String[] args) {  
        new kafkaProducer("user11").start();  
    } 
} 

发现其只向topic:user11中的某一个partiton中写数据。后来查了一下相关的参数:partitioner.class

partitioner.class

# 分区的策略
# 默认为kafka.producer.DefaultPartitioner,取模
partitioner.class = kafka.producer.DefaultPartitioner

在上面的程序中,我在producer中没有定义分区策略,也就是说程序采用默认的kafka.producer.DefaultPartitioner,来看看源码中是怎么定义的:

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

其核心思想就是对每个消息的key的hash值对partition数取模得到。再来看看我的程序中有这么一段:

producer.send(new KeyedMessage<Integer, String>(topic,string))

来看看keyMessage:

case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }

  def hasKey = key != null
}

由于上面生产者代码中没有传入key,所以程序调用:

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区。这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内总是使用唯一的分区。 默认情况下每十分钟才可能重新选择一个新的分区。

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None => 
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition
  }

如果key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 如果没有,随机选择一个分区,否则就用缓存的分区。

LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,
最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改成了定期选择一个分区, 这是为了减少服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。但是我查看0.8.2的代码还没看到改动。

所以,如果有可能,还是为KeyedMessage设置一个key值吧。比如:

producer.send(new KeyedMessage<String, String>(topic,String.valueOf(i),string)); 

自定义partitioner.class

如下所示为自定义的分区函数,分区函数实现了Partitioner接口

public class PersonalPartition implements Partitioner{

    public PersonalPartition(VerifiableProperties properties){

    }

    public int partition(Object arg0, int arg1) {
        if(arg0==null){
            return 0;
        }
        else{
            return 1;
        }   
    }

}

然后修改配置即可:

properties.put("partitioner.class", "com.xx.kafka.PersonalPartition"); 

向指定的partition写入数据

当然,也可以向topic中指定的partition中写数据,如下代码为向”user11”中partition 1中写入数据:

public class kafkaProducer  extends Thread{

    private String topic;  

    public kafkaProducer(String topic){  
        super();  
        this.topic = topic;  
    }  

    @Override  
    public void run() {  
        KafkaProducer producer = createProducer();  
        int i=0;  
        while(true){
            i++;
            String  string = "hello"+i;
            producer.send(new ProducerRecord(topic,1,null,string.getBytes()));
            if(i==10000){
                break;
            }
            try {  
                TimeUnit.SECONDS.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    private KafkaProducer createProducer() {  
        Properties properties = new Properties();  
        properties.put("serializer.class", StringEncoder.class.getName());  
        properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092");
        return new KafkaProducer(properties);  
     } 
 } 

本文转载自:http://blog.csdn.net/ouyang111222/article/details/51086037

粉丝 47
博文 533
码字总数 244932
作品 0
海淀
私信 提问
干趴面试官系列 | 请你简述一下Kafka中的分区分配

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/89529752 “请你简述一下Kafka中的分区分配”,当面试官问你这个问题的时候...

朱小厮
04/26
0
0
Spark实战 | Kafka与Spark Streaming的联姻

Kafka与Spark虽然没有很直接的必然关系,但是实际应用中却经常以couple的形式存在。如果你的Kafka的爱好者,流式计算框架Spark、Flink等也不得不去了解;如果你是Spark的爱好者,Kafka又或许...

HBase技术社区
02/28
0
0
Kafka科普系列 | 原来Kafka中的选举有这么多?

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/89369160 面试官在考查你Kafka知识的时候很可能会故弄玄虚的问你一下:Kaf...

朱小厮
04/18
0
0
新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/87898176 新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展...

朱小厮
02/23
0
0
Kafka集群磁盘使用率瞬超85%,幕后元凶竟是它?

Kafka是一种快速、可扩展的,设计内在就是分布式的、分区的和可复制的提交日志服务。作为一种高吞吐量的分布式发布订阅消息系统,Kafka被广泛的应用于海量日志的收集、存储。网上有大量Kafka...

大数据在浙里
2017/04/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
32分钟前
29
0
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
9
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部