文档章节

storm 1.0以后的新版本 滑动窗口的实现及原理

世吉
 世吉
发布于 2017/09/02 16:01
字数 1067
阅读 30
收藏 0
点赞 0
评论 0

简单的演示如何使用storm1.0实现滑动窗口的功能,先编写spout类,RandomSentenceSpout负责发送一个整形数值,数值每次发送都会自动加一,且RandomSentenceSpout固定每隔两秒向bolt发送一次数据。RandomSentenceSpout和前面关于spout的讲解一样。

复制代码

1.public class RandomSentenceSpout extends BaseRichSpout {
2.
3.    private static final long serialVersionUID = 5028304756439810609L;  
4.
5.    private SpoutOutputCollector collector;  
6.
7.    int intsmaze=0;
8.
9.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
10.        declarer.declare(new Fields("intsmaze"));
11.    }
12.
13.    public void open(Map conf, TopologyContext context, 
14.                          SpoutOutputCollector collector) {
15.        this.collector = collector;
16.    }
17.
18.    public void nextTuple() {
19.        System.out.println("发送数据:"+intsmaze);
20.        collector.emit(new Values(intsmaze++));
21.        try {
22.            Thread.sleep(2000);
23.//         Thread.sleep(1000);
24.        } catch (InterruptedException e) {
25.            e.printStackTrace();
26.        }
27.    }
}

复制代码

 

 

滑动窗口的逻辑实现的重点是bolt类,这里我们编写SlidingWindowBolt类让它继承一个新的类名为BaseWindowedBolt来获得窗口计数的功能。BaseWindowedBolt和前面的BaseBaseBolt和BaseWindowedBolt提供的方法名都一样,只是execute方法的参数类型为TupleWindow,TupleWindow参数里面装载了一个窗口长度类的tuple数据。通过对TupleWindow遍历,我们可以计算这一个窗口内tuple数的平均值或总和等指标。具体见代码12-16行,统计了一个窗口内的数值型数据的总和。

复制代码

1.public class SlidingWindowBolt extends BaseWindowedBolt {
2.
3.    private OutputCollector collector;
4.
5.    @Override
6.    public void prepare(Map stormConf, TopologyContext context, 
7.            OutputCollector collector) {
8.        this.collector = collector;
9.    }
10.
11.    public void execute(TupleWindow inputWindow) {        
12.        int sum=0;
13.        System.out.print("一个窗口内的数据");
14.        for(Tuple tuple: inputWindow.get()) {
15.            int str=(Integer) tuple.getValueByField("intsmaze");
16.            System.out.print(" "+str);
17.            sum+=str;
18.        }
19.        System.out.println("======="+sum);
20. //        collector.emit(new Values(sum));
21.    }
22.
23.    @Override
24.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
25.//       declarer.declare(new Fields("count"));
26.    }
}

复制代码

 

 

我们已经实现了窗口计数的逻辑代码,现在我们需要提供topology来指明各个组件的关系,以及指定SlidingWindowBolt的窗口的组合,这里我们演示了如何每两秒统计最近6秒的数值总和,如果注释掉10-13行代码,去掉5-8行的注释,这个topology就是告诉SlidingWindowBolt每接收到两条tuple就统计最近接收到的6条tuple的数值的总和。

复制代码

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1);
6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
7.//       .withWindow(new Count(6), new Count(2)),1)
8.//       .shuffleGrouping("spout");
9.//滑窗 窗口长度:tuple数, 滑动间隔: tuple数 每收到2条数据统计当前6条数据的总和。  
10.     
11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
12.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
13.               new Duration(2, TimeUnit.SECONDS)),1)
14.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据       
15.
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

复制代码

 

 

这里演示的是bolt节点并发度为1的窗口功能,实际生产中,因为数据量很大,往往将bolt节点的并发度设置为多个,这个时候我们的SlidingWindowBolt就无法统计出一个窗口的数值总和了。因为每一个bolt的并行节点只能统计自己一个窗口接收到数据的总和,无法统计出一个窗口内全局数据的总和,借助redis来实现是可以的,但是必须引入redis的事务机制或者借助分布式锁,否则会出现脏数据的情况。在这里我们介绍另一种实现方式就是灵活的使用storm提供的窗口功能,只是窗口的tuple数。

仍然是使用上面提供的类,只是我们增加一个bolt类,来统计每个SlidingWindowBolt节点发送给它的数值。

按 Ctrl+C 复制代码

 

按 Ctrl+C 复制代码

 

 

然后我们注释RandomSentenceSpout第22行代码,取消对23行代码的注释,方便观察结果。去掉SlidingWindowBolt类20和25行代码。

topology启动类如下:

复制代码

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout", new RandomSentenceSpout(), 1);
6.       
7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
8.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
9.               new Duration(2, TimeUnit.SECONDS)),2)
10.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据
11.       
12.       builder.setBolt("countwordbolt", new CountWord()
13.       .withWindow(new Count(2), new Count(2)),1)
14.       .shuffleGrouping("slidingwindowbolt");
15.       //每收到2条tuple就统计最近两条统的数据
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

© 著作权归作者所有

共有 人打赏支持
世吉
粉丝 1
博文 4
码字总数 4547
作品 0
东城
程序员
大数据经典学习路线(及供参考)之 三

3.Storm实时计算部分阶段 实时课程分为两个部分:流式计算核心技术和流式计算计算案例实战。 1.流式计算核心技术 流式计算核心技术主要分为两个核心技术点:Storm和Kafka,学完此阶段能够掌握...

柯西带你学编程 ⋅ 05/22 ⋅ 0

Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli ⋅ 2015/06/18 ⋅ 0

Apache Storm 1.1.3 和 1.2.2 发布,分布式实时计算

Apache Storm 1.1.3 和 1.2.2 已发布,这是一个常规维护版本,其中包含许多重要的错误修复,可以提高 Storm 的性能,稳定性和容错能力。建议以前版本的用户升级到最新版本。 更新内容较多,详...

局长 ⋅ 06/06 ⋅ 0

年薪40万的大数据工程师是如何安装Strom

Strom集群的安装配置 主机规划 一、准备服务器 l 关闭防火墙 chkconfig iptables off && setenforce 0 l 创建用户 groupadd hadoop && useradd hadoop  && usermod -a -G hadoop hadoop l ......

爱尚实训 ⋅ 04/23 ⋅ 0

Storm笔记整理(一):简介与设计思想

[TOC] 实时计算概述 有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统...

xpleaf ⋅ 04/12 ⋅ 0

大数据Storm相比于Spark、Hadoop有哪些优势(摘录)

一、可能很多初学大数据的伙伴不知道strom是什么,先给大家介绍一下strom: 分布式实时计算系统,storm对于实时计算的意义类似于hadoop对于批处理的意义。 storm的适用场景。 流数据处理。S...

风火数据 ⋅ 06/01 ⋅ 0

大数据学习之(Storm)-原理详解!

角色 Client client的主要作用是提交topology到集群 Worker Worker是运行在Supervisor节点上的一个独立的JVM进程,主要作用是运行topology,一个topology可以包含多个worker,但一个worker只...

qq5af153121eb2c ⋅ 05/08 ⋅ 0

Storm笔记整理(五):可靠性分析、定时任务与Storm UI参数详解

[TOC] 特别说明:前面的四篇Storm笔记中,关于计算总和的例子中的spout,使用了死循环的逻辑,实际上这样做是不正确的,原因很简单,Storm提供给我们的API中,nextTuple方法就是循环执行了,...

xpleaf ⋅ 04/14 ⋅ 0

Storm笔记整理(二):Storm本地开发案例—总和计算与单词统计

