文档章节

Storm实践2-【storm实时排序TopN】 - TOP10

止静
 止静
发布于 2014/09/03 15:45
字数 314
阅读 999
收藏 0


阅读背景:1 您需要了解TOP 使用的场景

                2 您需要了解当前的TOPN 处理,和定时区间处理的区别

 看代码说话

    

package com.cc.storm;

import com.cc.storm.bolt.MergeBolt;
import com.cc.storm.bolt.RankBolt;
import com.cc.storm.bolt.RollingAllCountBolt;
import com.cc.storm.bolt.RollingCountBolt;
import com.cc.storm.spout.RandomEmitSpout;
import com.cc.storm.spout.RedisPubSubSpout;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * ToPN是一种常见模式,是对流式数据进行“Streaming topN”的计算:
 * 比如要计算的是最近一段时间内的热门话题,热门点击图片,热门商品浏览,热门商品购买
 * 
 * 既然敢要实时的处理,【】【】【】【】【】[] 【】 【】【】【】【】 [] 【】【】【】【】 【】 []
 * 
 * @author Yin Shuai
 */
public class TOP10 {

	public static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException, InterruptedException {

		final int TOP_N = 10;
		final int time = 1;

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("$datasource$", new RandomEmitSpout(), 1);

		builder.setBolt("$count$", new RollingCountBolt(3, time), 1)
				.fieldsGrouping("$datasource$", new Fields("merchandiseIDS"));

		builder.setBolt("$rank$", new RankBolt(TOP_N), 2).fieldsGrouping(
				"$count$", new Fields("merchandiseID"));

		builder.setBolt("$merge$", new MergeBolt(TOP_N)).globalGrouping(
				"$rank$");

		Config conf = new Config();
		conf.setDebug(false);
		conf.setNumWorkers(2);
		conf.setMaxSpoutPending(5000);

		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Toplogie", conf,
				builder.createTopology());

		Thread.sleep(5000);
	}
}



  整个处理的流程如图:

   

© 著作权归作者所有

止静
粉丝 121
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
702
1
storm client command

最近在研究实时日志分析,storm确实不错,以下是命令参数: storm help Syntax: storm jar topology-jar-path class 运行jar包中类的主函数和指定的参数 Commands: activate storm activate ...

China_OS
2014/02/22
1K
0
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
07/20
817
0
Hadoop、storm和Spark Streaming简单介绍(非原创)

文章大纲 一、Hadoop是什么 二、storm是什么 三、Spark Streaming是什么 四、Spark与storm比较 五、参考文章 一、Hadoop是什么 1. 简介 Hadoop是一个由Apache基金会所开发的分布式系统基础架...

故事爱人
06/14
0
0
Apache Storm 0.9.7 发布,分布式实时计算

Apache Storm 0.9.7 发布了,Apache Storm 的前身是 Twitter Storm 平台,目前已经归于 Apache 基金会管辖。 Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像...

开源中国股侠
2016/09/08
796
1

没有更多内容

加载失败,请刷新页面

加载更多

川普给埃尔多安和内堪尼亚胡的信

任性 https://twitter.com/netanyahu/status/1186647558401253377 https://edition.cnn.com/2019/10/16/politics/trump-erdogan-letter/index.htm...

Iridium
21分钟前
10
0
golang-mysql-原生

db.go package mainimport ("database/sql""time"_ "github.com/go-sql-driver/mysql")var (db *sql.DBdsn = "root:123456@tcp(127.0.0.1:3306)/test?charset=u......

李琼涛
49分钟前
5
0
编程作业20191021092341

1编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时 间。使用#define或const创建一个表示60的符号常量或const变量。通过while 循环让用户重复输入值,直到用户输入小于或等于0的值...

1李嘉焘1
50分钟前
7
0
Netty整合Protobuffer

现在我们都知道,rpc的三要素:IO模型,线程模型,然后就是数据交互模型,即我们说的序列化和反序列化,现在我们来看一下压缩比率最大的二进制序列化方式——Protobuffer,而且该方式是可以跨...

算法之名
55分钟前
18
0
如何用C++实现栈

栈的定义 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压...

BWH_Steven
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部