文档章节

storm 入门教程+简单实例

西二旗之猫
 西二旗之猫
发布于 2017/03/15 17:38
字数 2034
阅读 562
收藏 0

     写在前面:

       本篇博客主要介绍 storm 基本概念和一个简单实例,storm版本1.0.2

     storm基本概念

      storm的集群架构

           storm 是一个主从架构,一个主节点,n个从节点,主节点和从节点之间通过zk集群来进行交互,

 

(这张图是盗的......)

     

       简述一下集群搭建:

        1,解压并且修改 storm.yaml,并且scp到其他节点机器上

        2,在主节点启动 nimbus服务,ui界面服务和logviewer日志服务

        3, 在从节点启动 supervisor服务和logviewer日志服务

      主要进程介绍

         nimbus : nimbus 是storm集群的主控制节点

         zookeeper :集群协调中心

         supervisor: 负责接收任务 管理 worker的进程,work进程处理具体逻辑

         logviewer:日志服务进程

         ui: web ui 管理界面进程

 

         编程模型:DAG有向无环图

            正如 Hadoop的编程模型是  map-reduce,storm也有自己的编程模型,这种模型直白的表述就是有向无环图,这里面引用官网的图例

 

         

 

 

          storm 是做流数据处理,所有选择了一种像"水流"一样的编程模型,更准确的说是一种"水槽",数据就像水流一样,在这条"水槽"中"流淌"(被处理)

         storm 编程有如下几个基本概念

         spout:  相当于"水流"(数据流)的源头

         bolt: 相当于"水槽的处理站",对流经的数据进行处理

         Topology: 整个这个 有向无环图 , 这个 topology  将作为一个任务 提交给 storm集群处理

         tuple: 元组 相当于 每一个水滴 每一条数据

    编写一个storm程序就是编写若干个spout接入数据发射(emit)数据给bolt,编写若干个bolt处理数据并且e发射(emit)数据给其他的bolt,在这个过程中每一个bolt都有能力把数据持久化,如果没有任何一个bolt持久化数据,数据将被丢弃,最后由一个 topology将这些节点连接起来,提交给集群执行任务或者在本地模拟集群进行任务执行.

     一个简单的实例  wordcount

        实例流程: 随机读取文本行, 统计整个文本的每个单词的个数

 SpoutWordline类:产生数据的源头

package leap.storm.simple;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Created by FromX on 2017/3/15.
 */
public class SpoutWordline extends BaseRichSpout {

    SpoutOutputCollector _collector;
    Random _rand;


    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        //每隔 100 mill 发射一个随机文本行
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

解释

     spout类负责数据接入,定义spout类可以通过继承父类或实现接口-->extend BaseRichSpout或者implements IRichSpout,我们选用的是继承的方式.

      open方法 是一个初始化方法,在task任务被调用的是会执行这个初始化方法,这个方法会初始化一些全局的变量比如 SpoutOutputCollector

      nextTuple方法 是一个 被无限循环执行的方法 在任务提交后,这个方法会无限的循环,用来不断的处理被接入的数据,这里我们用这个方法来不断的产生随机的文本行,该方法中的 _collector.emit(new Values("文本行"))会将数据"发射"emit到下一个数据处理节点,也可以叫做"下游".

    declareOutputFields方法 是用来声明发射到下游去的字段名称的方法,它的作用是为发射emit出去的字段声明一个"key",new fields("key1","key2") ,是一个不定长数组,如果发射了多个值,则相应的设置多个"key"

         

BoltWordSpilt类:切割本行为单词 并发射到下一个bolt

package leap.storm.simple;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Created by FromX on 2017/3/15.
 */
public class BoltWordSpilt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                // Emit the word
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

解释:

     prepare()方法 初始化方法初始化一些配置和OutputCollector等

     execute()方法 处理核心逻辑 ,负责处理每一个"上游"传过来的 tuple数据,做具体的逻辑处理,将字符串文本切割成单词传递给下游

     declareOutputFields方法  同上

 

BoltWordCount类  负责统计单词个数

package leap.storm.simple;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by FromX on 2017/3/15.
 */
public class BoltWordCount extends BaseRichBolt {


    Map<String, Integer> counts = new HashMap<String, Integer>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;

    }

