文档章节

BaseWindowedBolt.java

世吉
 世吉
发布于 2017/09/02 14:13
字数 1307
阅读 9
收藏 0
点赞 0
评论 0

/**
 * storm1.1.1的窗口bolt的核心代码
 * 窗口可以从时间或数量上来划分,由如下两个因素决定:窗口的长度,可以是时间间隔或Tuple数量;滑动间隔(sliding Interval),可以是时间间隔或Tuple数量。
 * 比如:每两秒统计最近6秒的请求数量;每接收2个Tuple就统计最近接收的6个Tuple的平均值......。
 * storm1.0支持的时间和数量的排列组合有如下:
 * withWindow(Count windowLength, Count slidingInterval)  
          每收到slidingInterval条数据统计最近的windowLength条数据。
 * withWindow(Count windowLength)
  每收到1条数据统计最近的windowLength条数据。
 * withWindow(Count windowLength, Duration slidingInterval)
  每过slidingInterval秒统计最近的windowLength条数据。
 * withWindow(Duration windowLength, Count slidingInterval)
  每收到slidingInterval条数据统计最近的windowLength秒的数据。
 * withWindow(Duration windowLength, Duration slidingInterval)
  每过slidingInterval秒统计最近的windowLength秒的数据。
 * withWindow(Duration windowLength)
  每收到1条数据统计最近的windowLength秒的数据。

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.topology.base.my;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TupleFieldTimestampExtractor;
import org.apache.storm.windowing.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public abstract class BaseWindowedBolt implements IWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);

    protected final transient Map<String, Object> windowConfiguration;
    protected TimestampExtractor timestampExtractor;

    /**
     * Holds a count value for count based windows and sliding intervals.
     */
    public static class Count {
        public final int value;

        public Count(int value) {
            this.value = value;
        }

        /**
         * Returns a {@link Count} of given value.
         *
         * @param value the count value
         * @return the Count
         */
        public static Count of(int value) {
            return new Count(value);
        }

        @Override
        public String toString() {
            return "Count{" +
                    "value=" + value +
                    '}';
        }
    }

    /**
     * Holds a Time duration for time based windows and sliding intervals.
     */
    public static class Duration {
        public final int value;

        public Duration(int value, TimeUnit timeUnit) {
            this.value = (int) timeUnit.toMillis(value);
        }

        /**
         * Returns a {@link Duration} corresponding to the the given value in milli seconds.
         *
         * @param milliseconds the duration in milliseconds
         * @return the Duration
         */
        public static Duration of(int milliseconds) {
            return new Duration(milliseconds, TimeUnit.MILLISECONDS);
        }

        /**
         * Returns a {@link Duration} corresponding to the the given value in days.
         *
         * @param days the number of days
         * @return the Duration
         */
        public static Duration days(int days) {
            return new Duration(days, TimeUnit.DAYS);
        }

        /**
         * Returns a {@link Duration} corresponding to the the given value in hours.
         *
         * @param hours the number of hours
         * @return the Duration
         */
        public static Duration hours(int hours) {
            return new Duration(hours, TimeUnit.HOURS);
        }

        /**
         * Returns a {@link Duration} corresponding to the the given value in minutes.
         *
         * @param minutes the number of minutes
         * @return the Duration
         */
        public static Duration minutes(int minutes) {
            return new Duration(minutes, TimeUnit.MINUTES);
        }

        /**
         * Returns a {@link Duration} corresponding to the the given value in seconds.
         *
         * @param seconds the number of seconds
         * @return the Duration
         */
        public static Duration seconds(int seconds) {
            return new Duration(seconds, TimeUnit.SECONDS);
        }

        @Override
        public String toString() {
            return "Duration{" +
                    "value=" + value +
                    '}';
        }
    }

    protected BaseWindowedBolt() {
        windowConfiguration = new HashMap<>();
    }

    private BaseWindowedBolt withWindowLength(Count count) {
        if (count.value <= 0) {
            throw new IllegalArgumentException("Window length must be positive [" + count + "]");
        }
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
        return this;
    }

    private BaseWindowedBolt withWindowLength(Duration duration) {
        if (duration.value <= 0) {
            throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
        }

        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
        return this;
    }

    private BaseWindowedBolt withSlidingInterval(Count count) {
        if (count.value <= 0) {
            throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
        }
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
        return this;
    }

    private BaseWindowedBolt withSlidingInterval(Duration duration) {
        if (duration.value <= 0) {
            throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
        }
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
        return this;
    }

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Tuple count and time duration based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration and count based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A tuple count based window that slides with every incoming tuple.
     *
     * @param windowLength the number of tuples in the window
     */
    public BaseWindowedBolt withWindow(Count windowLength) {
        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    }

    /**
     * A time duration based window that slides with every incoming tuple.
     *
     * @param windowLength the time duration of the window
     */
    public BaseWindowedBolt withWindow(Duration windowLength) {
        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * Specify a field in the tuple that represents the timestamp as a long value. If this
     * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
     *
     * @param fieldName the name of the field that contains the timestamp
     */
    public BaseWindowedBolt withTimestampField(String fieldName) {
        return withTimestampExtractor(TupleFieldTimestampExtractor.of(fieldName));
    }

    /**
     * Specify the timestamp extractor implementation.
     *
     * @param timestampExtractor the {@link TimestampExtractor} implementation
     */
    public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
        if (this.timestampExtractor != null) {
            throw new IllegalArgumentException("Window is already configured with a timestamp extractor: " + timestampExtractor);
        }
        this.timestampExtractor = timestampExtractor;
        return this;
    }

    @Override
    public TimestampExtractor getTimestampExtractor() {
        return timestampExtractor;
    }

    /**
     * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
     * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
     * It must be defined on a per-component basis, and in conjunction with the
     * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
     *
     * @param streamId the name of the stream used to emit late tuples on
     */
    public BaseWindowedBolt withLateTupleStream(String streamId) {
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
        return this;
    }


    /**
     * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
     * cannot be out of order by more than this amount.
     *
     * @param duration the max lag duration
     */
    public BaseWindowedBolt withLag(Duration duration) {
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, duration.value);
        return this;
    }

    /**
     * Specify the watermark event generation interval. For tuple based timestamps, watermark events
     * are used to track the progress of time
     *
     * @param interval the interval at which watermark events are generated
     */
    public BaseWindowedBolt withWatermarkInterval(Duration interval) {
        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
        return this;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // NOOP
    }

    @Override
    public void cleanup() {
        // NOOP
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // NOOP
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return windowConfiguration;
    }
}
 

