文档章节

Storm1.0 以前的老版本 实现滑动窗口计数和TopN排序

世吉
 世吉
发布于 2017/09/02 15:59
字数 1930
阅读 7
收藏 0
点赞 0
评论 0

参考http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/

  

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

     

 String spoutId = "wordGenerator";
 String counterId = "counter";
 String intermediateRankerId = "intermediateRanker";
 String totalRankerId = "finalRanker";
 builder.setSpout(spoutId, new TestWordSpout(), 5);
 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
 builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

 

RollingCountBolt

首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的 

RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds

  

new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes

1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)     
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);   
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot   
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数   
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)   
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

 

2. TupleHelpers.isTickTuple(tuple), TickTuple

前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.   
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用   
"__system" component会定时往task发送 "__tick" stream的tuple   
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置   
也可以在代码里面通过getComponentConfiguration()来进行配置,

  

public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> conf = new HashMap<String, Object>();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
     return conf;

配置完成后, storm就会定期的往task发送ticktuple   
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能

  

public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system"
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick"
}

 

最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));   
obj, count(窗口内的计数和), 实际使用时间

 

SlotBasedCounter

基于slot的counter, 模板类, 可以指定被计数对象的类型T   
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作 

关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数   
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组   
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和   
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0   
wipeZeros, 删除所有total count为0的obj, 以释放空间

  

public final class SlotBasedCounter<T> implements Serializable {

    private static final long serialVersionUID = 4858185737378394432L;

    private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
    private final int numSlots;

    public SlotBasedCounter(int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
                + ")");
        }
        this.numSlots = numSlots;
    }

    public void incrementCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            counts = new long[this.numSlots];
            objToCounts.put(obj, counts);
        }
        counts[slot]++;
    }

    public long getCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            return 0;
        }
        else {
            return counts[slot];
        }
    }

    public Map<T, Long> getCounts() {
        Map<T, Long> result = new HashMap<T, Long>();
        for (T obj : objToCounts.keySet()) {
            result.put(obj, computeTotalCount(obj));
        }
        return result;
    }

    private long computeTotalCount(T obj) {
        long[] curr = objToCounts.get(obj);
        long total = 0;
        for (long l : curr) {
            total += l;
        }
        return total;
    }

    /**
     * Reset the slot count of any tracked objects to zero for the given slot.
     * 
     * @param slot
     */
    public void wipeSlot(int slot) {
        for (T obj : objToCounts.keySet()) {
            resetSlotCountToZero(obj, slot);
        }
    }

    private void resetSlotCountToZero(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        counts[slot] = 0;
    }

    private boolean shouldBeRemovedFromCounter(T obj) {
        return computeTotalCount(obj) == 0;
    }

    /**
     * Remove any object from the counter whose total count is zero (to free up memory).
     */
    public void wipeZeros() {
        Set<T> objToBeRemoved = new HashSet<T>();
        for (T obj : objToCounts.keySet()) {
            if (shouldBeRemovedFromCounter(obj)) {
                objToBeRemoved.add(obj);
            }
        }
        for (T obj : objToBeRemoved) {
            objToCounts.remove(obj);
        }
    }
}

 

SlidingWindowCounter

SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念

incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据

核心的操作为, getCountsThenAdvanceWindow   
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map   
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间   
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口   
    advanceHead的实现, 如何在数组实现循环的滑动窗口

  

public final class SlidingWindowCounter<T> implements Serializable {

    private static final long serialVersionUID = -2645063988768785810L;

    private SlotBasedCounter<T> objCounter;
    private int headSlot;
    private int tailSlot;
    private int windowLengthInSlots;

    public SlidingWindowCounter(int windowLengthInSlots) {
        if (windowLengthInSlots < 2) {
            throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
                + windowLengthInSlots + ")");
        }
        this.windowLengthInSlots = windowLengthInSlots;
        this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);

        this.headSlot = 0;
        this.tailSlot = slotAfter(headSlot);
    }

    public void incrementCount(T obj) {
        objCounter.incrementCount(obj, headSlot);
    }

    /**
     * Return the current (total) counts of all tracked objects, then advance the window.
     * 
     * Whenever this method is called, we consider the counts of the current sliding window to be available to and
     * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
     * objects within the next "chunk" of the sliding window.
     * 
     * @return
     */
    public Map<T, Long> getCountsThenAdvanceWindow() {
        Map<T, Long> counts = objCounter.getCounts();
        objCounter.wipeZeros();
        objCounter.wipeSlot(tailSlot);
        advanceHead();
        return counts;
    }

    private void advanceHead() {
        headSlot = tailSlot;
        tailSlot = slotAfter(tailSlot);
    }

    private int slotAfter(int slot) {
        return (slot + 1) % windowLengthInSlots;
    }
}

  

 

