文档章节

windows 安装 storm 及 eclipse 调试 TopN 实例

大数据之路
 大数据之路
发布于 2012/06/08 22:03
字数 1417
阅读 560
收藏 0

一:安装JDK

下载地址:地址一  地址二

配置Java环境变量 JAVA_HOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考):

D:\java\jdk1.8

%JAVA_HOME%/bin;%JAVA_HOME%/jre/bin

.;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar (要加.表示当前路径)

二:安装 Python

这是为了测试安装效果,我们将部署 storm-starter project案例中word coun程序,用的是python写的multi-lang bolt,使用python 2.7.11,安装路径在:

C:\Python27\

三:安装并运行ZooKeeper

Download Apache Zookeeper 3.4.8 ,解压配置:

> cd zookeeper-3.4.8 
> copy conf\zoo_sample.cfg conf\zoo.cfg 
> .\bin\zkServer.cmd

四:安装Storm

Storm的windows官方版还没有释放,here.下载,源码here下载。

注意1:

源码一定要用这个版本,否则启动会报各种错误,而这些错误和 jdk、python、zookeeper、eclipse 版本都无关。

http://dl.dropboxusercontent.com/s/iglqz73chkul1tu/storm-0.9.1-incubating-SNAPSHOT-12182013.zip

配置Storm环境变量

  • Storm需要STORM_HOME和JAVA_HOME,比如STORM_HOME为:

C:\storm-0.9.1-incubating-SNAPSHOT-12182013\

  • 在PATH中加入:

%STORM_HOME%\bin;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\

此处与参考文章略有不同,下图是参考文章给出的配置

stormpathsetoasubfh

 JAVA_HOME已经在安装JDK时手动配置了环境变量,而Python好像是默认自动就会配置好环境变量的,

我的Python目录下没有Scripts文件夹,暂时这样配置就可以了,不影响下面的使用。

五:启动Nimbus, Supervisor, and Storm UI Daemons

  • Nimbus

注意2:

一定要在 STORM_HOME 目录下执行后续命令,否则会报错。

ERROR backtype.storm.event - Error when processing event
java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 8648225932767613808, local class serialVersionUID = 270281984708184947
        at backtype.storm.utils.Utils.deserialize(Utils.java:86) ~[storm-core-0.9.1-incubating-SNAPSHOT-12182013.jar:na]

> cd %STORM_HOME%      

> storm nimbus

  • Supervisor

> cd %STORM_HOME% 

> storm supervisor

  • Storm UI                          # 可选,也可以用 storm list 查看所有 storm 任务

> cd %STORM_HOME% 

> storm ui

浏览器打开http://localhost:8080/ 可看到Storm运行。

六:部署 Word count

下载download a pre-built jar

部署这个jar在本地:

> storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

如果你刷新 Storm UI页面,会看到 “WordCount” topology显示列出,点按链接确认它处理数据。

七:eclipse 调试 TopN 实例

storm 求 csdn 密码库中密码出现的 topN,并直接在 eclipse 中调试运行:

package com.bj.test.top10;

/** 
* @Author:tester 
* @DateTime:2016年6月21日 下午7:58:45 
* @Description:  Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
* @Version:1.0
*/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PasswdSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;

	public void ack(Object msgId) {
		System.out.println("==============OK:" + msgId);
	}

	public void close() {
	}

	public void fail(Object msgId) {
		System.out.println("++++++++++++++FAIL:" + msgId);
	}

	/** 
     * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt) 
     * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下 
     * **/  
	public void nextTuple() {
		/**
		 * The nextuple it is called forever, so if we have been readed the file
		 * we will wait and then return
		 */
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Do nothing
			}
			return;
		}
		String line;
		// Open the reader
		BufferedReader reader = new BufferedReader(fileReader);
		try {
			// Read all lines
			while ((line = reader.readLine()) != null) {
				String[] words = line.split("#");
				String passwd = words[1].trim();
				// Emit the word  
                collector.emit(new Values(passwd));
		        /*for(String word : words){
		            word = word.trim();
		            if(!word.isEmpty()){
		                word = word.toLowerCase();
		                // Emit the word  
		                collector.emit(new Values(word));
		            }
		        }*/
			}
		} catch (Exception e) {
			throw new RuntimeException("Error reading tuple", e);
		} finally {
			completed = true;
		}
	}

	/** 
     * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置, 
     * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt 
     * **/  
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		try {
			//获取创建Topology时指定的要读取的文件路径 
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");
		}
		//初始化发射器  
		this.collector = collector;
	}

	/**
	 * Declare the output field "word"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}


/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import static com.bj.test.top10.SortMapByValue.*;

public class Top10Bolt extends BaseBasicBolt {

	Integer id;
	String name;
	NavigableMap<String, Integer> counters;

	/** 
     * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 
     * 因为这只是个Demo,我们用它来打印我们的计数器 
     * */  
	@Override
	public void cleanup() {
		System.out.println(">>>>>>>>>>>> Word Counter ["+name+"-"+id+"] <<<<<<<<<<<");
		/*for(Map.Entry<String, Integer> entry : counters.entrySet()){
			System.out.println(entry.getKey()+": "+entry.getValue());
		}*/
		printMap(list2Map(sortMapByValuesTopN(counters, 10)));
	}

	/**
	 * On create 
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new TreeMap<String, Integer>().descendingMap();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {}

	//	Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String word = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1 
		 */
		if(!counters.containsKey(word)){
			counters.put(word, 1);
		}else{
			Integer count = counters.get(word) + 1;
			counters.put(word, count);
		}
	}
}


