Storm 1.1.0 集成 Kafka

原创
2017/06/14 15:02
阅读数 357

Storm 与 Kafka 结合是一种典型的应用场景:前端采集程序将实时数据源源不断地写入 Kafka 队列,Storm 作为消费者从队列中拉取数据并进行计算。

运行下面代码的前提是 ZooKeeper、Kafka、Storm 均已正常运行。新建一个 Maven 项目,在 pom.xml 中写入如下内容:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>strom</groupId>
  <artifactId>strom</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>strom</name>

  <repositories>
    <!-- Aliyun public mirror. HTTPS is required: Maven 3.8.1 and later block
         plain-HTTP repositories by default. -->
    <repository>
      <id>nexus-aliyun</id>
      <name>Nexus aliyun</name>
      <layout>default</layout>
      <url>https://maven.aliyun.com/repository/public</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Compile scope lets the topology run in a LocalCluster straight from the IDE.
         NOTE(review): when submitting with "storm jar", mark this dependency
         <scope>provided</scope> so storm-core is not bundled into the assembly jar;
         the cluster already supplies it. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.0</version>
    </dependency>

    <!-- Kafka spout. Its transitive kafka-clients is too old, so it is excluded;
         kafka_2.11 below brings in kafka-clients 0.10.2.1 instead. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- slf4j-log4j12 is excluded because storm-core already ships log4j-over-slf4j;
         having both on the classpath makes Log4jLoggerFactory fail to initialize. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Test-only: keeps junit out of the jar-with-dependencies. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Builds a fat jar whose manifest points at KafkaTopology. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>KafkaTopology</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

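将 slf4j-log4j12 从 kafka_2.11 的依赖中排除,否则会报错:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory

原因是 storm-core 已经自带 log4j-over-slf4j。它与 slf4j-log4j12 同时出现在 classpath 上时会相互循环委托,SLF4J 因此在 Log4jLoggerFactory 初始化时直接抛出异常。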

                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>

将 storm-kafka 传递引入的 kafka-clients 排除,因为它默认带入的版本太低;排除后,由 kafka_2.11 0.10.2.1 引入同版本的 kafka-clients。

                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>

消费 Kafka 中的数据,并将输出保存到文件中。Bolt 的处理逻辑写在 KafkaBolt 类中,代码如下:

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that prints each Kafka message and appends it to a local text file.
 *
 * <p>The first value of each incoming tuple is read as a {@code String}. The spout in this
 * article uses {@code StringScheme}, so that value is the message text. The bolt prefixes it
 * with {@code "output:"}, prints it to stdout, and appends it as one UTF-8 line to
 * {@value #OUTPUT_FILE}. The file path is relative, so it resolves against the worker's working
 * directory. The prefixed string is then emitted on the {@code "message"} field.
 */
public class KafkaBolt extends BaseBasicBolt {

	/** Relative path of the file that collects every processed message. */
	private static final String OUTPUT_FILE = "kafkastorm.out";

	/**
	 * Handles one message: print it, append it to {@link #OUTPUT_FILE}, and emit it.
	 *
	 * @param input tuple whose first value is the message string
	 * @param collector collector used to emit the prefixed message
	 */
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String word = input.getString(0);
		String out = "output:" + word;
		System.out.println(out);

		// Append mode: opening the file without it truncates the file on every tuple, so only the
		// last message would survive. try-with-resources closes the stream even if write() fails.
		try (Writer writer = new OutputStreamWriter(
				new FileOutputStream(OUTPUT_FILE, true), StandardCharsets.UTF_8)) {
			writer.write(out);
			writer.write(System.lineSeparator());
		} catch (IOException e) {
			// File output is best-effort: report the failure but keep processing the stream.
			System.err.println("Failed to append to " + OUTPUT_FILE + ": " + e);
			e.printStackTrace();
		}

		collector.emit(new Values(out));
	}

	/**
	 * Declares the output schema: one field, {@code "message"}, holding the prefixed text.
	 *
	 * @param declarer declarer for this bolt's output streams
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Must match the one-element Values emitted in execute(); without a declaration no
		// downstream component can subscribe to this bolt's default stream.
		declarer.declare(new Fields("message"));
	}
}

然后编写主类 KafkaTopology,代码如下:


import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Entry point that wires a storm-kafka spout to {@link KafkaBolt}.
 *
 * <p>If a command-line argument is given, the topology is submitted to the cluster under that
 * name. Otherwise it runs in an in-process {@link LocalCluster}, which is deliberately left
 * running so that messages typed into a console producer can be observed.
 */
public class KafkaTopology {

	/**
	 * Builds the topology and submits it either locally or to the cluster.
	 *
	 * @param args optional; {@code args[0]} is the topology name used for cluster submission
	 * @throws Exception if topology submission fails
	 */
	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		// One spout executor; ideally this matches the number of partitions in the topic.
		builder.setSpout("SimpleSpout", new KafkaSpout(buildSpoutConfig()), 1);
		builder.setBolt("SimpleBolt", new KafkaBolt(), 1).shuffleGrouping("SimpleSpout");

		Config conf = new Config();
		conf.setDebug(false);

		boolean submitToCluster = args != null && args.length > 0;
		if (!submitToCluster) {
			// Local mode: the in-process cluster is intentionally never shut down.
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("word-count", conf, builder.createTopology());
			return;
		}

		// Cluster mode: args[0] names the topology.
		conf.setNumWorkers(2);
		StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
	}

	/**
	 * Creates the spout configuration: brokers discovered through ZooKeeper, the topic to read,
	 * and where in ZooKeeper the consumed offsets are recorded.
	 *
	 * @return spout configuration that decodes messages as strings
	 */
	private static SpoutConfig buildSpoutConfig() {
		// ZooKeeper address(es) used for broker discovery; separate multiple hosts with commas.
		BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
		// Topic "test"; offsets are kept under the ZooKeeper root "/kafka-storm" with the
		// consumer id "spout-1" (similar to a consumer group name).
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/kafka-storm", "spout-1");
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

		// ZooKeeper host and port where the spout records its read progress (offsets).
		List<String> offsetZkHosts = new ArrayList<>();
		offsetZkHosts.add("localhost");
		spoutConfig.zkServers = offsetZkHosts;
		spoutConfig.zkPort = 2181;
		return spoutConfig;
	}
}

进行本地测试:在终端中启动控制台生产者发送消息,然后查看 Bolt 的输出结果。

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部