Kafka进阶

原创
2017/03/15 14:50
阅读数 15

版本信息:kafka0.8.0

(因为项目建立的比较早,kafka只是作为项目中一个简单的数据传输工具,所以一直没有更新版本。)

  1. 一个程序实现多线程并发消费

    项目中每个业务需求都有一个独立的程序处理kafka中的数据,因为项目比较简单,每个程序都配置了不同的消费者组ID和消费者ID。

    因为kafka是分布式的,而当前的程序只有一个在处理数据,能不能并发的处理数据呢?

    对同一个业务(对应一个程序),启动多个相同的程序,根据kafka的原理,只需使他们的消费者组ID相同,消费者ID不同即可。

    对于一个简单的应用场景,同一个业务逻辑却要同时启动多个程序来处理,还要手动设置不同的消费者ID,太繁琐。有没有更好的办法呢?

    根据partition的个数,在创建KafkaStream时,配置相应的参数,实现多线程并发处理数据。

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Constants.TOPIC, new Integer(4));//通道个数不应超过对应topic的partition个数。
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(Constants.TOPIC);
for(int i=0;i<kafkaStreams.size();i++){
    //对多个通道启动多个线程并行处理。
}

 

 


     
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部