IntermediateRankingsBolt

这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重   
所以先通过IntermediateRankingsBolt, 过滤掉一些   
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task

IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)   
并实现了updateRankingsWithTuple,

  

void updateRankingsWithTuple(Tuple tuple) {
    Rankable rankable = RankableObjectWithFields.from(tuple);
    super.getRankings().updateWith(rankable);
}

  

逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表

  

参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去

 

Rankable

Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口

  

public interface Rankable extends Comparable<Rankable> {
    Object getObject();
    long getCount();
}

RankableObjectWithFields

RankableObjectWithFields实现Rankable接口   
1. 提供将Tuple转化为RankableObject   
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中

2. 实现Rankable定义的getObject()和getCount()接口

3. 实现Comparable接口, 包含compareTo, equals

  

public class RankableObjectWithFields implements Rankable

  

public static RankableObjectWithFields from(Tuple tuple) {
    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
    Object obj = otherFields.remove(0);
    Long count = (Long) otherFields.remove(0);
    return new RankableObjectWithFields(obj, count, otherFields.toArray());
}

Rankings

Rankings维护需要排序的List, 并提供对List相应的操作

核心的数据结构如下, 用来存储rankable对象的list   
List<Rankable> rankedItems = Lists.newArrayList();

提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)

核心的操作是, 

  

public void updateWith(Rankable r) {
    addOrReplace(r);
    rerank();
    shrinkRankingsIfNeeded();
}

上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的 
1. 替换已有的, 或新增rankable对象(包含obj, count) 
2. 从新排序(Collections.sort) 
3. 由于只需要topN, 所以大于maxsize的需要删除 

AbstractRankerBolt

首先以TopN为参数, 创建Rankings对象

  

private final Rankings rankings;
public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
    count = topN;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    rankings = new Rankings(count);
}

在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple   
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings   
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑

  

public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleHelpers.isTickTuple(tuple)) {
        emitRankings(collector);
    }
    else {
        updateRankingsWithTuple(tuple);
    }
}

最终将整个rankings列表emit出去 
  

private void emitRankings(BasicOutputCollector collector) {
    collector.emit(new Values(rankings));
    getLogger().info("Rankings: " + rankings);
}

 

TotalRankingsBolt 

该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.   
TotalRankingsBolt同样继承自AbstractRankerBolt

  

void updateRankingsWithTuple(Tuple tuple) {
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    super.getRankings().updateWith(rankingsToBeMerged);
}

唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历 

最终可以得到, 全局的TopN的Rankings列表

© 著作权归作者所有

共有 人打赏支持
世吉
粉丝 1
博文 4
码字总数 4547
作品 0
东城
程序员
Storm 实现滑动窗口计数和TopN排序 【转】

计算top N words的topology, 用于比如trending topics or trending images on Twitter. 实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码 Topology 这是一个稍微复杂些的topolog...

泡海椒
2016/04/02
340
0
Kylin性能调优记——业务技术两手抓

本文由 网易云 发布 作者:冯宇 本篇文章仅限内部分享,如需转载,请联系网易获取授权。 背景 最近开始使用了新版本的Kylin,在此之前对于新版本的了解只是代码实现和一些简单的新功能测试,...

wangyiyungw
05/10
0
0
PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一

标签 PostgreSQL , 概率计算 , pipelinedb , cms_topn , count-min sketch top-n 背景 概率计算是流式计算中比较重要的基础,PostgreSQL生态中的pipelinedb提供了诸多概率计算的功能模块。 ...

德哥
04/18
0
0
Storm[TOPN -排序] - RollingCountBolt

阅读背景: 1 : 您需要对滑动窗口要初步了解 2 : 您需要了解滑动窗口在滑动的过程之中,滑动chunk的计算过程,尤其是每发射一次,就需要清空一次。 package com.cc.storm.bolt; import java...

止静
2014/09/03
0
1
PHP-利用二叉堆实现TopK-算法

介绍 在以往工作或者面试的时候常会碰到一个问题,如何实现海量TopN,就是在一个非常大的结果集里面快速找到最大的前10或前100个数,同时要保证内存和速度的效率,我们可能第一个想法就是利用...

简单方式
2017/04/23
0
0
聊聊 Redis 使用场景

使用场景说明 计数器 数据统计的需求非常普遍,通过原子递增保持计数。例如,点赞数、收藏数、分享数等。 排行榜 排行榜按照得分进行排序,例如,展示最近、最热、点击率最高、活跃度最高等等...

