文档章节

storm 1.0以后的新版本 滑动窗口的实现及原理

世吉
 世吉
发布于 2017/09/02 16:01
字数 1067
阅读 177
收藏 0

简单的演示如何使用storm1.0实现滑动窗口的功能,先编写spout类,RandomSentenceSpout负责发送一个整形数值,数值每次发送都会自动加一,且RandomSentenceSpout固定每隔两秒向bolt发送一次数据。RandomSentenceSpout和前面关于spout的讲解一样。

复制代码

1.public class RandomSentenceSpout extends BaseRichSpout {
2.
3.    private static final long serialVersionUID = 5028304756439810609L;  
4.
5.    private SpoutOutputCollector collector;  
6.
7.    int intsmaze=0;
8.
9.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
10.        declarer.declare(new Fields("intsmaze"));
11.    }
12.
13.    public void open(Map conf, TopologyContext context, 
14.                          SpoutOutputCollector collector) {
15.        this.collector = collector;
16.    }
17.
18.    public void nextTuple() {
19.        System.out.println("发送数据:"+intsmaze);
20.        collector.emit(new Values(intsmaze++));
21.        try {
22.            Thread.sleep(2000);
23.//         Thread.sleep(1000);
24.        } catch (InterruptedException e) {
25.            e.printStackTrace();
26.        }
27.    }
}

复制代码

 

 

滑动窗口的逻辑实现的重点是bolt类,这里我们编写SlidingWindowBolt类让它继承一个新的类名为BaseWindowedBolt来获得窗口计数的功能。BaseWindowedBolt和前面的BaseBaseBolt和BaseWindowedBolt提供的方法名都一样,只是execute方法的参数类型为TupleWindow,TupleWindow参数里面装载了一个窗口长度类的tuple数据。通过对TupleWindow遍历,我们可以计算这一个窗口内tuple数的平均值或总和等指标。具体见代码12-16行,统计了一个窗口内的数值型数据的总和。

复制代码

1.public class SlidingWindowBolt extends BaseWindowedBolt {
2.
3.    private OutputCollector collector;
4.
5.    @Override
6.    public void prepare(Map stormConf, TopologyContext context, 
7.            OutputCollector collector) {
8.        this.collector = collector;
9.    }
10.
11.    public void execute(TupleWindow inputWindow) {        
12.        int sum=0;
13.        System.out.print("一个窗口内的数据");
14.        for(Tuple tuple: inputWindow.get()) {
15.            int str=(Integer) tuple.getValueByField("intsmaze");
16.            System.out.print(" "+str);
17.            sum+=str;
18.        }
19.        System.out.println("======="+sum);
20. //        collector.emit(new Values(sum));
21.    }
22.
23.    @Override
24.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
25.//       declarer.declare(new Fields("count"));
26.    }
}

复制代码

 

 

我们已经实现了窗口计数的逻辑代码,现在我们需要提供topology来指明各个组件的关系,以及指定SlidingWindowBolt的窗口的组合,这里我们演示了如何每两秒统计最近6秒的数值总和,如果注释掉10-13行代码,去掉5-8行的注释,这个topology就是告诉SlidingWindowBolt每接收到两条tuple就统计最近接收到的6条tuple的数值的总和。

复制代码

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1);
6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
7.//       .withWindow(new Count(6), new Count(2)),1)
8.//       .shuffleGrouping("spout");
9.//滑窗 窗口长度:tuple数, 滑动间隔: tuple数 每收到2条数据统计当前6条数据的总和。  
10.     
11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
12.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
13.               new Duration(2, TimeUnit.SECONDS)),1)
14.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据       
15.
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

复制代码

 

 

这里演示的是bolt节点并发度为1的窗口功能,实际生产中,因为数据量很大,往往将bolt节点的并发度设置为多个,这个时候我们的SlidingWindowBolt就无法统计出一个窗口的数值总和了。因为每一个bolt的并行节点只能统计自己一个窗口接收到数据的总和,无法统计出一个窗口内全局数据的总和,借助redis来实现是可以的,但是必须引入redis的事务机制或者借助分布式锁,否则会出现脏数据的情况。在这里我们介绍另一种实现方式就是灵活的使用storm提供的窗口功能,只是窗口的tuple数。

仍然是使用上面提供的类,只是我们增加一个bolt类,来统计每个SlidingWindowBolt节点发送给它的数值。

按 Ctrl+C 复制代码

 

按 Ctrl+C 复制代码

 

 

然后我们注释RandomSentenceSpout第22行代码,取消对23行代码的注释,方便观察结果。去掉SlidingWindowBolt类20和25行代码。

topology启动类如下:

复制代码

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout", new RandomSentenceSpout(), 1);
6.       
7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
8.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
9.               new Duration(2, TimeUnit.SECONDS)),2)
10.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据
11.       
12.       builder.setBolt("countwordbolt", new CountWord()
13.       .withWindow(new Count(2), new Count(2)),1)
14.       .shuffleGrouping("slidingwindowbolt");
15.       //每收到2条tuple就统计最近两条统的数据
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

© 著作权归作者所有

世吉
粉丝 1
博文 4
码字总数 4547
作品 0
东城
程序员
私信 提问
加载中

评论(0)

Apache Storm 2.0.0 发布,基于 Java ​​​​​​​的新架构

Apache Storm 2.0.0 发布了,距离它上次更新已过去一年,新版本在性能、新功能和与外部系统的集成方面进行了重大改进,下面是一些主要功能及改进: 用 Java 实现的新架构 在之前的版本中,S...

xplanet
2019/06/03
3.2K
5
SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成

开发者社区活动,SODBASE产品的用户现在可以领礼品啦 Storm框架原本是设计用来做互联网短文本处理和一些统计工作的,是一种分布式流式计算框架。在一些场合,特别是在已经用了Storm架构以后,...

wishuhappyyear
2015/04/30
0
0
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
2019/07/20
904
0
Storm Windowing(翻译)

窗口计算是流处理中常用的情况之一,在这种情况下,无边界数据流被按一定的标准(例如时间)分割成有限集合,并在每一组事件上应用计算。一个例子是在最近一小时内计算最热门的Twitter话题。...

Yulong_
2017/08/10
80
0
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
865
1

没有更多内容

加载失败,请刷新页面

加载更多

直接显示StackOverflow的答题日期, 增加评论区回复的时间显示 ,修改时间显示到小时分。

// ==UserScript==// @name 直接显示StackOverflow的答题日期, 增加评论区回复的时间显示 ,修改时间显示到小时分。// @namespace http://tampermonkey.net/// @version ...

FalconChen
今天
36
0
Shader笔记_005 纹理

纹理最初的目的就是使用一张图片来控制模型的外观,通过纹理映射技术 我们可以把一张图粘贴在物体表面,逐纹素的控制模型的颜色。 通常美术建模的时候也会在软件里利用纹理展开技术把纹理展开成...

STONE-CITY
今天
12
0
iOS MVVM 与RAC结合使用

MVVM配合 RAC 更能发挥的淋漓尽致。 我们把 MVVM 第一篇的例子 KVO 的事件 替换成 配合RAC 框架使用, OC的话直接导入 : pod 'ReactiveObjC' Swift 直接用 RXSwift就可以。 把 ViewModel里加...

T型人才追梦者
今天
22
1
OSChina 周一乱弹 —— 影响心情的三座大山

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《浮生(inst.)》- 忘乡 / 墨凡悦 手机党少年们想听歌,请使劲儿戳(这里) @凝小紫...

小小编辑
今天
57
0
Unity中头发渲染

头发与普通PBR 材质最大的区别是 头发是各项异性的高光, 参考实现主要为下面文章 http://web.engr.oregonstate.edu/~mjb/cs519/Projects/Papers/HairRendering.pdf 头发包含 2个高光,以及高...

liyong2
今天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部