文档章节

Storm之trident聚合操作介绍

岩之有理
 岩之有理
发布于 2014/12/12 11:19
字数 1782
阅读 1153
收藏 2

Trident主要有5类操作:

1、作用在本地的操作,不产生网络传输。

2、对数据流的重分布,不改变流的内容,但是产生网络传输。

3、聚合操作,有可能产生网络传输。

4、作用在分组流(grouped streams)上的操作。

5、Mergejoin

 

这里主要介绍一下34,希望对大家有所帮助,如有错误请指正!

首先说几个名词:

PartitionStorm中并发的最小执行单元是task;在tridentpartition相当于task的角色。

Grouped streams对数据流做groupBy操作后,将key相同的流组织在一起,形成若干组流。

Global aggregation没有groupBy的聚合,即全局聚合。

AggregatorTrident中定义的用于实现聚合方法的接口。

下面开始介绍:

作用在tridentStream对象上的与聚合相关的主要方法:

aggregate

partitionAggregate

persistentAggregate

groupBy

partitionBy

partitionPersist

parallelismHint

作为聚合操作的一个参数,实现聚合功能的主要接口有:

Aggregator<T>

CombinerAggregator<T>

ReducerAggregator<T>

----------------------华丽丽的分割线----------------------------------

使用过Hive的人都知道,不含group by的聚合SQL在转化成Hadoop作业后,在编译时就确定了只能有1reduce,因为全局聚合在汇总阶段只能由1个计算单元完成。同样的道理,当aggregate方法用于无groupByglobal aggregation时,每个批次(batch)的流也只能在1partition中执行(使用AggregatorReducerAggregator接口,而CombinerAggregator例外,后面会讲到)。当我们使用aggregate计算global aggregation时,如果通过parallelismHint设置了并发数为ntrident的做法是通过轮循的方式让不同的批次依次在npartition中执行,实际上还是在串行执行,意义不大。因此使用aggregateglobal aggregation时,并不能实现并发的功能,只适用于数据量不大的场景,这时候最好把并发设成1,否则对资源是一种浪费。

值得一提的是,如果实现了自定义分组的CustomStreamGrouping接口,后面再跟global aggregation,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partition(new  MyCustomStreamGrouping())
       .aggregate(new MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10);

这时候实际上我们的自定义分组是不起作用的,因为上面已经说明,此时并发并没有真正开启,而是采取的轮循策略。只有将aggregate换成partitionAggregate,自定义的分组才会起作用。

使用aggregate做分组聚合是它的强项,此时可以充分发挥并发的特性。但是需要注意,假设并发度设置为10,而我们groupBykey的不同值实际上只有2个,那势必有很多partition在空跑,造成资源浪费。

partitionAggregate通常用于global aggregation时的本地化聚合,类似于Hadoop中的map阶段。partitionAggregate是在每一个partition内独立调用自己的聚合操作,互不干涉。最后还需要把局部聚合值emit出来,通过网络传输供后面的aggregate做全局聚合。通过这种策略,可以实现global aggregation的并发。partitionAggregate的前面不能跟groupBy方法,因为groupBy方法返回的GroupedStream对象没有partitionAggregate方法。

Aggregator<T>接口是三种实现聚合功能的接口中最通用的一种。Aggregator<T>要实现5个方法:

prepare只在启动拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
cleanup
只在正常关闭拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
init
对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。需要注意的是,如果使用的是事务型spout,同时某个批次处理失败后导致该批次消息重发,则在接下来处理时init有可能会调用多次。因此init里面的代码逻辑应该要支持同一批的重复调用。
aggregate
1tuple调用1次。
complete
对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。

再说一下CombinerAggregator<T>,它比较有趣,前面提到使用aggregateglobal aggregation无法开启并发。但是当CombinerAggregator<T>aggregate配合使用时,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .parallelismHint(10)
       .aggregate(new MyCombinerAggregator(), new Fields(“out1”));

Trident会把拓扑自动拆分成2bolt,第一个bolt做局部聚合,类似于Hadoop中的map;第二个bolt通过接收网络传输过来的局部聚合值最后做一个全局聚合,类似于Hadoop中的reduce。在上面的例子中,局部聚合开启了10个并发,这就实现了使用aggregateglobal aggregation时真正开启并发。当然,使用partitionAggregate可以实现同样的功能。类似于:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partitionAggregate(new  MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10)
       .aggregate(new Fields(“out1”), new MyAggregator(), new  Fields(“out2”));
有三点需要注意:
1
、自动优化后的第一个bolt是本地化操作,因此它可以和它前面或者后面挨着的所有each合并在同一个bolt里面。
2
parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
3
、综合12,把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实现的spout类是否支持并发发送。

CombinerAggregator<T>需要实现3个方法:
init
每条tuple调用1次,对tuple做预处理。
combine
每条tuple调用1次,和之前的聚合值做combine。如果是第一条tuple则和zero返回的值做combine
zero
当没有数据流时的处理逻辑。
整个CombinerAggregator<T>会在每批次结束时将combine的结果做一次emit

persistentAggregate是实现聚合的另外一种方式。前面介绍的聚合可以看成是对每个批次的数据做本批次内的聚合计算,至于批次之间如何merge需要自己处理。而persistentAggregate可以看成是对源源不断发送过来数据流做一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>

关于state接口,它的使用场景非常多,这里先不做详细介绍。它可以作为Stream.stateQuery的参数按批次对持久化的数据做查询;也可以配合Stream.partitionPersist按批次做持久化操作,类似于IBatchBolt<T>.finishBatch所能实现的功能。

EOF

本文转载自:http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html

岩之有理
粉丝 7
博文 18
码字总数 6134
作品 0
徐汇
高级程序员
私信 提问
聊聊storm trident的operations

序 本文主要研究一下storm trident的operations function filter projection Function storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java Function定义了exe......

go4it
2018/11/04
22
0
聊聊storm trident spout的_maxTransactionActive

序 本文主要研究一下storm trident spout的_maxTransactionActive MasterBatchCoordinator storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java ......

go4it
2018/11/18
18
0
【Storm】Storm中的Trident

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/86561953 1、Trident简介 Trident是在原Storm实时流计算基础上的高层次抽象,...

魏晓蕾
01/20
0
0
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
聊聊storm TridentWindowManager的pendingTriggers

序 本文主要研究一下storm TridentWindowManager的pendingTriggers TridentBoltExecutor.finishBatch storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecut......

go4it
2018/11/13
11
0

没有更多内容

加载失败,请刷新页面

加载更多

川普给埃尔多安和内堪尼亚胡的信

任性 https://twitter.com/netanyahu/status/1186647558401253377 https://edition.cnn.com/2019/10/16/politics/trump-erdogan-letter/index.htm...

Iridium
28分钟前
10
0
golang-mysql-原生

db.go package mainimport ("database/sql""time"_ "github.com/go-sql-driver/mysql")var (db *sql.DBdsn = "root:123456@tcp(127.0.0.1:3306)/test?charset=u......

李琼涛
56分钟前
5
0
编程作业20191021092341

1编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时 间。使用#define或const创建一个表示60的符号常量或const变量。通过while 循环让用户重复输入值,直到用户输入小于或等于0的值...

1李嘉焘1
57分钟前
7
0
Netty整合Protobuffer

现在我们都知道,rpc的三要素:IO模型,线程模型,然后就是数据交互模型,即我们说的序列化和反序列化,现在我们来看一下压缩比率最大的二进制序列化方式——Protobuffer,而且该方式是可以跨...

算法之名
今天
19
0
如何用C++实现栈

栈的定义 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压...

BWH_Steven
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部