小神神的大草原
2016/11/22
16
0
10000亿数据寻找 最大 或者最小 n个 数 各种算法比较

寻找最优的 TopN 算法 1 概要 在大量的数据记录中,依据某可排序的记录属性(一般为数字类型),找出最大的前 N 个记录,称为 TopN 问题。这是一个常常遇到的问题,也是一个比较简单的算法问...

颓废的幻想者
2013/10/23
0
0
Spark高级排序与TopN问题揭密

[TOC] 引入 前面进行过wordcount的单词统计例子,关键是,如何对统计的单词按照单词个数来进行排序? 如下: 下面的测试都需要引入maven的依赖 Spark二次排序 测试数据与说明 需要进行二次排...

xpleaf
04/28
0
0
Over:窗口函数(滑动聚合)

Over 窗口函数在Select 子句中,对查询的结果集进行“滑动-聚合”运算;如果使用count,那么基于滑动窗口的聚合语义同 base+1 累加;如果使用sum,那么基于滑动窗口的聚合语义等同于数据累加...

长征6号
2016/07/18
0
0
PostgreSQL 实践 - 实时广告位推荐 1 (任意字段组合、任意维度组合搜索、输出TOP-K)

标签 PostgreSQL , gin , 倒排 , rum , ginfuzzysearch_limit , 随机采样 , 分区索引 , 分段索引 , score分段 背景 店铺,广告推荐位,自动计算,高效检索,高效更新。 根据:本店、全网用户...

德哥
05/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

如何把你的Linux系统变的更加安全

做为一个小白,以为自己懂了点Linux知识,会搭建Linux各种服务就觉得自己牛的不要要的。在我们团队里面,我将使用了一台破电脑搭建Linux服务器,上面跑着Ftp服务存放着资源,ssh服务可以远程...

问题终结者
刚刚
0
0
lombok的使用和原理

一、项目背景 在写Java程序的时候经常会遇到如下情形: 新建了一个Class类,然后在其中设置了几个字段,最后还需要花费很多时间来建立getter和setter方法 lombok项目的产生就是为了省去我们手...

颖辉小居
刚刚
0
0
rsync至服务同步-系统日志-screen

rsync: 服务同步;配置文件:/etc/rsyncd.conf 默认端口:873 服务启动:rsync --daemon rsync -av /root/1.txt 192.168.1.2::test/2.txt (test为模块名称) /etc/rsync.conf配置样例: #指定...

ZHENG-JY
2分钟前
0
0
读取文件中内容转换成字符串

package com.lieni.ruyu.api.xmlTool; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.Unsuppo......

newdeng
3分钟前
0
0
《PHP和MySQL Web 开发》 第8章 设计Web数据库

LCL WARNING 这是我学习《PHP和MySQL Web 开发》的读书笔记,一些重要的知识点我会记录下来,当然只会写我觉得重要的。 如果有幸有人看到这个学习笔记了,你要结合着书看,不要光看这个笔记。...

十万猛虎下画山
10分钟前
0
0
Spring+jpaNo transactional EntityManager available

TransactionRequiredException: No transactional EntityManager availableEntityManager执行以下方法(refresh, persist, flush, joinTransaction, remove, merge) 都需要需要事务i......

wpfc
10分钟前
0
0
八幅漫画理解使用JSON Web Token设计单点登录系统

八幅漫画理解使用JSON Web Token设计单点登录系统 Sep 07, 2015 in Engineering 上次在《JSON Web Token - 在Web应用间安全地传递信息》中我提到了JSON Web Token可以用来设计单点登录系统。...

祖冲之
12分钟前
0
0
Spring框架中的设计模式(三)

Spring框架中的设计模式(三) 原创: 瑞查德-Jack 在之前的两篇文章中,我们看到了一些在Spring框架中实现的设计模式。这一次我们会发现这个流行框架使用的3种新模式。 本文将从描述两个创意...

瑞查德-Jack
15分钟前
1
0
[MicroPython]TPYBoard智能小车“飞奔的TPYBoard装甲一号”

智能小车作为现代的新发明,是以后的发展方向,他可以按照预先设定的模式在一个环境里自动的运作,不需要人为的管理,可应用于科学勘探等等的用途。智能小车能够实时显示时间、速度、里程,具...

bodasisiter
17分钟前
0
0
桌面虚拟化VDI(Virtual Desktop Infrastructure)

为了保证员工(客户)不把公司的资料复制、传输给别人。可以把员工平时办公放在服务器上做。所以使用桌面虚拟化。就是把一个服务器虚拟出很多桌面系统(如:windows)。 桌面虚拟化最大的优势...

王坤charlie
25分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部