A look at Storm's window trigger

go4it
Published 11/12 23:13

This article walks through how Storm's window trigger works, following the call chain in the storm-core 1.2.2 sources.

WindowTridentProcessor.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }
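  • At the end of prepare, WindowTridentProcessor creates either a StoreBasedTridentWindowManager or an InMemoryTridentWindowManager, depending on storeTuplesInStore, and then calls tridentWindowManager.prepare()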

AbstractTridentWindowManager.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
        this.windowTaskId = windowTaskId;
        this.windowStore = windowStore;
        this.aggregator = aggregator;
        this.delegateCollector = delegateCollector;

        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        windowManager.setEvictionPolicy(evictionPolicy);
        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
        windowManager.setTriggerPolicy(triggerPolicy);
    }

    public void prepare() {
        preInitialize();

        initialize();

        postInitialize();
    }

    private void postInitialize() {
        // start trigger once the initialization is done.
        triggerPolicy.start();
    }
  • AbstractTridentWindowManager obtains triggerPolicy in its constructor through windowStrategy.getTriggerPolicy; its prepare method calls postInitialize, which in turn calls triggerPolicy.start()
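
The ordering described above can be reproduced in a minimal sketch. All names here (SketchTriggerPolicy, SketchWindowManager, LifecycleSketch) are hypothetical stand-ins and not Storm classes; the sketch only assumes what the excerpt shows, namely that the policy is obtained in the constructor and started as the last step of prepare().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a trigger policy; only start() matters here.
interface SketchTriggerPolicy {
    void start();
}

// Simplified window manager: the policy is obtained in the constructor,
// but it is started only at the very end of prepare().
abstract class SketchWindowManager {
    protected final List<String> log = new ArrayList<>();
    private final SketchTriggerPolicy triggerPolicy;

    SketchWindowManager() {
        // Mirrors windowStrategy.getTriggerPolicy(...) being called in the constructor.
        this.triggerPolicy = () -> log.add("triggerPolicy.start");
        log.add("constructor");
    }

    public final void prepare() {
        log.add("preInitialize");
        initialize();
        // Corresponds to postInitialize(): start the trigger once initialization is done.
        triggerPolicy.start();
    }

    protected abstract void initialize();
}

class LifecycleSketch {
    // Builds a manager, prepares it, and returns the recorded call order.
    static String run() {
        SketchWindowManager manager = new SketchWindowManager() {
            @Override
            protected void initialize() {
                log.add("initialize");
            }
        };
        manager.prepare();
        return String.join(" -> ", manager.log);
    }

    public static void main(String[] args) {
        // prints: constructor -> preInitialize -> initialize -> triggerPolicy.start
        System.out.println(run());
    }
}
```

Starting the policy last means no trigger can fire before the manager has finished its own initialization.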

SlidingDurationWindowStrategy.getTriggerPolicy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

    /**
     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
     *
     * @param triggerHandler
     * @param evictionPolicy
     * @return
     */
    @Override
    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
    }
  • Taking SlidingDurationWindowStrategy as an example, the policy created here is a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength(); the triggerHandler passed in is the WindowManager
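
To make the consequence concrete: the trigger cadence follows the sliding length, not the window length. The helper below is a hypothetical sketch (SlidingTriggerTimes is not a Storm class) that lists nominal trigger offsets, assuming a fixed-rate schedule whose initial delay and period both equal the sliding length, and ignoring scheduling jitter.

```java
import java.util.Arrays;

class SlidingTriggerTimes {
    // Nominal offsets (ms after start()) at which a fixed-rate trigger fires,
    // assuming initial delay == period == slidingLengthMillis.
    static long[] firstTriggerOffsets(long slidingLengthMillis, int count) {
        long[] offsets = new long[count];
        for (int i = 0; i < count; i++) {
            offsets[i] = (i + 1) * slidingLengthMillis;
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Hypothetical config: 10 s window sliding every 3 s.
        // Only the 3 s slide decides when the trigger fires.
        System.out.println(Arrays.toString(firstTriggerOffsets(3_000L, 4)));
        // prints: [3000, 6000, 9000, 12000]
    }
}
```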

TimeTriggerPolicy.start

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

    public void start() {
        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    }

    private Runnable newTriggerTask() {
        return new Runnable() {
            @Override
            public void run() {
                // do not process current timestamp since tuples might arrive while the trigger is executing
                long now = System.currentTimeMillis() - 1;
                try {
                    /*
                     * set the current timestamp as the reference time for the eviction policy
                     * to evict the events
                     */
                    if (evictionPolicy != null) {
                        evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                    }
                    handler.onTrigger();
                } catch (Throwable th) {
                    LOG.error("handler.onTrigger failed ", th);
                    /*
                     * propagate it so that task gets canceled and the exception
                     * can be retrieved from executorFuture.get()
                     */
                    throw th;
                }
            }
        };
    }
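  • The start method schedules a fixed-rate task that fires every duration milliseconds (here windowConfig.getSlidingLength()); the task's run method calls handler.onTrigger(), which is WindowManager.onTrigger(). If onTrigger throws, the task logs the error and rethrows it, so the scheduled task is cancelled and the exception can be retrieved from executorFuture.get()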
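
The rethrow in the trigger task relies on standard ScheduledExecutorService behaviour: when a periodic task throws, later executions are suppressed and the exception surfaces from the future's get() wrapped in an ExecutionException. A minimal sketch using only the JDK (the class name and the "fail on the third run" rule are assumptions for illustration):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateFailureSketch {
    // Returns "<number of runs>:<failure message>" once the periodic task has died.
    static String run() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                // Fail on the third run, like a handler.onTrigger() that throws.
                if (runs.incrementAndGet() == 3) {
                    throw new IllegalStateException("trigger failed");
                }
            }, 10, 10, TimeUnit.MILLISECONDS);
            try {
                // For a periodic task, get() blocks until the task is cancelled or fails.
                future.get();
                return runs.get() + ":no failure";
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return runs.get() + ":" + e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints: 3:trigger failed
        System.out.println(run());
    }
}
```

The run count stays at 3 because the executor never schedules the task again after the failure, which is why a swallowed exception would otherwise leave the window silently without triggers.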

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }

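  • onTrigger scans the window under the lock, works out which events are new compared with prevWindowEvents, and then calls windowLifecycleListener.onActivation(events, newEvents, expired); the windowLifecycleListener here is the TridentWindowLifeCycleListener of AbstractTridentWindowManager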
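
The bookkeeping that separates newEvents from the full window can be sketched on its own. WindowDiffSketch is a hypothetical class that uses plain strings instead of Storm's Event<T> objects and leaves out locking, eviction, and the expired list; it only mirrors the prevWindowEvents comparison shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class WindowDiffSketch {
    // Events seen in the window at the previous trigger.
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events in the current window that were not in the previous one,
    // then remembers the current window for the next trigger.
    List<String> newEvents(List<String> windowEvents) {
        List<String> fresh = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                fresh.add(event);
            }
        }
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return fresh;
    }

    // Two consecutive triggers over overlapping sliding windows.
    static String demo() {
        WindowDiffSketch sketch = new WindowDiffSketch();
        List<String> first = sketch.newEvents(Arrays.asList("a", "b", "c"));
        List<String> second = sketch.newEvents(Arrays.asList("b", "c", "d", "e"));
        return first + " " + second;
    }

    public static void main(String[] args) {
        // prints: [a, b, c] [d, e]
        System.out.println(demo());
    }
}
```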

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to receive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
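  • TridentWindowLifeCycleListener.onActivation increments triggerId and delegates to execAggregatorAndStoreResult
  • execAggregatorAndStoreResult calls the aggregator's init, aggregate, and complete methods in that order, with aggregate invoked once per tuple in the window
  • It then writes the next trigger count and the aggregated result to windowStore, and finally adds a TriggerResult to pendingTriggers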
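
The call order can be checked with a small stand-alone sketch. SketchAggregator is a hypothetical, simplified contract: Trident's real Aggregator also takes a batch id and a collector, which are left out here, and the summing aggregator is only an illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified aggregator contract (no batch id, no collector).
interface SketchAggregator<S, R> {
    S init();
    void aggregate(S state, int value);
    R complete(S state);
}

class AggregatorOrderSketch {
    // One trigger's worth of work: init once, aggregate per tuple, complete once.
    static <S, R> R runTrigger(SketchAggregator<S, R> aggregator, List<Integer> windowTuples) {
        S state = aggregator.init();
        for (int tuple : windowTuples) {
            aggregator.aggregate(state, tuple);
        }
        return aggregator.complete(state);
    }

    // Runs a summing aggregator over three tuples and records every call.
    static String demo() {
        List<String> calls = new ArrayList<>();
        SketchAggregator<long[], Long> sum = new SketchAggregator<long[], Long>() {
            @Override
            public long[] init() {
                calls.add("init");
                return new long[1];
            }

            @Override
            public void aggregate(long[] state, int value) {
                calls.add("aggregate");
                state[0] += value;
            }

            @Override
            public Long complete(long[] state) {
                calls.add("complete");
                return state[0];
            }
        };
        long result = runTrigger(sum, Arrays.asList(1, 2, 3));
        return calls + " sum=" + result;
    }

    public static void main(String[] args) {
        // prints: [init, aggregate, aggregate, aggregate, complete] sum=6
        System.out.println(demo());
    }
}
```

Because a fresh state is created by init on every trigger, each window activation is aggregated from scratch over all tuples currently in the window.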

Summary

  • Storm registers the periodic TriggerTask in TimeTriggerPolicy.start; taking SlidingDurationWindowStrategy as an example, the scheduling interval is windowConfig.getSlidingLength()
  • TriggerTask periodically invokes WindowManager.onTrigger, which calls back windowLifecycleListener.onActivation
  • AbstractTridentWindowManager supplies TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the aggregator: it calls init first, then aggregate for each of the resultTuples, and finally complete, which makes the calling order of the Aggregator interface's methods easy to see

© Copyright belongs to the author
