文档章节

storm trident

 张欢19933
发布于 2016/12/20 20:44
字数 4149
阅读 93
收藏 2

简介

Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特点以batch的形式处理stream。

     一些最基本的操作函数有Filter、Function,Filter可以过滤掉tuple,Function可以修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。

     聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操作。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操作,它们会把stream重定向到一个partition中进行聚合操作。

     重定向操作会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操作,不会产生网络传输。

     GroupBy会根据指定字段,把整个stream切分成一个个grouped stream,如果在grouped stream上做聚合操作,那么聚合就会发生在这些grouped stream上而不是整个batch。如果groupBy后面跟的是aggregator,则是聚合操作,如果跟的是partitionAggregate,则不是聚合操作。

Trident主要有5类操作:

1、作用在本地的操作,不产生网络传输。

2、对数据流的重分布,不改变流的内容,但是产生网络传输。

3、聚合操作,有可能产生网络传输。

4、作用在分组流(grouped streams)上的操作。

5、Merge和join

partition

概念

partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。

操作

既然有partition的概念,那么也就有partition的操作。Trident提供的分区操作,类似于Storm里面讲的grouping。分区操作有:

重分区操作通过运行一个函数改变元组在任务之间的分布,也可以调整分区的数量(比如重分区之后将并行度调大),重分区需要网络传输的参与。重分区函数包含以下这几个:

  1. shuffle:使用随机轮询算法在所有目标分区间均匀分配元组;
  2. broadcast:每个元组复制到所有的目标分区。这在DRPC中非常有用,例如,需要对每个分区的数据做一个stateQuery操作;
  3. partitionBy:接收一些输入字段,根据这些字段输入字段进行语义分区。通过对字段取hash值或者取模来选择目标分区。partitionBy保证相同的字段一定被分配到相同的目标分区;
  4. global:所有的元组分配到相同的分区,该分区是流种所有batch决定的;
  5. batchGlobal:同一个batch中的元组被分配到相同的目标分区,不同batch的元组有可能被分配到不同的目标分区;
  6. partition:接收一个自定义的分区函数,自定义分区函数需要实现backtype.storm.grouping.CustomStreamGrouping接口。

注意,除了这里明确提出来的分区操作,Trident里面还有aggregate()函数隐含有分区的操作,它用的是global()操作,这个在后面接收聚合操作的时候还会再介绍。

API

each() 方法

     作用:操作batch中的每一个tuple内容,一般与Filter或者Function函数配合使用。

     下面通过一个例子来介绍each()方法,假设我们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。我们可以通过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。

    1.Filter类:过滤tuple

     一个通过actor字段过滤消息的Filter:

public static class PerActorTweetsFilter extends BaseFilter {
  String actor;

  public PerActorTweetsFilter(String actor) {
    this.actor = actor;
  }
  @Override
  public boolean isKeep(TridentTuple tuple) {
    return tuple.getString(0).equals(actor);
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("actor", "text"), new Utils.PrintFilter());

从上面例子看到,each()方法有一些构造参数

  • 第一个构造参数:作为Field Selector,一个tuple可能有很多字段,通过设置Field,我们可以隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。
  • 第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。

     2.Function类:加工处理tuple内容

     一个能把tuple中text内容变成大写的Function:

public static class UppercaseFunction extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    collector.emit(new Values(tuple.getString(0).toUpperCase()));
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
  .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

首先,UppercaseFunction函数的输入是Fields("text", "actor"),其作用是把其中的"text"字段内容都变成大写。

  其次,它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function之后的tuple内容多了一个"uppercased_text",并且这个字段排在最后面。

    3. Field Selector与project

   我们需要注意的是,上面每个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,如果想要彻底丢掉它们,我们就需要用到project()方法。

   投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”],运行如下代码:

mystream.project(new Fields("b", "d"))

则输出的流仅包含 [“b”, “d”]字段。

aggregation的介绍

首先聚合操作分两种:partitionAggregate(),以及aggregate()。

    1.partitionAggregate

partitionAggregate()的操作是在partition上,一个batch的tuple被分成多个partition后,每个partition都会单独运行partitionAggregate中指定的聚合操作。分区聚合在一批tuple的每一个分区上运行一个函数。与函数不同的是,分区聚合的输出元组会覆盖掉输入元组。请看如下示例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假设你有一个包含a,b两个字段的输入流,元组的分区情况如下:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

运行上面的那一行代码将会输出如下的元组,这些元组只包含一个sum字段: 

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

    2.aggregate

aggregate()隐含了一个global分区操作,也就是它做的是全局聚合操作。它针对的是整个batch的聚合计算。

这两种聚合操作,都可以传入不同的aggregator实现具体的聚合任务。Trident中有三种aggregator接口,分别为:ReducerAggregator,CombinerAggregator,Aggregator。

下面是CombinerAggregator接口的定义:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每个输入元组上运行init函数,然后通过combine函数聚合结果值直到只剩下一个元组。如果分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。比如,下面是Count聚合器的实现:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

ReducerAggregator接口的定义如下:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator通过init函数得到一个初始的值,然后对每个输入元组调用reduce方法计算值,产生一个元组作为输出。比如Count的ReducerAggregator实现如下:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

