文档章节

二、Storm入门之Hello Storm(续)

datapro
 datapro
发布于 2015/06/16 19:57
字数 1666
阅读 274
收藏 1

上一页

package bolts;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class WordNormalizer implements IRichBolt{
    private OutputCollector collector;
 
    public void cleanup(){}
 
    /**
    * The bolt will receive the line from the
    * words file and process it to Normalize this line
    *
    * The normalize will be put the words in lower case
    * and split the line to get all words in this
    */
 
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[]words= sentence.split(" ");
        for(String word:words){
            word =word.trim();
            if(!word.isEmpty()){
                word =word.toLowerCase();
                //Emit the word
                List a =new ArrayList();
                a.add(input);
                collector.emit(a,new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }
 
    public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
        this.collector=collector;
    }
 
    /**
    * The bolt will only emit the field "word"
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
 
}

提示:在这个类中,每调用一次execute()方法,会发送多个元组。例如,当execute()方法收到“This is the Storm book”这个句子时,该方法会发送5个新元组。

第二个bolt,WordCounter,负责统计每个单词个数。当topology结束时(cleanup()方法被调用时),显示每个单词的个数。

提示:第二个bolt中什么也不发送,本例中,将数据添加到一个map对象中,但是现实生活中,bolt可以将数据存储到一个数据库中。

package bolts;
 
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
 
public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map<String,Integer>counters;
 
    private OutputCollector collector;
 
    /**
    * At the end of the spout (when the cluster is shutdown
    * We will show the word counters
    */
 
    @Override
 
    public void cleanup(){
        System.out.println("-- Word Counter ["+name+"-"+id+"]--");
        for(Map.Entry<String,Integer>entry: counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
 
    /**
    * On each word We will count
    */
    @Override
 
    public void execute(Tuple input) {
        String str =input.getString(0);
        /**
        * If the word dosn't exist in the map we will create
        * this, if not We will add 1
        */
        if(!counters.containsKey(str)){
            counters.put(str,1);
        }else{
            Integer c =counters.get(str) +1;
            counters.put(str,c);
        }
        //Set the tuple as Acknowledge
        collector.ack(input);
    }
 
    /**
    * On create
    */
 
    @Override
 
    public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
        this.counters=newHashMap<String,Integer>();
        this.collector=collector;
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
    }
 
    @Override
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
 
}

execute()方法使用一个映射(Map类型)采集单词并统计这些单词个数。当topology结束的时候,cleanup()方法被调用并且打印出counter映射。(这仅仅是个例子,通常情况下,当topology关闭时,你应该使用cleanup()方法关闭活动链接和其他资源。)

主类

在主类中,你将创建topology和一个LocalCluster对象,LocalCluster对象使你可以在本地测试和调试topology。LocalCluster结合Config对象允许你尝试不同的集群配置。例如,如果不慎使用一个全局变量或者类变量,当配置不同数量的worker测试topology的时候,你将会发现这个错误。(关于config对象在第三章会有更多介绍)

提示:所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(也就是说没有全局或者类变量),因为当topology运行在一个真实的集群上时,这些进程可能运行在不同的机器上。

你将使用TopologyBuilder创建topology,TopologyBuilder会告诉Storm怎么安排节点顺序、它们怎么交换数据。

TopologyBuilder builder =new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));

本例中spout和bolt之间使用随机分组(shuffleGrouping)连接,这种分组类型告诉Storm以随机分布的方式从源节点往目标节点发送消息。

接着,创建一个包含topology配置信息的Config对象,该配置信息在运行时会与集群配置信息合并,并且通过prepare()方法发送到所有节点。

Config conf =new Config();
conf.put("wordsFile",args[0]);
conf.setDebug(false);

将wordFile属性设置为将要被spout读取的文件名称(文件名在args参数中传入),并将debug属性设置为true,因为你在开发过程中,当debug为true时,Storm会打印节点间交换的所有消息和其他调试数据,这些信息有助于理解topology是如何运行的。

前面提到,你将使用LocalCluster来运行topology。在一个产品环境中,topology会持续运行,但是在本例中,你仅需运行topology几秒钟就能看到结果。

LocalCluster cluster =new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();

使用createTopology和submitTopology创建、运行topology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。

例2-3将上面代码拼凑到一起。

例2-3.src/main/java/TopologyMain.java

