文档章节

storm1.1.0集成kafka

dhlsoft
 dhlsoft
发布于 2017/06/14 15:02
字数 695
阅读 192
收藏 1

storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景。

下面代码运行的前提是zookeeper、kafka、storm正常运行,建立一个maven项目,在pom.xml写入如下代码:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>strom</groupId>
  <artifactId>strom</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>strom</name> 
  	<repositories>
		<repository>
		    <id>nexus-aliyun</id>
		    <name>Nexus aliyun</name>
		    <layout>default</layout>
		    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
		    <snapshots>
		        <enabled>false</enabled>
		    </snapshots>
		    <releases>
		        <enabled>true</enabled>
		    </releases>
		</repository>
	</repositories>
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
		    <groupId>org.apache.storm</groupId>
		    <artifactId>storm-kafka</artifactId>
		    <version>1.1.0</version>
		    <exclusions>
		    	<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
            </exclusions>
		</dependency>
        <dependency>
		    <groupId>org.apache.kafka</groupId>
		    <artifactId>kafka_2.11</artifactId>
		    <version>0.10.2.1</version>
		    <exclusions>
		    	<exclusion>
		    		<groupId>org.slf4j</groupId>
		    		<artifactId>slf4j-log4j12</artifactId>
		    	</exclusion>
		    </exclusions>
        </dependency>
        
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>KafkaTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

将slf4j-log4j12.jar从相关的jar中排除,不然报

java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory

                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>

将kafka-clients排除,默认加载的版本太低

                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>

消费kafka的数据,输出保存到一个文件中,Bolt逻辑KafkaBolt类的代码如下:

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class KafkaBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String word = (String) input.getValue(0);
		String out = "output:" + word;
		System.out.println(out);

		// 写文件
		try {
			DataOutputStream out_file = new DataOutputStream(new FileOutputStream("kafkastorm.out"));
			out_file.writeUTF(out);
			out_file.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		collector.emit(new Values(out));

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// declarer.declare(new Fields("message"));

	}
}

然后我们编写主类,类名为KafkaTopology,代码如下:


import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class KafkaTopology {

	public static void main(String[] args) throws Exception {
		// zookeeper的服务器地址 多个逗号分隔
		String zks = "localhost:2181";
		// 消息的topic
		String topic = "test";
		// kafka-strom在zookeeper上的根 用于记录其消费的offset位置
		String zkRoot = "/kafka-storm";
		// 类似group name
		String id = "spout-1";
		BrokerHosts brokerHosts = new ZkHosts(zks);
		SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
		// 记录Spout读取进度所用的zookeeper的host,即记录offset位置的zk
		List<String> servers  = new ArrayList<>();
		servers.add("localhost");
		spoutConf.zkServers = servers;
		spoutConf.zkPort = 2181;
		TopologyBuilder builder = new TopologyBuilder();
		// 线程数应该等于topic分区数
		builder.setSpout("SimpleSpout", new KafkaSpout(spoutConf), 1);
		builder.setBolt("SimpleBolt", new KafkaBolt(), 1).shuffleGrouping("SimpleSpout");
		Config conf = new Config();
		conf.setDebug(false);

		if (args != null && args.length > 0) {
			// 集群模式
			conf.setNumWorkers(2);
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {
			// 本地模式
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("word-count", conf, builder.createTopology());
//			Thread.sleep(10000);
//			cluster.shutdown();
		}
	}
}

进行本地测试,通过终端生产消息,进行结果查看

./kafka-console-producer.sh broker-list localhost:9092 --topic test

 

© 著作权归作者所有

dhlsoft
粉丝 1
博文 30
码字总数 24879
作品 0
沈阳
其他
私信 提问
Splunk集成Kafka配置方法

Splunk是业界赫赫有名的数据分析工具,比较擅长BI和安全日志分析,我司很多部门都有购买其产品和服务。最近有个需求要把Splunk和分布式消息队列Kafka做个集成,Splunk官方提供的一个Kafka的插...

半夜菊花茶
2017/11/24
0
0
Spring Kafka 2.1.0 发布,Spring 集成框架扩展

Spring Kafka 2.1.0 已发布,Spring Kafka 是 Spring 官方提供的一个 Spring 集成框架的扩展,用来为使用 Spring 框架的应用程序提供 Kafka 框架的集成。 点此了解更多: https://docs.sprin...

淡漠悠然
2017/12/01
1K
2
Spring Kafka 1.1.7 和 1.2.3 发布

Spring Kafka 1.1.7 和 1.2.3 已发布,Spring Kafka 是 Spring 官方提供的一个 Spring 集成框架的扩展,用来为使用 Spring 框架的应用程序提供 Kafka 框架的集成。 点此了解更多: https://...

淡漠悠然
2017/09/21
522
1
Spring Kafka 2.0.0.M2 发布

Spring Kafka 2.0.0.M2 发布了,项目及版本参考内容: http://docs.spring.io/spring-kafka/docs/2.0.0.BUILD-SNAPSHOT/reference/html/ Spring Kafka 是 Spring 官方提供的一个 Spring 集成......

淡漠悠然
2017/05/15
530
0
Spring Boot集成Kafka

Spring Boot集成Kafka Spring Boot集成Kafka 前提介绍 Kafka Kafka安装与使用 Spring Boot集成 总结 参考资料 前提介绍 由于公司使用了微服务架构,很多业务拆成了很多小模块。 有个场景是这...

流水不腐小夏
2017/11/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
5
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
6
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
4
0
OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
1K
11
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部