/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;

/** 
* @Author:tester 
* @DateTime:2016年6月21日 下午7:52:32 
* @Description: 
* @Version:1.0
*/
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
	public static void main(String[] args) throws InterruptedException {

		// 定义一个Topology
		TopologyBuilder builder = new TopologyBuilder();
		// executor的数目, set parallelism hint to 4
		builder.setSpout("PasswdSpout", new PasswdSpout(), 1);
		// set tasks number to 4
		builder.setBolt("Top10Bolt", new Top10Bolt(), 1).setNumTasks(1).fieldsGrouping("PasswdSpout",
				new Fields("word"));

		// 配置
		Config conf = new Config();
		conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.100.sql");
		//		conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.sql");
		conf.setDebug(false);
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		// use two worker processes
		// conf.setNumWorkers(4);

		// 创建一个本地模式cluster
		LocalCluster cluster = new LocalCluster();
		// 提交Topology
		cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
		Thread.sleep(1000);
		cluster.shutdown();
	}
}

Refer:

[1] windows安装storm

http://blog.csdn.net/jiutianhe/article/details/41211403

[2] storm异常集锦

        http://ganliang13.iteye.com/blog/2117722

        http://bimoziyan.iteye.com/blog/1981116

[2] storm教程二、安装部署

http://www.cnblogs.com/jinhong-lu/p/4634912.html   

[3] Storm实战之WordCount

http://m635674608.iteye.com/blog/2221179

[4] Storm的并行度、Grouping策略以及消息可靠处理机制简介

http://m635674608.iteye.com/blog/2232221

[5] Storm的滑动窗口

http://zqhxuyuan.github.io/2015/09/10/2015-09-10-Storm-Window/

[6] [Storm中文文档]Trident教程

http://bit.ly/29vRNgm

http://blog.csdn.net/lujinhong2/article/details/47132313

[7] Storm Trident API 实践

http://blog.csdn.net/suifeng3051/article/details/41118721

[8] flume+kafka+storm运行实例

http://my.oschina.net/u/2000675/blog/613747

[9] Kafka+Storm+HDFS整合实践

http://shiyanjun.cn/archives/934.html

 

© 著作权归作者所有

大数据之路
粉丝 1605
博文 514
码字总数 333995
作品 0
武汉
架构师
私信 提问
加载中

评论(1)

紫电清霜
紫电清霜
从日志分类,侧边栏来看,楼主掌握的技术真多!额!!
如何在eclipse调试storm程序

一、介绍 storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。 Storm has two modes of operation: local mode and distributed mode. In loca...

cloud-coder
2014/02/16
10.1K
1
storm-环境搭建和第一个topology

从原理到操作,还是有点距离 :) 基于 Linux ubuntu 3.13.0-24-generic ------------- 预备工作 * java * python(>=2.6) * zeromq * jzmq * zookeeper 下载(不需安装) wget https://github......

深蓝苹果
2014/06/10
411
0
Apache Slider + Storm

Apache Slider + Storm 系统环境 安装如下组件,部署可用环境 JDK 1.7.0_79 Apache Zookeeper 3.4.* Apache Zookeeper Apache Hadoop 2.6.* Apache Hadoop Apache Storm 0.9.4 Apache Storm......

Yulong_
2016/09/21
453
0
SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成

开发者社区活动,SODBASE产品的用户现在可以领礼品啦 Storm框架原本是设计用来做互联网短文本处理和一些统计工作的,是一种分布式流式计算框架。在一些场合,特别是在已经用了Storm架构以后,...

wishuhappyyear
2015/04/30
0
0
Twitter Storm Ubuntu 单机安装

第 121 章 Twitter Storm 目录 121.1. 单机版121.2. lein 安装 121.1. 单机版 操作系统环境:Ubuntu 13.04 KVM虚拟机 安装 storm 涉及到安装以下包:python、zookeeper、zeromq、jzmq、storm...

netkiller-
2013/08/02
2K
0

没有更多内容

加载失败,请刷新页面

加载更多

Centos7 python2.7和yum完全卸载及重装

                                     完全重装python和yum 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 1、删除现有...

roockee
20分钟前
5
0
【软件工程】绪论,深入浅出理解软件工程

软件和软件工程 什么是软件工程 软件工程是贯穿整个软件生命周期的工程学和方法论及其使用的工具(我说的) 软件工程要解决那些问题 软件的研发周期过长 软件开发成本居高不下 软件在交付之前...

丌官尚雄
27分钟前
5
0
无回路有向图的拓扑排序

因公司业务需要,在表单中每个字段都会配置自动计算,但自动计算公式中会引用到其他字段中的值。所以希望可以根据计算公式,优先计算引用的公式。所以最终使用了无回路有向图的扩扑排序来实现...

兜兜毛毛
今天
6
0
如何抢占云栖大会C位?史上最强强强攻略来了

点击观看视频: APSARA云栖大会开发者情怀 原文链接 本文为云栖社区原创内容,未经允许不得转载。

阿里云官方博客
今天
6
0
Kubernetes 从懵圈到熟练:集群服务的三个要点和一种实现

作者 | 声东 阿里云售后技术专家 文章来源:Docker,点击查看原文。 以我的经验来讲,理解 Kubernetes 集群服务的概念,是比较不容易的一件事情。尤其是当我们基于似是而非的理解,去排查服务...

阿里巴巴云原生
今天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部