import spouts.WordReader;
import bolts.WordCounter;
import bolts.WordNormalizer;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class TopologyMain{
    public static void main(String[]args)throws InterruptedException{
    //Topology definition
        TopologyBuilder builder =new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
 
    //Configuration
        Config conf =new Config();
        conf.put("wordsFile",args[0]);
        conf.setDebug(false);
 
    //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        LocalCluster cluster =new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
 
}

运行本项目

现在开始准备运行第一个topology!如果你新建一个文本文件(src/main/resources/words.txt)并且每行一个单词,则可以通过如下命令运行这个topology:

mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt”

例如,如果你使用如下words.txt文件:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

在日志中,你将会看到类似如下信息:

is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1

在本例中,你只使用了每个结点的一个单一实例,假如此时有一个非常大的日志文件怎么去统计每个单词的个数?此时可以很方便地改系统中节点数量来并行工作,如创建WordCounter的两个实例:

1
builder.setBolt(     "word-counter"     ,     new     WordCounter(),     2     ).shuffleGrouping(     "word-normalizer"     );

重新运行这个程序,你将看到:

– Word Counter [word-counter-2] –
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
– Word Counter [word-counter-3] –
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

太棒了!改变并行度,so easy(当然,在实际生活中,每个实例运行在不同的机器中)。但仔细一看似乎还有点问题:“is”和“great”这两个单词在每个WordCounter实例中都被计算了一次。Why?当使用随机分组(shuffleGrouping)时,Storm以随机分布的方式向每个bolt实例发送每条消息。在这个例子中,将相同的单词发送到同一个WordCounter实例是更理想的。为了实现这个,你可以将shuffleGrounping(“word-normalizer”)改成fieldsGrouping(“word-normalizer”,new Fields(“word”))。尝试一下并重新运行本程序来确认结果。后面的章节你将看到更多关于分组和消息流的内容。

总结

本章我们讨论了Storm的本地操作模式和远程操作模式的不同,以及用Storm开发的强大和简便。同时也学到了更多关于Storm的基本概念,我们将在接下来的章节深入解释这些概念。

上一页

本文转载自:http://www.flyne.org/article/42/3

datapro
粉丝 16
博文 37
码字总数 22063
作品 0
广州
高级程序员
私信 提问
加载中

评论(0)

Twitter Storm入门

.通过学习tutorial了解storm的整体架构(https://github.com/nathanmarz/storm/wiki/Tutorial) 通过学习Concepts了解storm的关键概念(https://github.com/nathanmarz/storm/wiki/Concepts......

加油_张
2013/09/14
365
0
SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成

开发者社区活动,SODBASE产品的用户现在可以领礼品啦 Storm框架原本是设计用来做互联网短文本处理和一些统计工作的,是一种分布式流式计算框架。在一些场合,特别是在已经用了Storm架构以后,...

wishuhappyyear
2015/04/30
0
0
Apache Storm 1.0.3 发布,分布式实时计算

Apache Storm 1.0.3 发布了,Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm ...

王练
2017/02/05
1.2K
0
如何在eclipse调试storm程序

一、介绍 storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。 Storm has two modes of operation: local mode and distributed mode. In loca...

cloud-coder
2014/02/16
1W
1
Storm入门 第三章 Storm安装部署步骤

本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出。 3.1 Storm集群组件 Storm集群中包含...

坏坏一笑
2014/12/03
126
0

没有更多内容

加载失败,请刷新页面

加载更多

Paragon NTFS for Mac(Mac读写ntfs磁盘工具) v15.5.106

Paragon NTFS for Mac(Mac读写ntfs磁盘工具)。它专门开发用来弥补Windows和Mac OS X之间的不兼容性,通过在Mac OS X系统下提供对任何版本的NTFS文件系统完全的读写访问服务来弥合这种不兼容性...

麦克W
17分钟前
9
0
DeepFaceLab上云之滴滴云使用全攻略!

以下内容为转载,转载出处请注意文末。 几天前写了一遍关于云服务器文章,简单的说了下目前国内有些云GPU,大概做了一个价格比较,不少人想要尝试下,但是不知道如何操作。今天就来说说具体操...

滴滴云
22分钟前
11
0
面试官问:cookie是什么?session是什么?,以及它们之间的区别

JavaWeb这个时间有点久了,不能学了前面忘后面,是时候总结一下了 Cookie cookie由服务器生成,发送给浏览器,保存在浏览器上。 cookie生命周期 会话级别: 本次浏览器关闭 保存在 浏览器内存...

庭前云落
28分钟前
15
0
NIO

行者终成事
29分钟前
28
0
多线程 ReentrantLock 中 Lock,tryLock,lockInterruptibly

链接:https://www.zhihu.com/question/36771163/answer/68974735 ReentrantLock 锁有好几种,除了常用的lock ,tryLock ,其中有个lockInterruptibly 。 先把API粘贴上来 lockpublic voi...

moon888
34分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部