文档章节

Storm[TOPN -排序] - RollingCountBolt

止静
 止静
发布于 2014/09/03 18:01
字数 960
阅读 1646
收藏 4


阅读背景:

   1 : 您需要对滑动窗口要初步了解

   2  :   您需要了解滑动窗口在滑动的过程之中,滑动chunk的计算过程,尤其是每发射一次,就需要清空一次。

package com.cc.storm.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 1 在这里我们需要去实现一个滑动窗口,请注意,在我们实现滑动窗口的过程之中清空的是当前滑动窗口的下一个
 * 
 * 
 * 
 * @author Yin Shuai
 * 
 */
public class RollingCountBolt implements IRichBolt {

	private static final long serialVersionUID = 1765379339552134320L;

	private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
	private int _numBuckets;
	private transient Thread cleaner;
	private OutputCollector _collector;

	/**
	 * _trackMinute
	 * 是我们整个滑动窗口的大小,滑动窗口的大小,本质上决定了我们的时间区间,也就是说,假设我们目前滑动窗口的总体大小为15分钟。
	 * 那我们的商品点击的实时排序的指标值,好比商品浏览量的计算值,也就是15分钟
	 * 
	 * 而单个窗口的大小也就是我,我们这个三十分钟在随着时间不断的在推移
	 * 
	 * 举例说明:在最初的构造过程之中,如果我们的桶的数目为10,那么单个窗口的时间长度为3.
	 * 
	 * [0,30],[3,33],[6,36],[9,39],[12,42] 统计的数值处在不断的变化之中
	 * 
	 */
	private int _trackMinutes;

	public RollingCountBolt(int numBuckets, int trackMinutes) {
		this._numBuckets = numBuckets;
		this._trackMinutes = trackMinutes;
	}

	public long totalObjects(Object obj) {
		long[] curr = _objectCounts.get(obj);
		long total = 0;
		for (long l : curr) {
			total += l;
		}
		return total;
	}

	public int currentBucket(int buckets) {
		return currentSecond() / secondsPerBucket(buckets) % buckets;
	}

	public int currentSecond() {
		return (int) (System.currentTimeMillis() / 1000);
	}

	/**
	 * 
	 * @param buckets
	 *            你设定的桶的数量
	 * @return 依据我们默认的_trackMinutes / buckets 得到每一个桶的数量
	 */
	public int secondsPerBucket(int buckets) {
		return _trackMinutes * 60 / buckets;
	}

	public long millisPerBucket(int buckets) {
		return (long) 1000 * secondsPerBucket(buckets);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		// TODO Auto-generated method stub
		_collector = collector;
		cleaner = new Thread(new Runnable() {

			@SuppressWarnings("unchecked")
			@Override
			public void run() {
				// TODO Auto-generated method stub
				int lastBucket = currentBucket(_numBuckets);

				while (true) {

					int currBucket = currentBucket(_numBuckets);
					p("线程while循环: 当前的桶为:" + currBucket);

					if (currBucket != lastBucket) {
						p("线程while循环:之前的桶数为:" + lastBucket);

						int bucketToWipe = (currBucket + 1) % _numBuckets;
						p("线程while循环:要擦除掉的桶为:" + bucketToWipe);

						synchronized (_objectCounts) {
							Set objs = new HashSet(_objectCounts.keySet());

							for (Object obj : objs) {

								long[] counts = _objectCounts.get(obj);
								long currBucketVal = counts[bucketToWipe];
								p("线程while循环:擦除掉的值为:" + currBucketVal);
								counts[bucketToWipe] = 0;
								long total = totalObjects(obj);
								if (currBucketVal != 0) {
									p("线程while循环:擦除掉的值为不为0:那就发射数据:obj total"
											+ obj + ":" + total);
									_collector.emit(new Values(obj, total));

								}
								if (total == 0) {

									p("线程while循环: 总数为0以后,将obj对象删除");
									_objectCounts.remove(obj);

								}
							}
						}
						lastBucket = currBucket;
					}

					long delta = millisPerBucket(_numBuckets)
							- (System.currentTimeMillis() % millisPerBucket(_numBuckets));
					Utils.sleep(delta);

					p("\n");
				}
			}
		});
		cleaner.start();
	}

