文档章节

Storm【开发实战】- 流方式的统计系统

止静
 止静
发布于 2014/06/27 16:03
字数 1670
阅读 832
收藏 3

1: 初期硬件准备:

                1 如果条件具备:请保证您安装好了 redis集群

                2 配置好您的Storm开发环境

                3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间


2:业务背景的介绍:

                1  在这里我们将模拟一个   流方式的数据处理过程

                 2 数据的源头保存在我们的redis 集群之中

                 3  发射的数据格式为: ip,url,client_key


数据发射器

package storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;
import org.apache.log4j.Logger;

/**
 * click Spout 从redis中间读取所需要的数据
 */
public class ClickSpout extends BaseRichSpout {

	private static final long serialVersionUID = -6200450568987812474L;

	public static Logger LOG = Logger.getLogger(ClickSpout.class);

	// 对于redis,我们使用的是jedis客户端
	private Jedis jedis;

	// 主机
	private String host;

	// 端口
	private int port;

	// Spout 收集器
	private SpoutOutputCollector collector;

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

	
	        // 这里,我们发射的格式为
	        // IP,URL,CLIENT_KEY
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
				storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
	}

	@Override
	public void open(Map conf, TopologyContext topologyContext,
			SpoutOutputCollector spoutOutputCollector) {

		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		this.collector = spoutOutputCollector;
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
	}

	@Override
	public void nextTuple() {
		String content = jedis.rpop("count");
		if (content == null || "nil".equals(content)) {
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
			}
		} else {

			// 将jedis对象 rpop出来的字符串解析为 json对象
			JSONObject obj = (JSONObject) JSONValue.parse(content);

			String ip = obj.get(storm.cookbook.Fields.IP).toString();
			String url = obj.get(storm.cookbook.Fields.URL).toString();
			String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
					.toString();

			System.out.println("this is a clientKey");

			// List<Object> tuple对象
			collector.emit(new Values(ip, url, clientKey));
		}
	}
}


在这个过程之中,请注意:


1  我们在 OPEN 方法之中初始化   host,port,collector,以及Redis的连接,调用Connect方法并连接到redis数据库


2 我们在nextTupe 取出数据,并且将他转换为一个JSON对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个

Values对象


让我们来看看数据的流向图:

        

在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt

                                    1  : repeatVisitBolt 

                                    2   :  geographyBolt 


共同来读取同一个数据源的数据:clickSpout


3 细细察看 repeatVisitBolt

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

public class RepeatVisitBolt extends BaseRichBolt {

	private OutputCollector collector;

	private Jedis jedis;
	private String host;
	private int port;

	@Override
	public void prepare(Map conf, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
		jedis.connect();
	}

	public boolean isConnected() {
		if (jedis == null)
			return false;
		return jedis.isConnected();
	}

	@Override
	public void execute(Tuple tuple) {

		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
		String clientKey = tuple
				.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
		String url = tuple.getStringByField(storm.cookbook.Fields.URL);
		String key = url + ":" + clientKey;

		String value = jedis.get(key);
		
		// redis中取,如果redis中没有,就插入新的一条访问记录。
		if (value == null) {
			jedis.set(key, "visited");
			collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
		} else {
			collector
					.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
				storm.cookbook.Fields.UNIQUE));
	}
}


  在这里,我们把url 和 clientKey 组合成为 【url:clientKey】的格式组合,并依据这个对象,在redis中去查找,如果没有,那那Set到redis中间去,并且判定它为【unique】

4:

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class VisitStatsBolt extends BaseRichBolt {

    private OutputCollector collector;

    private int total = 0;
    private int uniqueCount = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
    	
    	//在这里,我们在上游来判断这个Fields 是否是独特和唯一的
        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        
        total++;
        if(unique)uniqueCount++;
        collector.emit(new Values(total,uniqueCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
        		storm.cookbook.Fields.TOTAL_UNIQUE));
    }
}


