文档章节

Storm 编程之wordcount

键盘上跳舞
 键盘上跳舞
发布于 2017/03/25 22:54
字数 664
阅读 61
收藏 0

1.storm编程实现wordcount的详细流程分析

            

2.代码实现

    本例以实现storm版的wordcount

    WordCountTopologMain.java

    

public class WordCountTopologMain
{
    public static void main(String[] args)
    {
        // 1.创建topology
        TopologyBuilder topology = new TopologyBuilder();
        // 设置spout和bolt以及并发度,myspout只是别名,spout分发数据到bolt
        topology.setSpout("mySpout", new MySpout(), 2);
        // 此时自设了两个Bolt,
        /**
         * mybolt1数据分组策略是随机分发数据到介质(下游bolt或者redis等)
         * 
         * mybolt2数据分组策略是分组合并数据,
         */
        topology.setBolt("mybolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout");
        
        topology.setBolt("mybolt2", new MyCountBolt(), 4).fieldsGrouping("mybolt1", new Fields("word"));
        
        // 2.创建configuration,指定当前topology 需要的woker数量
        Config config = new Config();
        config.setNumWorkers(2);
        
        // 3.选择本地模式还是集群模式
        try
        {
            StormSubmitter.submitTopology("storm_wordcount", config, topology.createTopology());
        }
        catch (AlreadyAliveException | InvalidTopologyException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("storm_wordcount", config, topology.createTopology());
    }
}

    MySpout.java

    

/**
 * @ClassName MySpout
 * @author liukang
 * @Description TODO spout是获取外部数据
 
 * @version 1.0.0
 */
public class MySpout extends BaseRichSpout
{
    SpoutOutputCollector outputCollector;
    
    /**
     * 初始化方法
     */
    @Override
    public void open(Map conf, TopologyContext arg1, SpoutOutputCollector outputCollector)
    {
        this.outputCollector = outputCollector;
    }
    
    /**
     * storm框架内while(true)调用tuple方法,传送数据
     */
    @Override
    public void nextTuple()
    {
        /**
         * public Values(Object... vals) 
         * {
         *      super(vals.length);
         *      for(Object o: vals) 
         *      {
         *           add(o);
         *      }
         *   }
         */
        outputCollector.emit(new Values("you are my apple in my eye"));
    }
    /**
     * 声明tuple单元数据的别名,可以是多个,取决于nexttuple方法设的输出结果
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new  Fields("word"));
        
    }
    
}

    MySplitBolt.java

    

/**
 * @ClassName MySplitBolt
 * @author liukang
 * @Description TODO Bolt是逻辑处理节点,相当于Mapreduce中的map阶段功能相同,单词切割
 *              接受Spout发送的数据,或上游的bolt的发送的数据。根据业务逻辑进行处理。发送给下一个Bolt或者是存储到某种介质上

 * @version 1.0.0
 */
public class MySplitBolt extends BaseRichBolt
{
    OutputCollector collector;
    
    /**
     * 初始化
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector)
    {
        this.collector = collector;
    }
    
    /*
     * 执行线程,执行bolt任务,tuple是最小的数据单元
     */
    @Override
    public void execute(Tuple tuple)
    {
        // 在本例中,spout只发送了一组数据过来
        /**
         * public String getString(int i) { return (String) values.get(i); }
         */
        String line = tuple.getString(0);
        // 切割
        String[] words = line.split(" ");
        for (String word : words)
        {
            // 将数据片tuple输出到下游Bolt继续处理
            collector.emit(new Values(word, 1));
        }
    }
    
    /*
     * 告诉下游的bolt此次bolt输出的结果别名
     * 
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        /*
         * 可变数组
         * public Fields(String... fields) {
         *   this(Arrays.asList(fields));
         *   }
         */
        declarer.declare(new  Fields("wordinfo","number"));
    }
    
}

    MyCountBolt.java

        

public class MyCountBolt extends BaseRichBolt
{
    OutputCollector collector;
    
    Map<String, Integer> map = new HashMap<String, Integer>();
    
    /*
     * 初始化 :
     * 
     * @see backtype.storm.task.IBolt#prepare(java.util.Map, backtype.storm.task.TopologyContext,
     * backtype.storm.task.OutputCollector)
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector)
    {
        this.collector = collector;
    }
    
    /*
     * 接受上游的bolt数据,进行逻辑处理 :
     * 
     * @see backtype.storm.task.IBolt#execute(backtype.storm.tuple.Tuple)
     */
    @Override
    public void execute(Tuple tuple)
    {
        // 本例中上游mybolt1输出两个结果,别名分别是wordinfo 和number
        String wordinfo = tuple.getString(0);
        Integer number = tuple.getInteger(1);
        // 判断单词
        if (map.containsKey(wordinfo))
        {
            Integer count = map.get(wordinfo);
            map.put(wordinfo, number + count);
        }
        else
        {
            map.put(wordinfo, number);
        }
        
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0)
    {
        // Bolt最终,不输出结果
    }
    
}

 

© 著作权归作者所有

键盘上跳舞
粉丝 19
博文 68
码字总数 66382
作品 0
海淀
程序员
私信 提问
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
702
1
如何在eclipse调试storm程序

一、介绍 storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。 Storm has two modes of operation: local mode and distributed mode. In loca...

cloud-coder
2014/02/16
10.1K
1
Storm On Yarn 安装部署

安装 JDK7 和 Maven 部署Hadoop2集群,并启动yarn http://my.oschina.net/zc741520/blog/362824 下载 Storm on Yarn [grid@hadoop4 ~]$ wget https://github.com/yahoo/storm-yarn/archive/m......

张超
2015/05/25
834
0
Storm入门 第二章 构建Topology

2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration Storm集群和Hadoop集群表面上...

坏坏一笑
2014/12/03
84
0
年薪40万的大数据工程师是如何安装Strom

Strom集群的安装配置 主机规划 一、准备服务器 l 关闭防火墙 chkconfig iptables off && setenforce 0 l 创建用户 groupadd hadoop && useradd hadoop  && usermod -a -G hadoop hadoop l ......

爱尚实训
2018/04/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Mybatis 源码(二)Mybatis 初始化

Mybatis 初始化是由SqlSessionFactoryBuilder来完成的,主要的工作解析XML文件,并将解析的类容封装到Configuration类中,最后将Configuration类封装到SqlSessionFactory中并返回,自此初始化...

xiaolyuh
13分钟前
3
0
约瑟夫环问题

约瑟夫环问题的原来描述为,设有编号为1,2,……,n的n(n>0)个人围成一个圈,从第1个人开始报数,报到m时停止报数,报m的人出圈,再从他的下一个人起重新报数,报到m时停止报数,报m的出圈,...

mskk
22分钟前
3
0
JEP解读与尝鲜系列1 - Java Valhalla与Java Inline class

涉及到的JEP: Project Valhalla JEP 169: Value Objects JEP 218: Generics over Primitive Types 这些特性将在JDK14实现 Valhalla项目背景 最主要的一点就是,让Java适应现代硬件:在Java语...

zhxhash
24分钟前
8
0
总结:Redis集群

一、redis集群方案 Master-slave方式,Master和Slave的数据一致,Slave从Master同步数据,然后通过Sentinal(哨兵)监控Master和Slave的健康状态,当异常的时候迅速切换,如Master宕机的时候...

浮躁的码农
28分钟前
5
0
三个盘子的汉诺塔

package base;/** * 汉诺塔 */public class TowerApp { static int nDisks = 3; public static void main(String[] args) { doTowers(nDisks, 'A','B',......

clean123
29分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部