	@Override
	public void execute(Tuple input) {

		Object obj1 = input.getValue(0);
		Object obj = input.getValue(1);

		int currentBucket = currentBucket(_numBuckets);

		p("execute方法:当前桶:bucket: " + currentBucket);

		synchronized (_objectCounts) {

			long[] curr = _objectCounts.get(obj);

			if (curr == null) {
				curr = new long[_numBuckets];
				_objectCounts.put(obj, curr);
			}

			curr[currentBucket]++;

			System.err
					.print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long数组:"));

			for (long number : curr) {
				System.err.print(number + ":");
			}

			p("execute方法:发射的数据: " + obj + ":" + totalObjects(obj));

			/**
			 * 我们不断的发射的也就是我们某一个商品id,在当前滑动窗口,也就是我们的时间周期内的指标计算值
			 * 要注意,在排序的过程之中,我们只针对key, 也就是我们的商品id,由此发射给后续的排序bolt依据包含了时间区间的信息
			 */

			// 每来一条数据,就会发射一次
			_collector.emit(new Values(obj, totalObjects(obj)));
			_collector.ack(input);
		}

		p("\n");
	}

	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("merchandiseID", "count"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void p(Object o) {
		System.err.println(o.toString());
	}
}

  在这里,最需要我们关注的地方是,滑动窗口每滑动一次,将情况一组数据。 而发射数据的过程之中将统计这一组数

据。


© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
加载中

评论(1)

kt431128
kt431128
能给个QQ 不?
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
789
1
storm启动卡着不动,求高手

刚把storm集群部好,结果启动时卡着不动。 执行:./storm nimbus 提示: [root@WY_CentOS_100G bin]# ./storm nimbus Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/src......

lja
2013/07/11
2.2K
1
Apache Slider + Storm

Apache Slider + Storm 系统环境 安装如下组件,部署可用环境 JDK 1.7.0_79 Apache Zookeeper 3.4.* Apache Zookeeper Apache Hadoop 2.6.* Apache Hadoop Apache Storm 0.9.4 Apache Storm......

Yulong_
2016/09/21
504
0
Storm实践3【TopN封装版】 - 1: 简要介绍

阅读前提:在阅读之前务必请先参看本ID的 Storm实践系列2 阅读目的 : 优化之前的封装 最新版本的Storm TOPN 整体的构造如下: 依然包含如下的几个package: 1 Spout: 模拟发射数据 2 bolt :...

止静
2014/09/13
300
1
storm client command

最近在研究实时日志分析,storm确实不错,以下是命令参数: storm help Syntax: storm jar topology-jar-path class 运行jar包中类的主函数和指定的参数 Commands: activate storm activate ...

China_OS
2014/02/22
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

2020,向死而生

或许2020年注定是非常艰难的一年,毕竟两个轮回前之的1900年,清廷过得也很艰难,义和团在北方闹得轰轰烈烈,紫禁城也被八国联军占领。次年(1901年)即签订了后世所谓的丧权辱国的辛丑条约,...

嘉树
30分钟前
5
0
git 常用配置

git config --global core.compression 0 git config --global http.lowSpeedLimit 0 git config --global http.lowSpeedTime 999999 git config --global http.postBuffer 524288000......

老码农008
30分钟前
6
0
Protel99SE WIN10系统下无法添加封装库的解决方法

Protel99SE WIN10系统下无法添加封装库的解决方法 Protel99SE这款PCB设计软件实在太古老了,导致与微软的最新操作系统有些功能不能兼容,比如WIN10系统下无法添加封装库;但是由于Protel99S...

demyar
32分钟前
3
0
大数据风控系统概述

为什么要做风控系统 不做的话,会有以下风险: 各种小号、垃圾账号泛滥 撞库攻击、盗号、毁号、拖库等 拉新 10w 留存率不到 5% 百万营销费用,却增加不了用户粘性 投票票数差距非常悬殊 各种...

大数据技术进阶
32分钟前
5
0
串口调试助手,VB6.0开发

1、为什么要自己开发一个串口调试助手 通常我们都是:在网上直接下载一个串口助手,可执行文件,直接使用,并无法得到其源码,在此我们提供了一个VB6.0开发的串口助手: (1)让你极速掌握串...

superman150
35分钟前
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部