    @Override
    public void execute(Tuple tuple) {

        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
            count++;
        counts.put(word, count);
//        此处可以持久化
//        try {
//            HbaseHandler.putRow("stormkafka_1","row_0","wordcount",word,count.toString());
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        collector.emit(new Values(word, count));

    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

 

解释

 execute()方法 可以进行讲数据持久化,比如存入hbase,因为这是一个简单的例子,具体过程不在演示,在下一篇 flume+kafka+storm+hbase的例子中讲有全部代码

 

TopologyWordCount类 负责构建 整个流程并且提交任务

package leap.storm.simple;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Created by FromX on 2017/3/15.
 */
public class TopologyWordCount {


    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {

        //定义一个TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();
        // 设置  spout  用来接入数据 参数分别是  id  实现类 和进程数2
        builder.setSpout("word-reader", new SpoutWordline(), 2);
        // 设置第一个 bolt  切割行 为逐个单词 shuffeGrouping   是随机发送给下游线程 线程数3
        builder.setBolt("word-spilt", new BoltWordSpilt(), 3)
                .shuffleGrouping("word-reader");
        //设置第二个 bolt  统计单词  fieldsGrouping 是根据  word 分组 分发 线程 2
        builder.setBolt("word-count", new BoltWordCount(), 2)
                .fieldsGrouping("word-spilt", new Fields("word"));

        /**
         * 提交任务配置
         */
        Config conf = new Config();
        conf.setDebug(true);
        //如果有任务id参数 则进行集群提交
        if (args != null && args.length > 0) {
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }else{
            //设置任务最大进程数
            conf.setMaxTaskParallelism(10);
            //创建本地集群
            LocalCluster cluster = new LocalCluster();
            //提交任务 任务id word-count
            cluster.submitTopology("word-count", conf, builder.createTopology());
            // 睡眠 10000*6毫秒后手动结停止本地集权
            Thread.sleep(10000 * 6);
            cluster.shutdown();
        }
    }
}

解释:

    TopologyBuilder 负责创建/构建流程, 它通过设置spout 和bolt的分发关系来 构建整个流程,其中setSpout("id","实现类"),两个参数分别是id和实现类,id自己定义不重复即可,这个id将作为 xxGrouping()方法的第一个参数,来表示某个节点(bolt)的数据来自于那个 spout或者bolt ,其中 shuffleGrouping代表随机分发进行处理,fieldsGrouping表示 根据 "word"分组分发给进程处理,其中 "word" 来自于declareOutputFields方法的声明的字段名

打包/运行

      本地模式: 本地模式直接运行main方法即可,但是可以会报一个  没有C盘读写权限的错误,我也没有解决,错误产生的原因是,程序运行的临时数据会存到 c:/user/appdata/temp/下面 可能因为没有c盘执行权限而报错    

     远程集群模式: 直接打包 TopologyWordCount 类 为 jar 即可,注意 无需在打包的时候添加依赖的jar ,因为storm集群的 lib 下面已经存在的所需的依赖,我使用idea打包的,附上截图,如果执行中报错 classnotfound,直接把缺少的依赖包上传到lib下面即可.

远程集群模式提交任务:

在 storm/bin 下执行

./storm jar poc.jar leap.storm.simple.TopologyWordCount mytask

解释:

         ./storm jar 提交任务

         leap.storm.simple.TopologyWordCount  包路径.类名

         mytask   非重复的自定义任务名

提交之后可以在ui界面上看到任务和spout/bolt具体发射数据的情况

 

注意 : 在查看完结果之后 务必点击 "kill" 结束任务 否则 任务将一直执行,持续产生数据

源码下载:https://git.oschina.net/z2q_m/poc

 

© 著作权归作者所有

西二旗之猫
粉丝 7
博文 51
码字总数 37328
作品 0
私信 提问
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
321
1
使用 Twitter Storm 处理实时的大数据

使用 Twitter Storm 处理实时的大数据 流式处理大数据简介 IBM DW/M. Tim Jones, 独立作家, 顾问 简介: Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与...

IBMdW
2012/12/06
6.4K
3
Twitter Storm入门

.通过学习tutorial了解storm的整体架构(https://github.com/nathanmarz/storm/wiki/Tutorial) 通过学习Concepts了解storm的关键概念(https://github.com/nathanmarz/storm/wiki/Concepts......

加油_张
2013/09/14
353
0
大数据学习之(Storm)-原理详解!

角色 Client client的主要作用是提交topology到集群 Worker Worker是运行在Supervisor节点上的一个独立的JVM进程,主要作用是运行topology,一个topology可以包含多个worker,但一个worker只...

qq5af153121eb2c
2018/05/08
0
0
探秘Hadoop生态13:初探Storm和入门实例

这位大侠,这是我的公众号:程序员江湖。 分享程序员面试与技术的那些事。 干货满满,关注就送。 Storm:最火的流式处理框架 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获...

你的猫大哥
2017/03/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

java数据类型

基本类型: 整型:Byte,short,int,long 浮点型:float,double 字符型:char 布尔型:boolean 引用类型: 类类型: 接口类型: 数组类型: Byte 1字节 八位 -128 -------- 127 short 2字节...

audience_1
42分钟前
6
0
太全了|万字详解Docker架构原理、功能及使用

一、简介 1、了解Docker的前生LXC LXC为Linux Container的简写。可以提供轻量级的虚拟化,以便隔离进程和资源,而且不需要提供指令解释机制以及全虚拟化的其他复杂性。相当于C++中的NameSpa...

Java技术剑
43分钟前
9
0
Wifiphisher —— 非常非常非常流氓的 WIFI 网络钓鱼框架

编者注:这是一个非常流氓的 WIFI 网络钓鱼工具,甚至可能是非法的工具(取决于你的使用场景)。在没有事先获得许可的情况下使用 Wifiphisher 攻击基础网络设施将被视为非法活动。使用时请遵...

红薯
今天
49
1
MongoDB 4 on CentOS 7安装指南

本教程为CentOS x86_64 7.x操作系统下,MongoDB Community x86_64 4.2(GA)安装指南。 安装方式一:yum repo在线安装 [此方式较为简单,官方推荐] Step1:新建MongDB社区版Yum镜像源。 # vim ...

王焱君
今天
7
0
go-micro 入门教程1.搭建 go-micro环境

微服务的本质是让专业的人做专业的事情,做出更好的东西。 golang具备高并发,静态编译等特性,在性能、安全等方面具备非常大的优势。go-micro是基于golang的微服务编程框架,go-micro操作简单...

非正式解决方案
今天
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部