第一次出现,uv ++ 


5  接下来,看看流水线2 :

package storm.bolt;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;

import storm.cookbook.IPResolver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
 * File | Settings | File Templates.
 */
public class GeographyBolt extends BaseRichBolt {

	// ip解析器
	private IPResolver resolver;

	private OutputCollector collector;

	public GeographyBolt(IPResolver resolver) {
		this.resolver = resolver;
	}

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {

		// 1 从上级的目录之中拿到我们所要使用的ip
		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);

		// 将ip 转换为json
		JSONObject json = resolver.resolveIP(ip);

		// 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象
		String city = (String) json.get(storm.cookbook.Fields.CITY);
		String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);

		collector.emit(new Values(country, city));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

		// 确定了我们这次输出元祖的格式
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.CITY));
	}
}


以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class GeoStatsBolt extends BaseRichBolt {

	private class CountryStats {

		//
		private int countryTotal = 0;

		private static final int COUNT_INDEX = 0;
		private static final int PERCENTAGE_INDEX = 1;
		private String countryName;

		public CountryStats(String countryName) {
			this.countryName = countryName;
		}

		private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();

		/**
		 * @param cityName
		 */
		public void cityFound(String cityName) {
			countryTotal++;

			// 已经有了值,一个加1的操作
			if (cityStats.containsKey(cityName)) {
				cityStats.get(cityName)
						.set(COUNT_INDEX,
								cityStats.get(cityName).get(COUNT_INDEX)
										.intValue() + 1);
				// 没有值的时候
			} else {
				List<Integer> list = new LinkedList<Integer>();
				list.add(1);
				list.add(0);
				cityStats.put(cityName, list);
			}

			double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
					/ (double) countryTotal;

			cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);

		}

		/**
		 * @return 拿到的国家总数
		 */
		public int getCountryTotal() {
			return countryTotal;
		}

		/**
		 * @param cityName  依据传入的城市名称,拿到城市总数
		 * @return
		 */

		public int getCityTotal(String cityName) {
			return cityStats.get(cityName).get(COUNT_INDEX).intValue();
		}

		
		public String toString() {
			return "Total Count for " + countryName + " is "
					+ Integer.toString(countryTotal) + "\n" + "Cities:  "
					+ cityStats.toString();
		}
	}

	private OutputCollector collector;

	// CountryStats 是一个内部类的对象
	private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {
		String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
		String city = tuple.getStringByField(storm.cookbook.Fields.CITY);

		// 如果国家不存在的时候,新增加一个国家,国家的统计
		if (!stats.containsKey(country)) {
			stats.put(country, new CountryStats(country));
		}

		// 这里拿到新的统计,cityFound 是拿到某个城市的值
		stats.get(country).cityFound(city);

		collector.emit(new Values(country,
				stats.get(country).getCountryTotal(), city, stats.get(country)
						.getCityTotal(city)));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.COUNTRY_TOTAL,
				storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
	}
}


有关地理位置的统计,附带上程序其他的使用类

package storm.cookbook;

/**
 */
public class Fields {

	public static final String IP = "ip";
	
	public static final String URL = "url";
	
	public static final String CLIENT_KEY = "clientKey";
	
	public static final String COUNTRY = "country";
	
	public static final String COUNTRY_NAME = "country_name";
	
	public static final String CITY = "city";
	
	//唯一的,独一无二的
	public static final String UNIQUE = "unique";
	
	//城镇整数
	public static final String COUNTRY_TOTAL = "countryTotal";
	
	//城市整数
	public static final String CITY_TOTAL = "cityTotal";
	
	//总共计数
	public static final String TOTAL_COUNT = "totalCount";
	
	//总共独一无二的
	public static final String TOTAL_UNIQUE = "totalUnique";




}
package storm.cookbook;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

public class HttpIPResolver implements IPResolver, Serializable {

	static String url = "http://api.hostip.info/get_json.php";

