文档章节

Storm概念详解和工作原理,topology、spout、bolt的细节和API讲解之一

泡海椒
 泡海椒
发布于 2016/02/08 08:50
字数 1376
阅读 2170
收藏 1
点赞 2
评论 0

storm
datasource -->bolt
|               |
|               |
bolt-->     有向无环图bolt
storm与传统数据库区别
传统数据库先存后计算,而storm则是先算后存甚至不存
传统关系数据库很难部署实时计算,只能部署定时任务统计分析窗口数据
关系型数据库注重事务,并发控制,相对storm来说比较简陋
storm【速度】与hadoop【海量数据】,spark【内存计算框架】等流行的大数据方案
核心代码clojure实用程序python,使用java开发拓扑
wordcount逻辑
sentence spout -->split sentence bolt -->word count bolt -->report bolt

[root@localhost target]# cd ~/soft
[root@localhost soft]# ls
maven  Python-2.7.2  storm-0.9.1  storm-starter  zookeeper-3.3.6
[root@localhost soft]# cd zookeeper-3.3.6
[root@localhost zookeeper-3.3.6]# ls
bin          contrib     ivysettings.xml  NOTICE.txt  zookeeper-3.3.6.jar
build.xml    data        ivy.xml          README.txt  zookeeper-3.3.6.jar.asc
CHANGES.txt  dist-maven  lib              recipes     zookeeper-3.3.6.jar.md5
conf         docs        LICENSE.txt      src         zookeeper-3.3.6.jar.sha1
[root@localhost zookeeper-3.3.6]# cd bin
[root@localhost bin]# ls
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd  zookeeper.out
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer.sh
[root@localhost bin]# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[storm, zookeeper]
[zk: localhost:2181(CONNECTED) 1]
storm管理命令
storm rebalance增加节点之后/activate/deactivate/kill 拓扑名
topology运行流程
storm提交后,会把代码首先存放到nimbus节点的inbox目录下,之后会把当前storm
运行配置生成一个stormconf.ser文件放到nimbus节点的stormdist目录下,在此目录下还有序列化后的toplogy代码文件
2.在设定topology所关联的spout和bolts时,可以同时设置当前spout和bolt的
executor数目和task数目,默认情况下,一个topology的task总和是executor的总和一致的,之后,系统根据workerd的数
尽量平均这些task的执行,work在哪个supervisor节点运行是由本身决定的
3.任务分配好后,niimbus节点会将任务信息提交到zoopeeker集群,同时在zoopecker集群中会有workerbeats
节点,这里存储了所有worker进程的心跳信息
4supervisor节点会不断的轮训zookeeper集群,在zookeeper的assignment节点保存了所有toplogy的任务分配信息
代码存储目录之间的关联关系,supervisor通过轮训此节点的内容,来领取自己的任务,启动worker进程
5.一个topogy运行之后,就会不断的通过spout来发送spout流,通过bolts来不断处理接收的stream流,stream流式误解的
本地运行的提交方式
LocalCluster cluster=new LocalCluster();
cluster.submitTopology(TOPLOGY_NAME,conf,builder.createTopology())
Thread.sleep(2000)
cluster.shutdown();
分布式提交方式:
StormSubmitter。submitToplogy(TOPLOGY_NAME,conf,builder.createTopology())
topology的运行
需要注意的是,在storm代码编写完成之后,需要打成jar包放在nimbus中运行
打包的时候不要加依赖的包,否者会出现重复的配置文件,因为他运行之前会加载
本地的storm。yaml配置文件
storm jar StormTology.jar mainclass
storm守护进程
nimbus
toUI
DRPC
JAR storm jar topology_jar topology_class args
jar是用于提交集群拓扑他运行topology_class main方法,上传jar到nimbus,由nimbus发布到集群
一旦提交,storm会激活拓扑并开始处理topology_class main方法,main方法会调用stormsubmit.submittopology
方法, 并且提供一个唯一的拓扑名, 若这个名字存在那么失败,常见方法是用命令方法来指定拓扑名称
maven新建一个项目
mvn archetype:create/generate  -DgroupId=storm.test -DartifactId=teststorm -DpackageName=cn.dataguru.storm


pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>storm.test</groupId>
  <artifactId>teststorm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>teststorm</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
<scope>provided</scope>
</dependency>
  </dependencies>
 
  <repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>