最常用的聚合器的接口是Aggregator,它的定义如下:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator能够发射任意数量,任意字段的元组。并且可以在执行期间的任何时候发射元组,它的执行流程如下:

  1. 处理batch之前调用init方法,init函数的返回值是一个表示聚合状态的对象,该对象会传递到aggregate和complete函数;
  2. 每个在batch分区中的元组都会调用aggregate方法,该方法能够更新聚合状态并且发射元组;
  3. 当batch分区中的所有元组都被aggregate函数处理完时调用complete函数。

下面是使用Aggregator接口实现的Count聚合器:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有些时候,我们需要通知执行很多个聚合器,则可以使用如下的链式调用执行:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

上面的代码将会在每一个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。

最重要的区别是CombinerAggregator,它是先在partition上做partial aggregate,然后再将这些部分聚合结果通过global分区到一个总的分区,在这个总的分区上对结果进行汇总。

groupBy()分组操作

首先它包含两个操作,一个是分区操作,一个是分组操作。

如果后面是partitionAggregate()的话,就只有分组操作:在每个partition上分组,分完组后,在每个分组上进行聚合;

如果后面是aggregate()的话,先根据partitionBy分区,在每个partition上分组,,分完组后,在每个分组上进行聚合。

parallelismHint并发度的介绍

它设置它前面所有操作的并发度,直到遇到某个repartition操作为止。

topology.newStream("spout", spout)
      .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
      .parallelismHint(5)
      .each(new Fields("actor", "text"), new Utils.PrintFilter());

意味着:parallelismHit之前的spout,each都是5个相同的操作一起并发,对,一共有5个spout同时发射数据,其实parallelismHint后面的each操作,也是5个并发。分区操作是作为Bolt划分的分界点的。

如果想单独设置Spout怎么办?要在Spout之后,Bolt之前增加一个ParallelismHint,并且还要增加一个分区操作:

topology.newStream("spout", spout)
	  .parallelismHint(2)
	  .shuffle()
	  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	  .parallelismHint(5)
	  .each(new Fields("actor", "text"), new Utils.PrintFilter());      

很多人只是设置了Spout的并发度,而没有调用分区操作,这样是达不到效果的,因为Trident是不会自动进行分区操作的。像我之前介绍的,先分区,再设置并发度。如果Spout不设置并发度,只设置shuffle,默认是1个并发度,这样后面设置5个并发度不会影响到Spout,因为并发度的影响到shuffle分区操作就停止了。

例子

groupBy+aggregate+parallelismHint

package com.demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;



public class MyAgg extends BaseAggregator<Map<String, Integer>> {
	

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	/**
	 * 属于哪个分区
	 */
	private int partitionId;

	/**
	 * 分区数量
	 */
	private int numPartitions;
	private String batchId;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
		
	}

	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		String word = tuple.getString(0);
		Integer value = val.get(word);
		if (value == null) {
			value = 0;
		}
		value++;
		// 把数据保存到一个map对象中
		val.put(word, value);
		System.err.println("I am partition [" + partitionId
				+ "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId);
	}

	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}

	public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
		this.batchId = arg0.toString();
		return new HashMap<String, Integer>();
	}

}
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

groupBy+partitionAggregate+parallelismHint

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map")))
				.toStream()
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

由于shuffle已经把tuple平均分配给5个partition了,用groupBy+partitionAggregate来聚合又没有partitionBy分区的作用,所以,直接在5个分区上进行聚合,结果就是每个分区各有一个tuple。

而用groupBy+aggregate,虽然也是shuffle,但是由于具有partitiononBy分区的作用,值相同的tuple都分配到同一个分区,结果就是每个分区根据不同的值来做汇聚。

aggregate+parallelismHint(没有groupBy)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [1] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

partitionAggregate+parallelismHint(没有groupBy操作)

		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
				.toStream()
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

我们可以发现,partitionAggregate加上groupBy,或者不加上groupBy,对结果都一样:groupBy对于partitionAggregate没有影响。但是对于aggregate来说,加上groupBy,就不是做全局聚合了,而是对分组做聚合;不加上groupBy,就是做全局聚合。

如果spout设置并行度,但是没有加shuffle,不会起作用,分区默认为1,;如果不设置并行度并且没有加shuffle,分区默认为1。

Merge和Joins

api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:   

topology.merge(stream1, stream2, stream3);

Trident指定新的合并之后的流中的字段为stream1中的字段。
另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。

下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

stream1流的key字段与stream2流的x字段组join操作,另外,Trident要求所有新流的输出字段被重命名,因为输入流可能包含相同的字段名称。连接流发射的元组将会包含:

  1. 连接字段的列表。在上面的例子中,字段key对应stream1的key,stream2的x;
  2. 来自所有流的所有非连接字段的列表,按照传递到连接方法的顺序排序。在上面的例子中,字段a与字段b对应stream1的val1和val2,c对应于stream2的val1.

     当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。   

© 著作权归作者所有

粉丝 46
博文 538
码字总数 247200
作品 0
海淀
私信 提问
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
07/20
817
0
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
704
1
聊聊storm trident的operations

序 本文主要研究一下storm trident的operations function filter projection Function storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java Function定义了exe......

go4it
2018/11/04
22
0
聊聊storm trident spout的_maxTransactionActive

序 本文主要研究一下storm trident spout的_maxTransactionActive MasterBatchCoordinator storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java ......

go4it
2018/11/18
18
0

没有更多内容

加载失败,请刷新页面

加载更多

3_数组

3_数组

行者终成事
今天
7
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0
OpenJDK之CyclicBarrier

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用: await()方法,如果当前线...

克虏伯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部