文档章节

如何在eclipse调试storm程序

cloud-coder
 cloud-coder
发布于 2014/02/16 10:28
字数 837
阅读 10089
收藏 5

 一、介绍 

     storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。

      Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies

      因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,开发、调试storm程序。如果你正在为此问题而烦恼,请使用本文提供的方法。

  二、实施步骤

      如何基于eclipse+maven调试storm程序,步骤如下:

      1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)

       2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)

          Github上的pom.xml,引入的依赖太多,有些不需要,详细可以参考:

          https://github.com/nathanmarz/storm-starter/blob/master/m2-pom.xml

       3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount

          重要的是LocalCluster cluster = new LocalCluster();这一句

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>storm.starter</groupId>
	<artifactId>storm-starter</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<repositories>
		<repository>
			<id>github-releases</id>
			<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
		</repository>
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.9.0.1</version>
			<!-- keep storm out of the jar-with-dependencies -->
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>3.2.1</version>
		</dependency>
	</dependencies>
</project>

storm程序

package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {
	public static class SplitSentence extends BaseBasicBolt {
		@Override
		public void execute(Tuple input, BasicOutputCollector collector) {
			try {
				String msg = input.getString(0);
				System.out.println(msg + "-------------------");
				if (msg != null) {
					String[] s = msg.split(" ");
					for (String string : s) {
						collector.emit(new Values(string));
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word"));
		}
	}

	public static class WordCount extends BaseBasicBolt {
		Map<String, Integer> counts = new HashMap<String, Integer>();

		@Override
		public void execute(Tuple tuple, BasicOutputCollector collector) {
			String word = tuple.getString(0);
			Integer count = counts.get(word);
			if (count == null)
				count = 0;
			count++;
			counts.put(word, count);
			collector.emit(new Values(word, count));
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word", "count"));
		}
	}

	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new RandomSentenceSpout(), 5);

		builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(
				"spout");
		builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",
				new Fields("word"));

		Config conf = new Config();
		conf.setDebug(true);

		if (args != null && args.length > 0) {
			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf,
					builder.createTopology());
		} else {
			conf.setMaxTaskParallelism(3);

			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("word-count", conf, builder.createTopology());

			Thread.sleep(10000);

			cluster.shutdown();
		}
	}
}
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;


  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    String sentence = sentences[_rand.nextInt(sentences.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}

三、参考资料

https://xumingming.sinaapp.com/163/twitter-storm-%E6%9C%AC%E5%9C%B0%E6%A8%A1%E5%BC%8F%E7%AE%80%E4%BB%8B/

https://github.com/nathanmarz/storm/wiki/Tutorial

© 著作权归作者所有

cloud-coder
粉丝 247
博文 194
码字总数 141537
作品 0
广州
架构师
私信 提问
加载中

评论(1)

晨磊
晨磊
你好,我用eclipse 执行你的代码,报错误
3101 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir C:\Users\ADMINI~1\AppData\Local\Temp\/898dbbc3-c3cf-4b0b-8243-40d570804404
3290 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\/3c504e3
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
526
1
Apache Storm 2.0.0 发布,基于 Java ​​​​​​​的新架构

Apache Storm 2.0.0 发布了,距离它上次更新已过去一年,新版本在性能、新功能和与外部系统的集成方面进行了重大改进,下面是一些主要功能及改进: 用 Java 实现的新架构 在之前的版本中,S...

xplanet
06/03
3K
5
使用 Twitter Storm 处理实时的大数据

使用 Twitter Storm 处理实时的大数据 流式处理大数据简介 IBM DW/M. Tim Jones, 独立作家, 顾问 简介: Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与...

IBMdW
2012/12/06
6.4K
3
Storm同步调用之DRPC模型探讨

  摘要:Storm的编程模型是一个有向无环图,决定了storm的spout接收到外部系统的请求后,spout并不能得到bolt的处理结果并将结果返回给外部请求。所以也就决定了storm无法提供对外部系统的同...

刘洋intsmaze
2017/09/28
0
0
由提交storm项目jar包引发对jar的原理的探索

序:在开发storm项目时,提交项目jar包当把依赖的第三方jar包都打进去提交storm集群启动时报了发现多个同名的文件错误由此开始了一段对jar包的深刻理解之路。 这里说明stom集群环境中有storm...

刘洋intsmaze
2016/10/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何查看ubuntu的版本

cat /etc/issue

南桥北木
10分钟前
0
0
超详细Linux下QT使用appimage打包程序

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/kuer1379/article/details/82885810 一 打包工具 1.linuxd...

shzwork
16分钟前
1
0
Checkstyle的style

checkstyle是什么? 是代码规范检查,关于各种格式的利弊这里就不说了,但是业内有一些总结的规范利于goole或者阿里有自己的代码规范,就需要用到checkstyle。我个人很讨厌这东西,奈何项目再...

stayStand
19分钟前
2
0
左边竖条的实现方法

下面这个图形,只使用一个标签,可以有多少种实现方式: 假设我们的单标签是一个 div : 1 < div > div> 定义如下通用CSS: 1 2 3 4 5 6 div{ position : relative ; width : 200px ; height ...

前端老手
38分钟前
2
0
java利用ECHARTS.JS在前台显示图表

步骤1: (1)在java后台,使用MSQL分组函数,列出所有线在对应的点的值, (2)组成的Map如图所示: 注意: key为0的value表示X轴需要的数据;key为其他的值表示图表线条的名字,value为x轴的点对应的y...

文文1
40分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部