<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass />
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
 
</project>


package cn.dataguru.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class LearningStormBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        // fetched the field "site" from input tuple.
        String test = input.getStringByField("site");
        // print the value of field "site" on console.
        System.out.println("Name of input site is : " + test);
    }

    public void declareOutputFields(OutputFieldsDeclarer delarer) {

    }
}

package cn.dataguru.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LearningStormSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "google");
        map.put(1, "facebook");
        map.put(2, "twitter");
        map.put(3, "youtube");
        map.put(4, "linkedin");
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector spoutOutputCollector) {
        // Open the spout

        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // Storm cluster repeatedly calls this method to emit a continuous
        // stream of tuples.
        final Random rand = new Random();
        // generate the random number from 0 to 4.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(map.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));
    }
}

package cn.dataguru.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class LearningStormTopology {
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        // create an instance of TopologyBuilder class
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout class
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        // set the bolt class
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 4)
                .shuffleGrouping("LearningStormSpout");
        Config conf = new Config();
        conf.setDebug(true);
        // create an instance of LocalCluster class for
        // executing topology in local mode.
        LocalCluster cluster = new LocalCluster();
        // LearningStormTopolgy is the name of submitted topology.
        cluster.submitTopology("LearningStormToplogy", conf,
                builder.createTopology());
        try {
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the LearningStormTopology
        cluster.killTopology("LearningStormToplogy");
        // shutdown the storm test cluster
        cluster.shutdown();
    }
}

mvn install
[root@localhost teststorm]# cd target
[root@localhost target]# ls
archive-tmp       test-classes
classes           teststorm-0.0.1-SNAPSHOT.jar
maven-archiver    teststorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar
maven-status      teststorm-1.0-SNAPSHOT.jar
surefire-reports  teststorm-1.0-SNAPSHOT-jar-with-dependencies.jar

[root@localhost teststorm]# mvn compile exec:java -Dexec:java -Dexec.classpathScope=compile -Dexec.mainClass=cn.dataguru.storm.LearningStormTopology
                                                                                                             cn.dataguru.storm.LearningStormTopology

[root@localhost teststorm]# storm jar teststorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar cn.dataguru.storm.LearningStormTopology

集群方式
conf.setNumWorkers(3);
StormSubmitter.submitTopology("name", conf, builder.createTopology());
以下的注释掉
        cluster.submitTopology("LearningStormToplogy", conf,
                builder.createTopology());
        try {
            Thread.sleep(10000);
            StormSubmitter.submitTopology("name", conf, builder.createTopology());
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the LearningStormTopology
        cluster.killTopology("LearningStormToplogy");
        // shutdown the storm test cluster
        cluster.shutdown();







© 著作权归作者所有

共有 人打赏支持
泡海椒
粉丝 10
博文 256
码字总数 291526
作品 0
成都
程序员
大数据学习之(Storm)-原理详解!

角色 Client client的主要作用是提交topology到集群 Worker Worker是运行在Supervisor节点上的一个独立的JVM进程,主要作用是运行topology,一个topology可以包含多个worker,但一个worker只...

qq5af153121eb2c ⋅ 05/08 ⋅ 0

Storm笔记整理(四):Storm核心概念与验证——并行度与流式分组

[TOC] Storm核心概念之并行度 Work 1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的(...

xpleaf ⋅ 04/13 ⋅ 0

Storm笔记整理(五):可靠性分析、定时任务与Storm UI参数详解

[TOC] 特别说明:前面的四篇Storm笔记中,关于计算总和的例子中的spout,使用了死循环的逻辑,实际上这样做是不正确的,原因很简单,Storm提供给我们的API中,nextTuple方法就是循环执行了,...

xpleaf ⋅ 04/14 ⋅ 0

Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli ⋅ 2015/06/18 ⋅ 0

Storm笔记整理(一):简介与设计思想

[TOC] 实时计算概述 有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统...

xpleaf ⋅ 04/12 ⋅ 0

大数据Storm相比于Spark、Hadoop有哪些优势(摘录)

一、可能很多初学大数据的伙伴不知道strom是什么,先给大家介绍一下strom: 分布式实时计算系统,storm对于实时计算的意义类似于hadoop对于批处理的意义。 storm的适用场景。 流数据处理。S...

风火数据 ⋅ 06/01 ⋅ 0

Storm中的Stream grouping有哪几种方式?

在拓扑图中,每个bolt接受一个spout或者bolt的数据,但是每一个spout或者bolt有多个实例,因此哪一个接受哪一个bolt或者spout实例的数据需要stream grouping。 Storm定义了七种内置数据流分组...

无精疯 ⋅ 04/19 ⋅ 0

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是...

xpleaf ⋅ 04/16 ⋅ 0

Storm笔记整理(三):Storm集群安装部署与Topology作业提交

[TOC] Storm分布式集群安装部署 概述 Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,...

xpleaf ⋅ 04/13 ⋅ 0

Storm笔记整理(二):Storm本地开发案例—总和计算与单词统计

[TOC] 概述 在Strom的API中提供了对象,这样在不用搭建Storm环境或者Storm集群的情况下也能够开发Storm的程序,非常方便。 基于Maven构建工程项目,其所需要的依赖如下: Storm本地开发案例1...

xpleaf ⋅ 04/12 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Sqoop

1.Sqoop: 《=》 SQL to Hadoop 背景 1)场景:数据在RDBMS中,我们如何使用Hive或者Hadoop来进行数据分析呢? 1) RDBMS ==> Hadoop(广义) 2) Hadoop ==> RDBMS 2)原来可以通过MapReduce I...

