文档章节

六、Storm的高级原语之Transactional Topology

datapro
 datapro
发布于 2015/06/24 17:29
字数 2369
阅读 381
收藏 11

1、什么是Transactional Topology?

    ○ 是一个每个tuple仅被处理一次的框架

    ○ 由Storm0.7引入,于Storm0.9被弃用,被triden取而代之

    ○ 底层依靠spout\bolt\topology\stream抽象的一个特性

2、Transactional Topology设计思路

    ○ 一次只处理一次tuple

        基于Storm处理tuple失败时会重发(replay),如何确保replay的记录不被重复记录,换句话说就是如何保证tuple仅被处理一次,这就依赖于一个称作强顺序性的思想。

        强顺序性:每个tuple与一个transaction id相关联,transaction id实际就是一个数字,每一个tuple都有一个按照顺序的transaction id(例如:tuple1的transaction id 为 1,tuple2的transaction id 为 2,...以此类推),只有当前的tuple处理并存储完毕,下一个tuple(处于等待状态)才能进行存储,tuple被存储时连同transaction id一并存储,此时考虑两种情况:

                        tuple处理失败时:重新发送一个和原来一模一样的transaction id

                        tuple处理成功时:发送的transaction id会和存储的transaction id对比,如果不存在transaction id,表示第一次记录,直接存储;如果发现存在,则忽略该tuple。

        这一思想是由Kafka开发者提出来的。

    ○ 一次处理一批tuple

        基于上面的一个优化,将一批tuple直接打包成一个batch,然后分配一个transaction id ,让batch与batch之间保证强顺序性,且batch内部的tuples可以并行。

    ○ Storm是如何采用的?

        两个步骤:

            1、并行计算batch中的tuple数量

            2、batch强顺序性存储

            在batch强顺序性存储的同时让其他等待存储的batch内部进行并行运算,不必等到下一个batch存储时才进行内部运算。

        在Storm上面的两个步骤表现为processing阶段commit阶段

3、一些设计细节

使用Transactional Topology时,storm提供如下操作:

    ○ 管理状态

        将需要处理的状态如:transaction id 、batch meta等状态信息放在zookeeper

    ○ 协调事务

        指定某个时间段执行processing操作和commit操作

    ○ 错误检测

        storm使用acking框架自动检测batch被成功或失败处理,然后相应的重发(replay)

    ○ 内置批处理API

        通过对普通的bolt进行包装,提供一套对batch处理的API、协调工作(即某个时刻处理某个processing或者commit),并且storm会自动清除中间结果

Transactional Topology是可以完全重发一个特定batch的消息队列系统,在 Kakfa中正是有这样的需求,为此Storm在storm-contrib里面的Storm-Kafka中为Kafka实现了一个事务性的spout。

4、来自Storm-Starter.jar的例子

    计算来自输入流中tuple的个数

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

    ○ 通过TransactionalTopologyBuilder类构建Transactional

        参数:

        Transaction ID:transactional topology的ID,在zookeeper中用于保存进度状态,重启topology时可以直接从执行的进度开始执行而不用重头到尾又执行一遍

        Spout ID:位于整个Topology的Spout的ID

        Spout Object:Transactional中的Spout对象

        Spout:Trasactional中的Spout的并行数

    ○ MemoryTransactionalSpout用于从一个内存变量中读取数据

        DATA:数据

        tuple fields:字段

        tupleNum:在batch中最大的tuple数

    ○ Bolts

        第一个Bolt采用随机分组的方式随机分发到各个task

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
    int _count = 0;
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }
    @Override
    public void execute(Tuple tuple) {
        _count++;
    }
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

        BatchBolt对象运行在BatchBoltExecutor中,BatchBoltExecutor负责BatchBolt对象的创建和清理 

        BatchBolt的ID在context对象中,该ID是一个TransactionAttempt对象.

        BatchBolt在DRPC中也可以使用,只是txid类型不一样,如果在Transactional Topology中使用BatchBolt,可以继承BaseTransactionalBolt.

        在Tranasctional Topology中所有的Tuple都必须以TransactionAttempt作为第一个field,然后storm才能根据该field判断Tuple所属的BatchBolt,所以在发射Tuple必须满足此条件。

        TransactionAttempt对象中有两个属性:

            transaction id:强顺序性,无论重发多少次都是一样的数字

            attempt id:对每一个Batch标识的ID,每次重发都其值不一致,通过该ID可以区分每次重发的Tuple的不同版本

第二个Bolt使用GlobalGrouping汇总batch中的tuple数

 public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
    int _sum = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }
 
    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}

    ICommitter接口:实现该接口的Bolt会在commit阶段调用finishBatch方法,该方法的调用会按照强顺序性,此外还可以使用TransactionalTopologyBuilder的setCommiterBolt来添加Bolt实现和该接口一样的功能。

    executor方法:在processing阶段和commit阶段都可以执行。

    关于更多的transactional topology例子可以看看storm-starter中的TransactionalWords类,该例子会在一个事务中更新多个数据库

 

5、Transaction Topology API

    Bolt类

    BaiscBolt:该Bolt不跟batch中的tuples交互,仅基于单个传来的tuple和产生新的tuple

    BatchBolt:该Bolt处理batch中的tuples,对于每一个tuple调用executor方法,整个batch完成时调用finishBatch方法

    被Committer标记的Bolt:在commit阶段才调用finishBatch方法,commit具有强顺序性,标记Bolt为commit阶段执行finishBatch的方法有两种:1、实现ICommiter接口。2、TransactionalTopologyBuilder的setCommiterBolt来添加Bolt。

    Processing阶段和Commit阶段

    

    红色轮廓的Bolt被标记过为commit

    Spout向Bolt A发送整个Batch

    Bolt A处理完整个Batch之后调用finishBatch方法分别向Bolt B 和 Bolt C发送Batch

    Bolt B接收到Bolt A传递过来的tuple进行处理(此时还尚未处理完毕)不会调用finishBatch方法

    Bolt C接口Bolt A传递的tuple,尽管处理完Bolt A传递来的tuple,但是由于Bolt B还尚未commit,所以Bolt C处于等待Bolt B commit的状态,不会调用finishBatch方法

    Bolt D接收来自Bolt C调用executor方法时发送的所有tuple

    此时一旦Bolt B进行commit进行finishBatch操作,那么Bolt C就会确认接收到所有Bolt B的tuple,Bolt C也调用finishBatch方法,最终Bolt D也接收到所有来自Bolt C的batch。

    在这里尽管Bolt D是一个committer,它在接收到整个batch的tuple之后不需要等待第二个commit信号。因为它是在commit阶段接收到的整个batch,它会调用finishBatch来完成整个事务。

    Acking

注意,当使用transactional topology的时候你不需要显式地去做任何的acking或者anchoring,storm在背后都做掉了。(storm对transactional topolgies里面的acking机制进行了高度的优化)

    Failing a transaction

在使用普通bolt的时候, 你可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。Transactional Topology对用户隐藏了acking框架, 它提供一个不同的机制来fail一个batch(从而使得这个batch被replay):只要抛出一个FailedException就可以了。跟普通的异常不一样, 这个异常只会导致当前的batch被replay, 而不会使整个进程崩溃掉。

    Transactional spout

TransactionalSpout接口跟普通的Spout接口完全不一样。一个TransactionalSpout的实现会发送一批一批(batch)的tuple, 而且必须保证同一批次tuples的transaction id始终一样。

在transactional topology运行的时候, transactional spout看起来是这样的一个结构:

coordinator是一个普通的storm的spout——它一直为事务的batch发射tuple。

Emitter则像一个普通的storm bolt,它负责为每个batch实际发射tuple,emitter以all grouping的方式订阅coordinator的”batch emit”流。
关于如何实现一个TransactionalSpout的细节可以参见Javadoc

    Partitioned Transactional Spout

