文档章节

Kafka进阶

世界新新人
 世界新新人
发布于 2017/03/15 14:50
字数 377
阅读 13
收藏 0

版本信息:kafka0.8.0

(因为项目建立的比较早,kafka只是作为项目中一个简单的数据传输工具,所以一直没有更新版本。)

  1. 一个程序实现多线程并发消费

    项目中每个业务需求都有一个独立的程序处理kafka中的数据,因为项目比较简单,每个程序都配置了不同的消费者组ID和消费者ID。

    因为kafka是分布式的,而当前的程序只有一个在处理数据,能不能并发的处理数据呢?

    对同一个业务(对应一个程序),启动多个相同的程序,根据kafka的原理,只需使他们的消费者组ID相同,消费者ID不同即可。

    对于一个简单的应用场景,同一个业务逻辑却要同时启动多个程序来处理,还要手动设置不同的消费者ID,太繁琐。有没有更好的办法呢?

    根据partition的个数,在创建KafkaStream时,配置相应的参数,实现多线程并发处理数据。

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Constants.TOPIC, new Integer(4));//通道个数不应超过对应topic的partition个数。
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(Constants.TOPIC);
for(int i=0;i<kafkaStreams.size();i++){
    //对多个通道启动多个线程并行处理。
}

 

 


     

© 著作权归作者所有

世界新新人
粉丝 1
博文 6
码字总数 1743
作品 0
济南
程序员
私信 提问
详细剖析kafka分布式消息系统

1.背景 最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合...

禁区铁铍人
2017/11/18
0
0
Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本。就在前一个月,我们才从0.9升级到...

董黎明
2018/12/09
121
0
在Kubernetes上运行Kafka合适吗?

介绍 Kubernetes设计的初衷是运行无状态工作负载。这些通常采用微服务架构的工作负载,是轻量级,可水平扩展,遵循十二要素应用程序,可以处理环形断路和随机Monkey测试。 另一方面,Kafka本...

Docker
06/04
0
0
Spark Streaming实时流处理学习

目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与编程 7.Spark Streaming进阶与案例实战...

牦牛sheriff
2018/09/02
0
0
高级Java架构学习资源来了——(1-5年开发 小白误入)

大型互联网公司分布式架构原理概述 http://www.365yg.com/item/6471499647222284814/ Mysql索引底层数据结构剖析 http://www.365yg.com/item/6470767465080029710/ 老司机带你用正确的姿势看...

阿阳啊啊
2017/10/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

面向对象编程

1、类和对象 类是对象的蓝图和模板,而对象是实例;即对象是具体的实例,类是一个抽象的模板 当我们把一大堆拥有共同特征的对象的静态特征(属性)和动态特征(行为)都抽取出来后,就可以定...

huijue
今天
8
0
redis异常解决 :idea启动本地redis出现 jedis.exceptions.JedisDataException: NOAUTH Authentication required

第一次安装在本地redis服务,试试跑项目,结果却出现nested exception is redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required错误,真是让人头疼 先检查一...

青慕
今天
10
0
Spring 之 IoC 源码分析 (基于注解方式)

一、 IoC 理论 IoC 全称为 Inversion of Control,翻译为 “控制反转”,它还有一个别名为 DI(Dependency Injection),即依赖注入。 二、IoC方式 Spring为IoC提供了2种方式,一种是基于xml...

星爵22
今天
25
0
Docker安装PostgresSql

Docker安装PostgresSql 拉取docker镜像 # docker pull postgres:10.1010.10: Pulling from library/postgres9fc222b64b0a: Pull complete 38296355136d: Pull complete 2809e135bbdb: Pu......

Tree
今天
8
0
内容垂直居中

方法一: 采用上下 padding 形式,将内容放置在垂直居中 .line { padding: 2% 0; text-align: center; height: 5px;} <div class="line"> 内容垂直居中</div> 方法二: 采......

低至一折起
今天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部