jstorm进阶-ack机制及KafkaSpout
博客专区 > whaon 的博客 > 博客详情
jstorm进阶-ack机制及KafkaSpout
whaon 发表于5个月前
jstorm进阶-ack机制及KafkaSpout
  • 发表于 5个月前
  • 阅读 375
  • 收藏 7
  • 点赞 1
  • 评论 2
摘要: 因为有使用到storm-kafka,所以例子会采用KafkaSpout

安装部署使用

请参考我的另一篇文章jstorm部署及使用

ack机制

ack机制原理

这里不讲什么是ack机制,可以参考官网的文档Ack 机制
我们只要知道它是使用异或xor的原理即可:

A xor A = 0
A xor B xor B xor A = 0

使用ack机制

要想使用ack机制,需要做以下工作:

Topology的处理

构建topology时设置acker不为0,方法如下:

config.setNumAckers(1);

该方法实际是设置以Config.TOPOLOGY_ACKER_EXECUTORS为key的value,说明如下:

     /**
     * How many executors to spawn for ackers.
     * <p/>
     * <p>
     * If this is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.
     * </p>
     */
    public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";

Spout的处理

使用spout发送数据时,带上msgid,接口说明如下:

    /**
     * Emits a new tuple to the default output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has
     * failed to be fully processed, the spout will receive an ack or fail
     * callback respectively with the messageId as long as the messageId was not
     * null. If the messageId was null, Storm will not track the tuple and no
     * callback will be received. The emitted values must be immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

我们看下KafkaSpout是怎么做的:

    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

注意上面的EmitState state = managers.get(_currPartitionIndex).next(_collector);我们进去看看 :

     public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if ((tups != null) && tups.iterator().hasNext()) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

看到了吧,collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));emit的时候指定了messageId,而这个KafkaMessageId是一个静态内部类,包括分区和偏移量2个属性

    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

Bolt的处理

一般我们写bolt的时候有两种方式,一种使用IRichBolt接口或者它的抽象实现类BaseRichBolt,一种使用IBasicBolt或者它的抽象实现类BaseBasicBolt,这2种是有区别的,主要在于影响ack机制

使用IRichBolt

使用IRichBolt意味着你要实现的接口如下:

void execute(Tuple input);

也意味着你要操作的类为OutputCollector
使用OutputCollector来emit tuple给下个bolt的时候必须要用anchored的方式,接口如下:

    /**
     * Emits a new tuple to the default stream anchored on a single tuple. The
     * emitted values must be immutable.
     *
     * @param anchor the tuple to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Tuple anchor, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);

    /**
     * Emits a new tuple to the default stream anchored on a group of input
     * tuples. The emitted values must be immutable.
     *
     * @param anchors the tuples to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
    }

所谓的anchor即为Bolt的execute方法里面的tuple,也即上游发给你的tuple
注意不能使用unanchored 的方式,说明如下:

    /**
     * Emits a new unanchored tuple to the default stream. Beacuse it's
     * unanchored, if a failure happens downstream, this new tuple won't affect
     * whether any spout tuples are considered failed or not. The emitted values
     * must be immutable.
     *
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple);
    }

同时在emit后要手动执行collector.ack(tuple);方法

使用IbasicBolt

使用IbasicBolt则编程会简单的多,因为它会帮我做很多事情,我们要做的仅仅是调用emit方法即可,先看要实现的接口:

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * 
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

这个execute方法和上述不一样了,他给我们注入了BasicOutputCollector类,我们操作它即可,其实这个类里面有一个上述OutputCollector out属性,并且,自动注入了inputTuple,使用它来emit tuple即可,暴露的emit的方法只有2个:

    public List<Integer> emit(String streamId, List<Object> tuple) {
        return out.emit(streamId, inputTuple, tuple);
    }

    public List<Integer> emit(List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple);
    }

正如上述代码所示,它实际调用的是OutputCollector的emit方法,并且自动帮我们使用anchor的方式,这里用到了我们熟悉的设计模式中的代理的模式
大家可能有注意到了,这里并没有显示的调用collector.ack(tuple);方法,这里猜猜也会知道,应该是用到了模板模式,在调用该方法的调用者那里,调用了execute方法后,调用ack方法,查下代码,果然没错,在BasicBoltExecutor类里面,方法如下:

    public void execute(Tuple input) {
        _collector.setContext(input);
        try {
            _bolt.execute(input, _collector);
            _collector.getOutputter().ack(input);
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);
        }
    }

仔细看看,发现它还帮我们处理了异常,只要我们抛出FailedException,它就会自动执行fail方法

关闭ack

ack机制并不是必须的,并且会消耗一部分性能,如果可以容忍部分数据丢失,想要更高的性能则可以关闭ack机制

方法

  1. spout 在发送数据的时候不带上msgid
  2. 设置acker数等于0
  3. 使用 unanchored的方式

以上方法任一种都可以,推荐使用第二种方式

性能和事务

事务

jstorm支持事务操作,这里所谓的事务即是,顺序处理tuple,如果这次的tuple没有被完整的处理完,就不会处理下一个tuple,可以看到这样大大降低了并发性,性能不会太好。所以可以采用批量的思想个时候,一个batch为一个transaction处理单元,当一个batch处理完毕,才能处理下一个batch。还可以采用分阶段处理的方式,在processing阶段并发,实际commit的时候按顺序

Trident

Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。Trident将stream中的tuples分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。

性能

很明显的,按照性能来说, trident < transaction < 使用ack机制普通接口 < 关掉ack机制的普通接口
我们也可以通过增加ack的并发数来提高线程

ack和fail

ack方法和fail方法只有在Spout中才有
ack, 当spout收到一条ack消息时,触发的动作
fail, 当spout收到一条fail消息时,触发的动作

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

参数为msgId,即为前面说的Spout里面发送数据的msgId,失败了是否需要重发tuple完全取决于你的实现,比如KafkaSpout就有自己的实现,代码这里就不贴了
需要注意的是,一般我们会有多个Bolt,在Topology处理流程上的任意Bolt处理失败都会触发Spout执行fail方法,如果你的程序在fail方法里面会重发tuple的话,那么这个tuple仍将会被所有的Bolt执行一遍,举例如下:
假设topology的流程为:SpoutA->BoltB->BoltC->BoltD 如果BoltC处理失败,则SpoutA将重发tuple,并且将再次按照topology的流程走一遍。可以看到,BoltB处理了2遍Bolt,如果在BoltB里有插入数据库的操作则会出现问题。
好在一般情况下,我们也只是在最末尾的Bolt中执行入库的操作,前面执行的Bolt基本都是内存计算,不落地,所以执行多遍也就不会有问题了

多线程

在jstorm中, spout中nextTuple和ack/fail运行在不同的线程中, 从而鼓励用户在nextTuple里面执行block的操作, 原生的storm,nextTuple和ack/fail在同一个线程,不允许nextTuple/ack/fail执行任何block的操作,否则就会出现数据超时,但带来的问题是,当没有数据时, 整个spout就不停的在空跑,极大的浪费了cpu, 因此,jstorm更改了storm的spout设计,鼓励用户block操作(比如从队列中take消息),从而节省cpu。
进一步说明如下:
当topology.max.spout.pending 设置不为1时(包括topology.max.spout.pending设置为null),spout内部将额外启动一个线程单独执行ack或fail操作, 从而nextTuple在单独一个线程中执行,因此允许在nextTuple中执行block动作,而原生的storm,nextTuple/ack/fail 都在一个线程中执行,当数据量不大时,nextTuple立即返回,而ack、fail同样也容易没有数据,进而导致CPU 大量空转,白白浪费CPU, 而在JStorm中, nextTuple可以以block方式获取数据,比如从disruptor中或BlockingQueue中获取数据,当没有数据时,直接block住,节省了大量CPU。
但因此带来一个问题, 处理ack/fail 和nextTuple时,必须小心线程安全性。
当topology.max.spout.pending为1时, 恢复为spout一个线程,即nextTuple/ack/fail 运行在一个线程中。

其他

重启

建议不超过1个月,强制重启一下supervisor, 因为supervisor是一个daemon进程, 不停的创建子进程,当使用时间过长时, 文件打开的句柄会非常多,导致启动worker的时间会变慢,因此,建议每隔一周,强制重启一次supervisor

输出到kafka

写入数据到kafka可以使用KafkaBolt这个类,它已经帮我做好了,我们只需要提供一些参数即可
上面讲的KafkaSpout和KafkaBolt都在storm-kafka这个框架里面,maven配置如下:

 <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.10.2</version>
        <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
        </exclusions>
 </dependency>

注意版本不为最新,为0.10.2,1.0.0版本后的包结构变了,和jstorm不兼容,不能使用
该项目为storm的官方插件项目,项目地址为:Storm Kafka

--------------------------------------我是分割线,2017年5月10日16:29:33加--------------------------------------
KafkaSpout的nextTuple方法里,每次都是调用

List<PartitionManager> managers = _coordinator.getMyManagedPartitions();

来获得分区的信息,而这个方法如下:

@Override
    public List<PartitionManager> getMyManagedPartitions() {
        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
            refresh();
            _lastRefreshTime = System.currentTimeMillis();
        }
        return _cachedList;
    }

调用的时候判断是否超过了一定的时间,如果超过则重新获取分区的消息,这个时间默认为60s,为ZkHosts里的refreshFreqSecs属性

那么分区增加了或者减少了会不会有问题呢,答案是不会有问题,KafkaSpout已经帮我们做了很多了
如果分区数增加,在这60s内,我获取的是原来的分区进行消费,到60s后,刷新分区数,对新增加的分区进行消费,完全没有任何问题
如果分区数减少,比如之前有5个分区:0,1,2,3,4,现在减少为:0,1,2,当要消费分区3时会抛出异常并且在异常里面会重新刷新分区,这是分区数就会变为3,则直接跳出这个循环了,也不会有机会去消费分区4,所以也不会有任何问题

--------------------------------------我是分割线,2017年5月22日15:48:31加--------------------------------------
bolt中不要有静态变量和static{}方法 bolt中不要有静态变量和static{}方法 bolt中不要有静态变量和static{}方法 重要的事情说三遍

共有 人打赏支持
whaon
粉丝 42
博文 36
码字总数 31073
评论 (2)
userzgx
您好,在执行kafkaspout的topology的时候出现了java.lang.NoClassDefFoundError:com/google/common/baseStrings的错误,是什么原因?可以帮我解答一下吗。。。kafka的版本是0.9.0.1,storm的版本是1.0.3,kafkaspout也是相应的版本
whaon

引用来自“userzgx”的评论

您好,在执行kafkaspout的topology的时候出现了java.lang.NoClassDefFoundError:com/google/common/baseStrings的错误,是什么原因?可以帮我解答一下吗。。。kafka的版本是0.9.0.1,storm的版本是1.0.3,kafkaspout也是相应的版本
看下有没guava这个包,storm-kafka应该是会自动引入这个包的
×
whaon
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: