A look at pendingTriggers in Storm's TridentWindowManager

go4it
Published 11/13 23:29

This article looks at how pendingTriggers works in Storm's TridentWindowManager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • finishBatch calls _bolt.finishBatch. _bolt has two implementations: TridentSpoutExecutor, used for spouts, and SubtopologyBolt, used for ordinary bolts.

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
  • SubtopologyBolt.finishBatch calls finishBatch on each TridentProcessor of the batch group, in topological order.

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
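  • Once the bolt hosting WindowTridentProcessor has acked every tuple of a batch, it runs finishBatch.
  • WindowTridentProcessor.execute receives a tuple and appends it to the list held in processorContext.state.
  • finishBatch takes that batch's tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples).
  • For a new batch (or a retry with nothing stored), it then drains tridentWindowManager.getPendingTriggers(), collecting the trigger ids into pendingTriggerIds and the aggregated results into triggerValues, and saves pendingTriggerIds to windowStore under inprocessTriggerKey(batchTxnId). For a retried batch it reads the ids and values back from windowStore instead.
  • Finally, each entry of triggerValues is emitted, prefixed with a TriggerInfo built from windowTaskId and the trigger id.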
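
The retry handling described above can be sketched in isolation. This is a minimal model, not Storm's code: the class, the `trigger-` and `inprocess-` key formats, and the `HashMap` standing in for the window store are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of the retry handling in WindowTridentProcessor.finishBatch.
// All names are hypothetical; a HashMap stands in for the WindowsStore.
class PendingTriggerDrainSketch {
    static final class TriggerResult {
        final int id;
        final String value;
        TriggerResult(int id, String value) { this.id = id; this.value = value; }
    }

    final Map<String, Object> store = new HashMap<>();
    final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();

    // Simulated trigger callback: persist the result first, then queue it.
    void onTrigger(int triggerId, String aggregated) {
        store.put("trigger-" + triggerId, aggregated);
        pendingTriggers.add(new TriggerResult(triggerId, aggregated));
    }

    // Returns the values emitted for the given batch transaction.
    @SuppressWarnings("unchecked")
    List<String> finishBatch(long txId, boolean retried) {
        List<String> emitted = new ArrayList<>();
        if (retried) {
            // A retried batch replays the trigger ids recorded by the first attempt.
            List<Integer> ids = (List<Integer>) store.get("inprocess-" + txId);
            if (ids != null) {
                for (Integer id : ids) {
                    emitted.add((String) store.get("trigger-" + id));
                }
                return emitted;
            }
        }
        // New batch (or nothing recorded): drain whatever is pending right now.
        List<Integer> ids = new ArrayList<>();
        TriggerResult r;
        while ((r = pendingTriggers.poll()) != null) {
            ids.add(r.id);
            emitted.add(r.value);
        }
        // Record which triggers this batch took, so that a retry can replay them.
        if (!ids.isEmpty()) {
            store.put("inprocess-" + txId, ids);
        }
        return emitted;
    }
}
```

In this model a retry of transaction 7 emits the same trigger results as its first attempt, and a trigger that fired in between stays queued for the next new batch.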

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
        List<WindowsStore.Entry> entries = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
        }

        // tuples should be available in store before they are added to window manager
        windowStore.putAll(entries);

        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            addToWindowManager(i, key, tridentTuple);
        }

    }

    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
        TridentTuple actualTuple = null;
        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
            actualTuple = tridentTuple;
        }
        currentCachedTuplesSize.incrementAndGet();
        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
    }
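  • StoreBasedTridentWindowManager.addTuplesBatch first writes the batch's tuples to windowStore, then passes them one by one to addToWindowManager, which adds them to windowManager. Once maxCachedTuplesSize is reached, the TridentBatchTuple is added without the tuple itself (actualTuple stays null).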
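
The two passes and the cache cap can be modelled as below. This is only a sketch under assumptions: tuples are plain strings, the store is a `LinkedHashMap`, and `WindowEntry` is a hypothetical stand-in for TridentBatchTuple. The point it illustrates is that every tuple reaches the store, while only the first `maxCachedTuples` are also kept in memory; later entries carry just the batch key and index, presumably so that the tuple can be looked up in the store when needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of addTuplesBatch / addToWindowManager.
class TupleCacheSketch {
    // Stand-in for TridentBatchTuple: cachedTuple is null once the cap is hit.
    static final class WindowEntry {
        final String batchKey;
        final int index;
        final String cachedTuple;
        WindowEntry(String batchKey, int index, String cachedTuple) {
            this.batchKey = batchKey;
            this.index = index;
            this.cachedTuple = cachedTuple;
        }
    }

    final Map<String, String> store = new LinkedHashMap<>();
    final List<WindowEntry> windowEntries = new ArrayList<>();
    private final Integer maxCachedTuples; // null means "no limit"
    private final AtomicInteger cachedCount = new AtomicInteger();

    TupleCacheSketch(Integer maxCachedTuples) {
        this.maxCachedTuples = maxCachedTuples;
    }

    void addTuplesBatch(String batchKey, List<String> tuples) {
        // Pass 1: every tuple goes to the store under key + index.
        for (int i = 0; i < tuples.size(); i++) {
            store.put(batchKey + i, tuples.get(i));
        }
        // Pass 2: hand the tuples to the window, keeping a tuple in memory
        // only while the cache is below its cap.
        for (int i = 0; i < tuples.size(); i++) {
            String inMemory = null;
            if (maxCachedTuples == null || cachedCount.get() < maxCachedTuples) {
                inMemory = tuples.get(i);
            }
            cachedCount.incrementAndGet();
            windowEntries.add(new WindowEntry(batchKey, i, inMemory));
        }
    }
}
```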

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    private final ConcurrentLinkedQueue<Event<T>> queue;

    /**
     * Add an event into the window, with {@link System#currentTimeMillis()} as
     * the tracking ts.
     *
     * @param event the event to add
     */
    public void add(T event) {
        add(event, System.currentTimeMillis());
    }

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }
  • add appends the event to the ConcurrentLinkedQueue (watermark events are skipped), then calls track and compactWindow.

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
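  • onTrigger first calls scanEvents to get windowEvents, splits them into events (everything in the current window) and newEvents (those that were not in the previous window), and then calls back windowLifecycleListener.onActivation(events, newEvents, expired).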
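
How newEvents is derived from the previous window can be shown with a small sketch. The class name and the use of strings as events are assumptions for illustration; the logic mirrors the prevWindowEvents check in the listing above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the events / newEvents split in WindowManager.onTrigger.
class WindowDiffSketch {
    private final Set<String> prevWindow = new HashSet<>();

    // Returns the events of the current window that were not in the previous one,
    // then remembers the current window for the next activation.
    List<String> activate(List<String> currentWindow) {
        List<String> fresh = new ArrayList<>();
        for (String event : currentWindow) {
            if (!prevWindow.contains(event)) {
                fresh.add(event);
            }
        }
        prevWindow.clear();
        prevWindow.addAll(currentWindow);
        return fresh;
    }
}
```

With a sliding window that moves from [a, b] to [b, c], the second activation reports only c as new.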

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * Scan events in the queue, using the expiration policy to check
     * if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop
     *                 as soon as an event not satisfying the expiration policy is found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }
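  • scanEvents iterates over the ConcurrentLinkedQueue and asks the eviction policy about each event: expired events are removed from the queue and added to expiredEvents, events to process are collected into eventsToProcess, and eventsToProcess is returned.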
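
A full scan can be sketched as follows. Events are reduced to bare timestamps, and the `evict` rule (expire anything at least one window length old, keep anything timestamped in the future) is an assumed time-based policy written for this example, not a copy of Storm's eviction policies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical model of WindowManager.scanEvents(true) with a simple time-based rule.
class ScanEventsSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // Assumed eviction rule: events older than the window expire, events with a
    // timestamp in the future are kept for later, everything else is processed.
    static Action evict(long eventTs, long now, long windowLengthMs) {
        long age = now - eventTs;
        if (age >= windowLengthMs) {
            return Action.EXPIRE;
        }
        if (age < 0) {
            return Action.KEEP;
        }
        return Action.PROCESS;
    }

    // Removes expired timestamps from the queue (collecting them into 'expired')
    // and returns the timestamps that belong to the current window.
    static List<Long> scan(Queue<Long> queue, long now, long windowLengthMs, List<Long> expired) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evict(ts, now, windowLengthMs);
            if (action == Action.EXPIRE) {
                expired.add(ts);
                it.remove();
            } else if (action == Action.STOP) {
                break;
            } else if (action == Action.PROCESS) {
                toProcess.add(ts);
            }
            // KEEP: the event stays in the queue and is not part of this window.
        }
        return toProcess;
    }
}
```

Only expired events leave the queue; events that are processed stay there and can show up again in later windows until they expire.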

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to receive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation calls execAggregatorAndStoreResult, which runs the window's aggregator, saves the result to windowStore, and adds resultantAggregatedValue to pendingTriggers wrapped in a TriggerResult.
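
The init / aggregate / complete sequence and the hand-off to the pending queue can be sketched like this. `MiniAggregator` is a hypothetical, much reduced interface (Trident's real Aggregator works with tuples and a collector), and the `trigger-` store key is likewise made up for the example.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of onActivation -> execAggregatorAndStoreResult.
class AggregateOnTriggerSketch {
    // Reduced stand-in for an aggregator with an init/aggregate/complete lifecycle.
    interface MiniAggregator<S> {
        S init(int triggerId);
        void aggregate(S state, int value);
        List<Object> complete(S state);
    }

    // Example aggregator: sums all values in the window.
    static final class Sum implements MiniAggregator<int[]> {
        public int[] init(int triggerId) { return new int[1]; }
        public void aggregate(int[] state, int value) { state[0] += value; }
        public List<Object> complete(int[] state) {
            return Collections.<Object>singletonList(state[0]);
        }
    }

    final AtomicInteger triggerId = new AtomicInteger();
    final Map<String, Object> store = new HashMap<>();
    final Queue<Map.Entry<Integer, List<Object>>> pendingTriggers = new ConcurrentLinkedQueue<>();

    <S> void onActivation(MiniAggregator<S> aggregator, List<Integer> windowEvents) {
        int id = triggerId.incrementAndGet();
        // Run the aggregator over every event in the window.
        S state = aggregator.init(id);
        for (int event : windowEvents) {
            aggregator.aggregate(state, event);
        }
        List<Object> result = aggregator.complete(state);
        // Persist the result, then queue it for the next finishBatch to pick up.
        store.put("trigger-" + id, result);
        pendingTriggers.add(new AbstractMap.SimpleImmutableEntry<Integer, List<Object>>(id, result));
    }
}
```

The queue is the only link between the trigger callback and finishBatch: the callback produces entries, and the next batch to finish consumes them.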

Summary

  • When the TridentBoltExecutor hosting WindowTridentProcessor receives a tuple from the spout, it calls the processor's execute method, which buffers the tuple in the ProcessorContext; once the whole chain of processors has executed, the tuple is acked.
  • Once that TridentBoltExecutor has acked all tuples of a batch, checkFinish is triggered and runs finishBatch, which in turn calls finishBatch on a chain of TridentProcessors (for example WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor).
  • WindowTridentProcessor.finishBatch takes the batch's tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples), which writes them to windowStore and then adds them to the WindowManager's ConcurrentLinkedQueue. It then drains tridentWindowManager.getPendingTriggers(), saves the resulting pendingTriggerIds to the store, and emits each of the triggerValues together with its TriggerInfo.
  • WindowManager.onTrigger is called when the window's trigger fires (for example, when a time window elapses). It reads the window events from the WindowManager's ConcurrentLinkedQueue and passes them to TridentWindowLifeCycleListener.onActivation.
  • TridentWindowLifeCycleListener.onActivation runs the window aggregator's init, aggregate and complete steps to obtain resultantAggregatedValue and puts it into pendingTriggers. This queue is what connects the window trigger to WindowTridentProcessor.