一种常见的TransactionalSpout是那种从多个queue broker读取数据然后再发射的tuple。比如TransactionalKafkaSpout就是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。更多可以参考Javadoc。

    配置

Transactional Topologies有两个重要的配置:

Zookeeper:默认情况下,transactional topology会把状态信息保存在一个zookeeper里面(协调集群的那个)。你可以通过这两个配置来指定其它的zookeeper:”transactional.zookeeper.servers” 和 “transactional.zookeeper.port“。

同时活跃的batch数量:你必须设置同时处理的batch数量,你可以通过”topology.max.spout.pending” 来指定, 如果你不指定,默认是1。

6、实现

Transactional Topologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。

1、transactional spout是一个子topology, 它由一个coordinator spout和一个emitter bolt组成。

2、coordinator是一个普通的spout,并行度为1;emitter是一个bolt,并行度为P,使用all分组方式连接到coordinator的“batch”流上。

3、coordinator使用一个acking框架决定什么时候一个batch被成功执行(process)完成,然后去决定一个batch什么时候被成功提交(commit)。

© 著作权归作者所有

上一篇: Flume入门
datapro
粉丝 16
博文 37
码字总数 22063
作品 0
广州
高级程序员
私信 提问
Storm入门 第五章 一致性事务

第五章 一致性事务 Storm是一个分布式的流处理系统,利用anchor和ack机制保证所有tuple都被成功处理。如果tuple出错,则可以被重传,但是如何保证出错的tuple只被处理一次呢?Storm提供了一套...

坏坏一笑
2014/12/03
88
0
大数据处理 Hadoop、HBase、ElasticSearch、Storm、Kafka、Spark

场景 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条...

江河海流
2014/03/30
5.1K
0
Storm高级原语(二) -- DRPC详解

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU密集型(CPU intensive)的计算任务。DRPC的stormtopology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出...

超人学院
2015/05/27
276
0
storm 原理简介及单机版安装指南(转)

本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的、高容错的实时计算系统。 Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。Hadoop为我们提供...

Jacos
2014/12/03
2.4K
0
大数据Storm相比于Spark、Hadoop有哪些优势(摘录)

一、可能很多初学大数据的伙伴不知道strom是什么,先给大家介绍一下strom: 分布式实时计算系统,storm对于实时计算的意义类似于hadoop对于批处理的意义。 storm的适用场景。 流数据处理。S...

风火数据
2018/06/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何设计一个高可用系统?要考虑哪些地方?

本文已经收录自笔者开源的 JavaGuide: https://github.com/Snailclimb (69k+Star【Java学习+面试指南】 一份涵盖大部分Java程序员所需要掌握的核心知识)如果觉得不错的还,不妨去点个Star,鼓...

SnailClimb
26分钟前
50
0
Go 每日一库之 viper

简介 上一篇文章介绍 cobra 的时候提到了 viper,今天我们就来介绍一下这个库。 viper 是一个配置解决方案,拥有丰富的特性: 支持 JSON/TOML/YAML/HCL/envfile/Java properties 等多种格式的...

darjunlee
56分钟前
113
0
Raspberry Pi 树莓派 搭建java8环境

更新软件源 apt-get upgradeapt-get update 移除自带的openjdk(如果有的话) apt-get remove openjdk* 安装oracle jdk,地址: https://www.oracle.com/technetwork/java/javase/downl......

ChangeZ
今天
145
0
树莓派3b 安装openwrt

https://downloads.openwrt.org/releases/19.07.0/targets/brcm2708/bcm2710/openwrt-19.07.0-brcm2708-bcm2710-rpi-3-squashfs-factory.img.gz 插入并启动树莓派3B+,顺便接上键盘和显示器,......

FalconChen
今天
236
0
OSChina 周一乱弹 —— 有些运动也常用到膝盖

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《火宵の月・テーマ~ピアノソロ》- 中村由利子 手机党少年们想听歌,请使劲儿戳(...

小小编辑
今天
5.6K
13

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部