文档章节

STORM-KAFKA集成

吹比龙
 吹比龙
发布于 2017/05/05 10:23
字数 243
阅读 312
收藏 0

kafka版本是 0.10.0.1 offset存在kafka中的 区别于之前的存在zk中

1、pom.xml配置

<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.1.0</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>1.1.0</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>0.10.0.1</version>
		</dependency>

2、示例代码

            // zookeeper的服务器地址 多个逗号分隔
            String zks = "192.168.154.128:2181";
            // 消息的topic
            String topic = "lidalong";
            // kafka-strom在zookeeper上的根 用于记录其消费的offset位置
            String zkRoot = "/kafka-storm";
            // 类似group name
            String id = "spout-1";

            BrokerHosts brokerHosts = new ZkHosts(zks);
            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,
                    id);
            spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
            // 记录Spout读取进度所用的zookeeper的host,即记录offset位置的zk
            List<String> servers = new ArrayList<>();
            servers.add("192.168.154.128");
            spoutConf.zkServers = servers;
            spoutConf.zkPort = 2181;
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // 线程数应该等于topic分区数
            topologyBuilder.setSpout("SimpleSpout", new KafkaSpout(spoutConf),
                    1);
            topologyBuilder.setBolt("SimpleBolt", new GPSBolt(), 1)
                    .shuffleGrouping("SimpleSpout");

注意:bolt一定要是ack模式的,推荐bolt继承BaseBasicBolt

3、zk存储消息

© 著作权归作者所有

下一篇: ZOOKEEPER监控
吹比龙
粉丝 9
博文 134
码字总数 38691
作品 0
合肥
程序员
私信 提问
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
Apache Storm 0.9.3 发布,分布式实时计算系统

Apache Storm 0.9.3 发布,此版本现已提供下载。此版本总共包括 62 位独立贡献者,超过 100 处的改进和修复。 此版本主要改进了 Kafka 集成,添加了 HDFS 集成和 HBase 集成。更多内容请看发...

oschina
2014/11/26
2.6K
9
Apache Storm 0.9.6/0.10.0 发布

Apache Storm 0.10.0 发布,此版本是个稳定版本,相比之前的 Beta 版本主要包括 bug 修复和改进: STORM-1108: Fix NPE in simulated time STORM-1106: Netty should not limit attempts to...

oschina
2015/11/06
4.7K
8
Apache Storm 1.2.0,1.1.2 和 1.0.6 发布

Apache Storm 1.2.0,1.1.2 和 1.0.6 发布了。主要更新内容及下载地址如下: 1.2.0 New Feature [STORM-2383] - [storm-hbase] Support HBase as state backend [STORM-2484] - Flux: suppo......

达尔文
2018/02/18
923
0
【Storm】- Storm集成kafka

Storm 流式处理Kafka数据 --- tips 老版本:官方文档 新版本:官方文档 Storm可集成组件: 测试代码 需求:给kafka数据添加日期 实际用途:可根据业务续期自定义,例如解析Nginx日志ip限制访...

ZeroneLove
04/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

group by分组后获得每组中时间最大的那条记录

用途: GROUP BY 语句用于 对一个或多个列对结果集进行分组。 例子: 原表: 现在,我们希望根据USER_ID 字段进行分组,那么,可使用 GROUP BY 语句。 我们使用下列 SQL 语句: SELECT ID,US...

豆花饭烧土豆
29分钟前
0
0
android6.0源码分析之Camera API2.0下的Preview(预览)流程分析

本文将基于android6.0的源码,对Camera API2.0下Camera的preview的流程进行分析。在文章android6.0源码分析之Camera API2.0下的初始化流程分析中,已经对Camera2内置应用的Open即初始化流程进...

天王盖地虎626
39分钟前
2
0
java 序列化和反序列化

1. 概述 序列恢复为Java对象的过程。 对象的序列化主要有两 首先我们介绍下序列化和反序列化的概念: 序列化:把Java对象转换为字节序列的过程。 反序列化:把字节序列恢复为Java对象的过程。...

edison_kwok
50分钟前
2
0
分布式数据一致性

狼王黄师傅
今天
2
0
经验

相信每位开发者在自己开发的过程中,都会反思一些问题,比如怎样提高编程能力、如何保持心态不砍产品经理、996 之后怎样恢复精力……最近开发者 Tomasz Łakomy 将他 7 年的开发生涯中学习到...

WinkJie
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部