	@Override
	public JSONObject resolveIP(String ip) {
		URL geoUrl = null;
		BufferedReader in = null;
		try {
			geoUrl = new URL(url + "?ip=" + ip);
			URLConnection connection = geoUrl.openConnection();
			in = new BufferedReader(new InputStreamReader(
					connection.getInputStream()));
			String inputLine;

			JSONObject json = (JSONObject) JSONValue.parse(in);

			in.close();

			return json;
		} catch (IOException e) {
			e.printStackTrace();
		} finally {

			// 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
				}
			}
		}
		return null;
	}
}


package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Created with IntelliJ IDEA.
 * User: admin
 * Date: 2012/12/07
 * Time: 5:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface IPResolver {

	public JSONObject resolveIP(String ip);
}


至此,整个流程完毕。 对于统计以后,数据如何持久,亦或是数据数据写回redis的过程,请实践~

© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
加载中

评论(1)

chuckpu
chuckpu
正好看看
使用 Twitter Storm 处理实时的大数据

使用 Twitter Storm 处理实时的大数据 流式处理大数据简介 IBM DW/M. Tim Jones, 独立作家, 顾问 简介: Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与...

IBMdW
2012/12/06
6.4K
3
Storm 并非完全适合所有实时应用

流数据的迅速崛起带来了一类全新的应用开发技术。为了应对不断增长的数据(如物联网和机器通信产生的大量数据),同时,利用实时个性化技术改进在线用户体验,越来越多的应用开发引入了流数据...

亚当李
2014/12/18
1.5W
9
Storm笔记整理(一):简介与设计思想

[TOC] 实时计算概述 有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统...

xpleaf
2018/04/12
0
0
Storm同步调用之DRPC模型探讨

  摘要:Storm的编程模型是一个有向无环图,决定了storm的spout接收到外部系统的请求后,spout并不能得到bolt的处理结果并将结果返回给外部请求。所以也就决定了storm无法提供对外部系统的同...

刘洋intsmaze
2017/09/28
0
0
hadoop、storm和spark的区别、比较

一、hadoop、Storm该选哪一个? 为了区别hadoop和Storm,该部分将回答如下问题:1.hadoop、Storm各是什么运算2.Storm为什么被称之为流式计算系统3.hadoop适合什么场景,什么情况下使用hadoo...

whoisliang
2018/07/02
60
0

没有更多内容

加载失败,请刷新页面

加载更多

MBTI助你成功,让你更了解你自己

MBTI助你成功,让你更了解你自己 生活总是一个七日接着又一个七日,相信看过第七日的小伙伴,很熟悉这段开场白,人生是一个测试接着又一个测试,上学的时候测试,是为了证明你的智力,可谓从...

蛤蟆丸子
今天
55
0
Android实现App版本自动更新

现在很多的App中都会有一个检查版本的功能。例如斗鱼TV App的设置界面下: 当我们点击检查更新的时候,就会向服务器发起版本检测的请求。一般的处理方式是:服务器返回的App版本与当前手机安...

shzwork
昨天
72
0
npm 发布webpack插件 webpack-html-cdn-plugin

初始化一个项目 npm init 切换到npm源 淘宝 npm config set registry https://registry.npm.taobao.org npm npm config set registry http://registry.npmjs.org 登录 npm login 登录状态......

阿豪boy
昨天
87
0
java基础(16)递归

一.说明 递归:方法内调用自己 public static void run1(){ //递归 run1(); } 二.入门: 三.执行流程: 四.无限循环:经常用 无限递归不要轻易使用,无限递归的终点是:栈内存溢出错误 五.递...

煌sir
昨天
63
0
REST接口设计规范总结

URI格式规范 URI中尽量使用连字符”-“代替下划线”_”的使用 URI中统一使用小写字母 URI中不要包含文件(脚本)的扩展名 URI命名规范 文档(Document)类型的资源用名词(短语)单数命名 集合(Co...

Treize
昨天
69
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部