GordonNemo ⋅ 36分钟前 ⋅ 0

全量构建和增量构建的区别

1.全量构建每次更新时都需要更新整个数据集,增量构建只对需要更新的时间范围进行更新,所以计算量会较小。 2.全量构建查询时不需要合并不同Segment,增量构建查询时需要合并不同Segment的结...

无精疯 ⋅ 46分钟前 ⋅ 0

如何将S/4HANA系统存储的图片文件用Java程序保存到本地

我在S/4HANA的事务码MM02里为Material维护图片文件作为附件: 通过如下简单的ABAP代码即可将图片文件的二进制内容读取出来: REPORT zgos_api.DATA ls_appl_object TYPE gos_s_obj.DA...

JerryWang_SAP ⋅ 今天 ⋅ 0

云计算的选择悖论如何对待?

导读 人们都希望在工作和生活中有所选择。但心理学家的调查研究表明,在多种选项中进行选择并不一定会使人们更快乐,甚至不会产生更好的决策。心理学家Barry Schwartz称之为“选择悖论”。云...

问题终结者 ⋅ 今天 ⋅ 0

637. Average of Levels in Binary Tree - LeetCode

Question 637. Average of Levels in Binary Tree Solution 思路:定义一个map,层数作为key,value保存每层的元素个数和所有元素的和,遍历这个树,把map里面填值,遍历结束后,再遍历这个map,把每...

yysue ⋅ 今天 ⋅ 0

IDEA配置和使用

版本控制 svn IDEA版本控制工具不能使用 VCS-->Enable Version Control Integration File-->Settings-->Plugins 搜索Subversion,勾选SVN和Git插件 删除.idea文件夹重新生成项目 安装SVN客户......

bithup ⋅ 今天 ⋅ 0

PE格式第三讲扩展,VA,RVA,FA的概念

作者:IBinary 出处:http://www.cnblogs.com/iBinary/ 版权所有,欢迎保留原文链接进行转载:) 一丶VA概念 VA (virtual Address) 虚拟地址的意思 ,比如随便打开一个PE,找下它的虚拟地址 这边...

simpower ⋅ 今天 ⋅ 0

180623-SpringBoot之logback配置文件

SpringBoot配置logback 项目的日志配置属于比较常见的case了,之前接触和使用的都是Spring结合xml的方式,引入几个依赖,然后写个 logback.xml 配置文件即可,那么在SpringBoot中可以怎么做?...

小灰灰Blog ⋅ 今天 ⋅ 0

冒泡排序

原理:比较两个相邻的元素,将值大的元素交换至右端。 思路:依次比较相邻的两个数,将小数放在前面,大数放在后面。即在第一趟:首先比较第1个和第2个数,将小数放前,大数放后。然后比较第...

人觉非常君 ⋅ 今天 ⋅ 0

Vagrant setup

安装软件 brew cask install virtualboxbrew cask install vagrant 创建project mkdir -p mst/vmcd mst/vmvagrant init hashicorp/precise64vagrant up hashicorp/precise64是一个box......

遥借东风 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部