© 著作权归作者所有

共有 人打赏支持
世吉
粉丝 1
博文 4
码字总数 4547
作品 0
东城
程序员

暂无相关文章

【elasticsearch】 随笔 Date datatype

一。时间类型的本质 首先json是没有时间类型的,对于es来说,时间类型的标示可以是下面三种情况 1.一个时间格式的字符串,如:"2014-11-27T08:05:32Z","2015-01-01" or "2015/01/01 12:10:3...

xiaomin0322 ⋅ 27分钟前 ⋅ 0

阿里云资源编排ROS使用教程

阿里云资源编排ROS详细内容: 阿里云资源编排ROS使用教程 资源编排(Resource Orchestration)是一种简单易用的云计算资源管理和自动化运维服务。用户通过模板描述多个云计算资源的依赖关系、...

mcy0425 ⋅ 29分钟前 ⋅ 0

适配器设计模式

1、适配器模式 把一个类的接口变换成客户端所期待的另一种接口 使原本因接口不匹配而无法在一起工作的两个类能够在一起工作 分为类的适配器模式和对象的适配器模式 2、类适配器模式 类的适配...

职业搬砖20年 ⋅ 33分钟前 ⋅ 0

npm操作报错 _stream_writable.js:61

有一天 不知道什么原因(估计和node的版本有关),无论你做什么npm的操作 都会报错/usr/local/lib/node_modules/npm/node_modules/readable-stream/lib/_stream_writable.js:61 这时候只要执...

lilugirl ⋅ 37分钟前 ⋅ 0

Eclipse安装插件的几种方式

Eclipse魅力之一就是支持可扩展的插件,来丰富自身的功能,这种方式也是建立在开源思想之上的。具体使用什么方式去安装插件,要看我们拿到的是什么。 1. 拿到的是一串URL,如http://subclips...

GordonNemo ⋅ 39分钟前 ⋅ 0

div图片叠加

css实现代码如下: <div style="position: relative;"><!--这个层为外面的父层,需设置相对位置样式--> <div style="position: absolute;"><!--子层,需设置绝对位置样式--> <i......

niithub ⋅ 41分钟前 ⋅ 0

作用域slot

如果父组件需要使用子组件中的内容怎么办,比如父组件需要控制子组件的显示 <div id="root"><child><template slot-scope="props"><h1>{{props.item}} <div>编辑</div></h1><......

金于虎 ⋅ 43分钟前 ⋅ 1

HongHu commonservice-eureka 项目构建过程

上一篇我们回顾了关于 spring cloud eureka的相关基础知识,现在我们针对于HongHu cloud的eureka项目做以下构建,整个构建的过程很简单,我会将每一步都构建过程记录下来,希望可以帮助到大家...

明理萝 ⋅ 46分钟前 ⋅ 1

xml和对象的相互转化

@Data//setter和getter方法,toString和equals,hashcode方法@EqualsAndHashCode//代表重写equals和hashcode方法@XmlAccessorType(XmlAccessType.FIELD)public class Classroom {@X......

拐美人 ⋅ 46分钟前 ⋅ 0

tableView cell的高度 分组头部尾部的高度 自适应

@property (nonatomic) CGFloat rowHeight; // default is UITableViewAutomaticDimension@property (nonatomic) CGFloat sectionHeaderHeight; // default is UITableViewA......

娜一片蓝色星海 ⋅ 47分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部