文档章节

Twitter Storm Stream Grouping编写自定义分组实现

震秦
 震秦
发布于 2014/04/17 09:45
字数 1045
阅读 798
收藏 12

##自定义Grouping测试

Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。

这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
		List<Integer> targetTasks) {
	    this.tasks = targetTasks;
	    log.info(tasks.toString());
    }	
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
	    log.info(values.toString());
	    return Arrays.asList(tasks.get(0));
    }
}

从上面的代码可以看出,该自定义分组会把数据归并到第一个Task<code>Arrays.asList(tasks.get(0));</code>,也就是数据到达后总是被派发到第一组。

测试代码:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//自定义分组,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
	    .customGrouping("words", new MyFirstStreamGrouping());

和之前的测试用例一样,Spout总是发送<code>new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}</code>列表的字符串。我们运行验证一下:

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。

##理解自定义分组实现

自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。

Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:

public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:

public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。

搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。

这是CustomStreamGrouping类的源码:

public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个Task来处理数据,<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List<Integer>.就是让你来在提供的 'List<Integer> targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

由此,Storm的自定义分组策略也就不那么麻烦了吧?

© 著作权归作者所有

震秦
粉丝 192
博文 34
码字总数 35126
作品 0
西安
架构师
私信 提问
storm翻译(2):Concepts

Concepts:概念 原文:http://storm.apache.org/documentation/Concepts.html 这个列表展示了storm中的主要概念和相关详细信息。这些概念是: Topologies Streams Spouts Bolts Stream group...

岩之有理
2015/02/10
173
0
Storm入门 第二章 构建Topology

2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration Storm集群和Hadoop集群表面上...

坏坏一笑
2014/12/03
86
0
Storm笔记整理(四):Storm核心概念与验证——并行度与流式分组

[TOC] Storm核心概念之并行度 Work 1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的(...

xpleaf
2018/04/13
0
0
Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli
2015/06/18
0
0
Twitter Storm, 数据流分组策略,fieldsGrouping

Storm Grouping shuffleGrouping fieldsGrouping All grouping Global grouping None grouping Direct grouping fieldsGrouping 上面的资料我摘抄自:http://xumingming.sinaapp.com/127/twi......

震秦
2014/04/14
2.2K
0

没有更多内容

加载失败,请刷新页面

加载更多

Java中的原生关键字是什么?

在玩这个谜题 (这是Java关键字琐事游戏)时,我遇到了native关键字。 Java中的native关键字用于什么? #1楼 实现本机代码的函数被声明为本机。 Java本机接口(JNI)是一个编程框架,使在Jav...

技术盛宴
今天
54
0
博通与苹果达成150亿美元协议,覆盖未来3年苹果产品

  据外媒报道,芯片供应商博通(Broadcom)宣布已与苹果公司签署了一份协议,为其提供“高性能的无线组件和模块”,博通表示,这些芯片将在未来 3 年半的时间内用于自 2020 年 1 月份以后发...

水果黄瓜
今天
95
0
《算法》笔记 17 - 数据压缩

读写二进制数据 基因组数据的压缩 游程编码 位图 霍夫曼压缩 前缀码和单词查找树 构造前缀码的单词查找树 写入和读取单词查找树 使用前缀码压缩 使用前缀码展开 LZW压缩 LZW的单词查找树 LZ...

zhixin9001
今天
55
0
#技术分享# 再论DDD之【实体】建模

2020年春节第一天,早早醒来,出去晨跑一下,目前武汉以及周边的疫情处于红色报警状态,今年春节宅家是最安全的做法。闲暇之余,还需要探索技术,跑步中忽然想到了【忒修斯之船】的小故事,感...

边缘行者
今天
88
0
工作自由--2020年开篇,开启一个项目:工作自由 worksolo.cn

新年伊始,我突发奇想,也是很多人敢想而不敢做的事情,下面我以一个多年软件开发从业者的角度去思考,去设计这个项目,当然希望看到这篇文章的你可以给我更多思路: 项目名称:工作自由 域名...

_aron_
今天
51
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部