[TOC] 概述 在Strom的API中提供了对象,这样在不用搭建Storm环境或者Storm集群的情况下也能够开发Storm的程序,非常方便。 基于Maven构建工程项目,其所需要的依赖如下: Storm本地开发案例1...

xpleaf ⋅ 04/12 ⋅ 0

大数据经典学习路线(及供参考)

转:https://blog.csdn.net/yuexianchang/article/details/52468291 目录(?)[+]

junzixing1985 ⋅ 04/15 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Java NIO之字符集

1 字符集和编解码的概念 首先,解释一下什么是字符集。顾名思义,就是字符的集合。它的初衷是把现实世界的符号映射为计算机可以理解的字节。比如我创造一个字符集,叫做sex字符集,就包含两个...

士别三日 ⋅ 36分钟前 ⋅ 0

Spring Bean基础

1、Bean之间引用 <!--如果Bean配置在同一个XML文件中,使用local引用--><ref bean="someBean"/><!--如果Bean配置在不同的XML文件中,使用ref引用--><ref local="someBean"/> 其实两种......

霍淇滨 ⋅ 41分钟前 ⋅ 0

05、基于Consul+Upsync+Nginx实现动态负载均衡

1、Consul环境搭建 下载consul_0.7.5_linux_amd64.zip到/usr/local/src目录 cd /usr/local/srcwget https://releases.hashicorp.com/consul/0.7.5/consul_0.7.5_linux_amd64.zip 解压consu......

北岩 ⋅ 44分钟前 ⋅ 0

Webpack 4 api 了解与使用

webpack 最近升级到了 v4.5+版 01 官方不再支持 node4 以下版本 官方不再支持 node4 以下版本官方不再支持 node4 以下的版本,所以如果你的node版本太低,先开始升级node吧!话说node10 ...

NDweb ⋅ 54分钟前 ⋅ 0

使用nodeJs安装Vue-cli

Vue脚手架就是一个Vue框架开发环境 脚手架的意思是帮你快速开始一个vue的项目,也就是给你一套vue的结构,包含基础的依赖库,只需要 npm install就可以安装,让我们不需要为了编辑或者一些其...

木筏笔歆 ⋅ 今天 ⋅ 0

【微信小程序开发实战】0x00.开发前准备工作

写在开始 本人资深后端码农一枚,近期项目需求,接触到了微信小程序,将学习过程整理成文分享给小伙伴们,由于是边学边整理难免有表述不对的地方,望大家及时指正,感谢。 本人微信号: dream...

dreamans ⋅ 今天 ⋅ 0

linux redis的安装和php7下安装redis扩展

安装redis服务器 (1)下载安装包: $ wget http://download.redis.io/releases/redis-2.8.17.tar.gz (2)编译程序: $ tar xzf redis-2.8.17.tar.gz $ cd redis-2.8.17 $ make $ cd src &&......

concat ⋅ 今天 ⋅ 0

Guava EventBus源码解析

一、EventBus使用场景示例 Guava EventBus是事件发布/订阅框架,采用观察者模式,通过解耦发布者和订阅者简化事件(消息)的传递。这有点像简化版的MQ,除去了Broker,由EventBus托管了订阅&...

SaintTinyBoy ⋅ 今天 ⋅ 0

http怎么做自动跳转https

Apache 版本 如果需要整站跳转,则在网站的配置文件的<Directory>标签内,键入以下内容: RewriteEngine on RewriteCond %{SERVER_PORT} !^443$ RewriteRule ^(.*)?$ https://%{SERVER_NAME......

Helios51 ⋅ 今天 ⋅ 0

Python爬虫,抓取淘宝商品评论内容

作为一个资深吃货,网购各种零食是很频繁的,但是能否在浩瀚的商品库中找到合适的东西,就只能参考评论了!今天给大家分享用python做个抓取淘宝商品评论的小爬虫! 思路 我们就拿“德州扒鸡”...

python玩家 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部