文档章节

   Storm【实践系列-如何写一个爬虫- Metric 系列】 - 2 Librato

止静
 止静
发布于 2014/08/21 16:57
字数 598
阅读 500
收藏 0

   

本章主题: 辐射性质介绍一个Librato的Metric度量的实现

package com.digitalpebble.storm.crawler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

import com.librato.metrics.HttpPoster;
import com.librato.metrics.HttpPoster.Response;
import com.librato.metrics.LibratoBatch;
import com.librato.metrics.NingHttpPoster;
import com.librato.metrics.Sanitizer;
import com.librato.metrics.Versions;

/** Sends the metrics to Librato **/
public class LibratoMetricsConsumer implements IMetricsConsumer {

	public static final int DEFAULT_BATCH_SIZE = 500;
	private static final Logger LOG = LoggerFactory
			.getLogger(LibratoMetricsConsumer.class);
	private static final String LIB_VERSION = Versions.getVersion(
			"META-INF/maven/com.librato.metrics/librato-java/pom.properties",
			LibratoBatch.class);
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private final Sanitizer sanitizer = new Sanitizer() {
		public String apply(String name) {
			return Sanitizer.LAST_PASS.apply(name);
		}
	};

	private int postBatchSize = DEFAULT_BATCH_SIZE;
	private long timeout = 30;
	private final TimeUnit timeoutUnit = TimeUnit.SECONDS;
	private String userAgent = null;
	private HttpPoster httpPoster;

	private Set<String> metricsToKeep = new HashSet<String>();

	public void prepare(Map stormConf, Object registrationArgument,
			TopologyContext context, IErrorReporter errorReporter) {

		// TODO configure timeouts
		// this.timeout = timeout;
		// this.timeoutUnit = timeoutUnit;
		// this.postBatchSize = postBatchSize;

		String agentIdentifier = (String) stormConf.get("librato.agent");
		if (agentIdentifier == null)
			agentIdentifier = "storm";

		String token = (String) stormConf.get("librato.token");

		String username = (String) stormConf.get("librato.username");

		String apiUrl = (String) stormConf.get("librato.api.url");

		if (apiUrl == null)
			apiUrl = "https://metrics-api.librato.com/v1/metrics";

		// check that the values are not null
		if (StringUtils.isBlank(token))
			throw new RuntimeException("librato.token not set");

		if (StringUtils.isBlank(username))
			throw new RuntimeException("librato.username not set");

		this.userAgent = String.format("%s librato-java/%s", agentIdentifier,
				LIB_VERSION);

		this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl);

		// get the list of metrics names to keep if any
		String metrics2keep = (String) stormConf.get("librato.metrics.to.keep");

		if (metrics2keep != null) {
			String[] mets = metrics2keep.split(",");
			for (String m : mets)
				metricsToKeep.add(m.trim().toLowerCase());
		}

	}

	// post(String source, long epoch)
	public void handleDataPoints(TaskInfo taskInfo,
			Collection<DataPoint> dataPoints) {

		final Map<String, Object> payloadMap = new HashMap<String, Object>();

		payloadMap.put("source", taskInfo.srcComponentId + "_"
				+ taskInfo.srcWorkerHost + "_" + taskInfo.srcTaskId);
		payloadMap.put("measure_time", taskInfo.timestamp);

		final List<Map<String, Object>> gaugeData = new ArrayList<Map<String, Object>>();
		final List<Map<String, Object>> counterData = new ArrayList<Map<String, Object>>();

		int counter = 0;

		final Iterator<DataPoint> datapointsIterator = dataPoints.iterator();
		while (datapointsIterator.hasNext()) {
			final DataPoint dataPoint = datapointsIterator.next();

			// ignore datapoint with a value which is not a map
			if (!(dataPoint.value instanceof Map))
				continue;

			// a counter or a gauge
			// convention if its name contains '_counter'
			// then treat it as a counter
			boolean isCounter = false;

			if (dataPoint.name.contains("_counter")) {
				isCounter = true;
				dataPoint.name = dataPoint.name.replaceFirst("_counter", "");
			}

			if (!metricsToKeep.isEmpty()) {
				if (!metricsToKeep.contains(dataPoint.name.toLowerCase())) {
					continue;
				}
			}

			try {
				Map<String, Number> metric = (Map<String, Number>) dataPoint.value;
				for (Map.Entry<String, Number> entry : metric.entrySet()) {
					String metricId = entry.getKey();
					Number val = entry.getValue();

					final Map<String, Object> data = new HashMap<String, Object>();
					data.put("name",
							sanitizer.apply(dataPoint.name + "_" + metricId));
					data.put("value", val);

					if (isCounter)
						counterData.add(data);
					else
						// use as gauge
						gaugeData.add(data);

					counter++;
					if (counter % postBatchSize == 0
							|| (!datapointsIterator.hasNext() && (!counterData
									.isEmpty() || !gaugeData.isEmpty()))) {
						final String countersKey = "counters";
						final String gaugesKey = "gauges";

						payloadMap.put(countersKey, counterData);
						payloadMap.put(gaugesKey, gaugeData);
						postPortion(payloadMap);
						payloadMap.remove(gaugesKey);
						payloadMap.remove(countersKey);
						gaugeData.clear();
						counterData.clear();
					}
				}
			} catch (RuntimeException e) {
				LOG.error(e.getMessage());
			}

		}
		LOG.debug("Posted {} measurements", counter);

	}

	public void cleanup() {

	}

	private void postPortion(Map<String, Object> chunk) {
		try {
			final String payload = OBJECT_MAPPER.writeValueAsString(chunk);
			final Future<Response> future = httpPoster.post(userAgent, payload);
			final Response response = future.get(timeout, timeoutUnit);
			final int statusCode = response.getStatusCode();
			if (statusCode < 200 || statusCode >= 300) {
				LOG.error(
						"Received an error from Librato API. Code : {}, Message: {}",
						statusCode, response.getBody());
			}
		} catch (Exception e) {
			LOG.error("Unable to post to Librato API", e);
		}
	}
}


   Metirc代码带有一点介绍的性质。

© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Storm【实践系列-如何写一个爬虫-】6 URLInjector

介绍: URLInjector,我封装了的一个简单的客户端,我们将要放URLs,到一个分片队列里面 【sharded queue】,只有放置到分片队列的数据才会被Storm的管线所处理。 package com.digitalpebbl...

止静
2014/08/21
117
0
Storm【实践系列-如何写一个爬虫】 - ParserBolt

阅读背景: 如果您对爬虫,或则web前端不够了解,请自行google。 代码前提:您需要参阅本ID 所写的前面两篇博文: Storm【实践系列-如何写一个爬虫】 - Fetcher 本章主题: ParserBolt 如何完...

止静
2014/08/18
1K
0
Storm入门 第三章 Storm安装部署步骤

本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出。 3.1 Storm集群组件 Storm集群中包含...

坏坏一笑
2014/12/03
114
0
Storm【实践系列-如何写一个爬虫4】 - ParserBolt

代码精要: 1 : 首先,在新版本Storm0.92的功能中加入了Storm Metric的度量 2 :其次:在真个抽象的过程中,把具体的业务处理过程抽象为handler,其中包括了: 2.1:LinkContentHandler 2....

止静
2014/08/21
132
0
Redis 系列 - 1 【简要介绍】 - 它是什么?它用来做什么?它的优势与短板如何?

1.0 前提 阅读目的: 对什么是内存型数据库有概念性的认知。? 阅读需知:有关Redis系列的博文,大致有以下的5篇博文。本ID将陆续补充。 本ID Redis系列 1 . 0 :首先,这是您在Redis的官方网...

止静
2014/09/18
6.6K
1

没有更多内容

加载失败,请刷新页面

加载更多

深入理解JVM - 类加载机制

类加载过程 一个类型从被加载到虚拟机内存中开始,到卸载出内存为止,它的整个生命周期将会经历加载(Loading)、验证(Verification)、准备(Preparation)、解析(Resolution)、初始化(...

xiaolyuh
12分钟前
57
0
脸盲症的小伙伴 测试下你的脸盲症程度

笔者在背单词的时候突然想到了一个问题,就是背单词的时候,相近的词容易混淆,例如:coast和roast,在我背诵的时候,我就很烦恼,不光是英文单词,还有汉字,例如“籍”和“藉“,我还是个中...

蛤蟆丸子
13分钟前
50
0
「网易官方」极客战记(codecombat)攻略-地牢-囚犯the-prisoner

解放囚犯,你会得到盟友。 简介 敬请期待! 默认代码 # 释放囚犯,击败守卫并夺取宝石。 # 从"Weak Door"后解救Patrick。 # 击败名为"Two"的守卫。 # 获得宝石。 概览 您可以按照名称 "Weak ...

极客战记
15分钟前
12
0
Final cut pro 10.4.4中文版本

1.双击打开dmg,点击红框图示 2.出现这个界面后直接回车 3直接将fcp拖拽到application文件夹 然后就可以直接打开了! 百度网盘地址:链接: https://pan.baidu.com/s/1Db9hXmzPV4EdR7_LxEqctA...

kylin_ink
16分钟前
32
0
jquery.validate

规则名称 类型 描述 required Boolean 设置该项内容为必填 remote Json|String 请求远程资源来校验内容有效性 minlength Number 设置内容的最少字符长度 maxlength Number 设置内容的最多字符...

愚蠢的土豆
16分钟前
129
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部