文档章节

OpenTSDB的读写API

林中漫步
 林中漫步
发布于 2016/06/26 23:05
字数 1957
阅读 6.4K
收藏 5

行业解决方案、产品招募中!想赚钱就来传!>>>

      OpenTSDB提供三种方式的读写操作:telnet、http、post,但官方并没提供JAVA版的API。

      多亏有开源贡献者“shifeng258”,他用java编写了 opentsdb-client ,才使得我能对openTSDB的读写操作进行封装,从而分享至此:

import org.opentsdb.client.ExpectResponse;
import org.opentsdb.client.HttpClient;
import org.opentsdb.client.HttpClientImpl;
import org.opentsdb.client.builder.MetricBuilder;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.request.Query;
import org.opentsdb.client.request.QueryBuilder;
import org.opentsdb.client.request.SubQueries;
import org.opentsdb.client.response.Response;
import org.opentsdb.client.response.SimpleHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * Opentsdb读写工具类
 * <p/>
 */
public class OpentsdbClient {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClient.class);


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,大小写敏感
     */
    public static String FILTER_TYPE_LITERAL_OR = "literal_or";

    /**
     * tagv的过滤规则: 通配符匹配,大小写敏感
     */
    public static String FILTER_TYPE_WILDCARD = "wildcard";

    /**
     * tagv的过滤规则: 正则表达式匹配
     */
    public static String FILTER_TYPE_REGEXP = "regexp";


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,忽略大小写
     */
    public static String FILTER_TYPE_ILITERAL_OR = "iliteral_or";


    /**
     * tagv的过滤规则: 通配符匹配,忽略大小写
     */
    public static String FILTER_TYPE_IWILDCARD = "iwildcard";


    /**
     * tagv的过滤规则: 通配符取非匹配,大小写敏感
     */
    public static String FILTER_TYPE_NOT_LITERAL_OR = "not_literal_or";

    /**
     * tagv的过滤规则: 通配符取非匹配,忽略大小写
     */
    public static String FILTER_TYPE_NOT_ILITERAL_OR = "not_iliteral_or";


    /**
     * tagv的过滤规则:
     * <p/>
     * Skips any time series with the given tag key, regardless of the value.
     * This can be useful for situations where a metric has inconsistent tag sets.
     * NOTE: The filter value must be null or an empty string
     */
    public static String FILTER_TYPE_NOT_KEY = "not_key";


    private HttpClient httpClient;


    public OpentsdbClient() {
        this.httpClient = new HttpClientImpl(ConfigLoader.getProperty("opentsdb.url"));
    }

    public OpentsdbClient(String opentsdbUrl) {
        this.httpClient = new HttpClientImpl(opentsdbUrl);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Long value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Double value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Long value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Double value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }



    /**
     * 查询数据,返回的数据为json格式,结构为:
     * "[
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web01,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716527: 1234,
     * "      1435716529: 2345
     * "    }
     * "  },
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web02,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716627: 3456
     * "    }
     * "  }
     * "]";
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @return
     */
    public String getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.setMetric(metric);
        sq.setAggregator(aggregator);

        List<Filter> filters = new ArrayList<Filter>();
        Filter filter = new Filter();
        filter.setTagk(tagk);
        filter.setType(tagvFtype);
        filter.setFilter(tagvFilter);
        filter.setGroupBy(Boolean.TRUE);
        filters.add(filter);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);

        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.debug("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.debug("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回的数据为json格式。
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     */
    public String getData(String metric, List<Filter> filters, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.addMetric(metric);
        sq.addAggregator(aggregator);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);
        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.info("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.info("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     * @throws java.io.IOException
     */
    public Map<String, Map<String, Object>> getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, tagk, tagvFtype, tagvFilter, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     */
    public Map<String, Map<String, Object>> getData(String metric, List<Filter> filters, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, filters, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }


    public Map<String, Map<String, Object>> convertContentToMap(String resContent, String retTimeFmt) {

        // Map<tags, Map<时间点, value>>
        Map<String, Map<String, Object>> tagsValuesMap = new HashMap<String, Map<String, Object>>();

        if (resContent == null || "".equals(resContent.trim())) {
            return tagsValuesMap;
        }

        JSONArray array = (JSONArray) JSONObject.parse(resContent);
        if (array != null) {
            for (int i = 0; i < array.size(); i++) {
                JSONObject obj = (JSONObject) array.get(i);
                JSONObject tags = (JSONObject) obj.get("tags");
                JSONObject dps = (JSONObject) obj.get("dps");

                //timeValueMap.putAll(dps);
                Map<String, Object> timeValueMap = new HashMap<String, Object>();
                for (Iterator<String> it = dps.keySet().iterator(); it.hasNext(); ) {
                    String timstamp = it.next();
                    Date datetime = new Date(Long.parseLong(timstamp) * 1000);
                    timeValueMap.put(DateTimeUtil.format(datetime, retTimeFmt), dps.get(timstamp));
                }
                tagsValuesMap.put(tags.toString(), timeValueMap);
            }
        }
        return tagsValuesMap;
    }
}

下而是测试类:

mport org.junit.Test;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.util.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;


public class OpentsdbClientTest {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClientTest.class);

    @Test
    public void testPutData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdApp001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 210l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 180l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 180l, tagMap);

            tagMap.clear();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 100l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 150l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 150l, tagMap);

            tagMap.clear();
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 40l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 50l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 60l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testPutData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("chl", "hqdWechat");

            client.putData("metric-t", DateTimeUtil.parse("20160627 12:25", "yyyyMMdd HH:mm"), 120l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 12:27", "yyyyMMdd HH:mm"), 810l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 880l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Filter filter = new Filter();
            String tagk = "chl";
            String tagvFtype = OpentsdbClient.FILTER_TYPE_WILDCARD;
            String tagvFilter = "hqdapp*";

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("metric-t", tagk, tagvFtype, tagvFilter, Aggregator.avg.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            List<Filter> filters = new ArrayList<Filter>();
            Filter filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("actv4");
            filter.setFilter("union618");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("ylff4");
            filter.setFilter("hqdApp001|hqdWeixin001");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("pv", filters, Aggregator.sum.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
林中漫步
粉丝 104
博文 55
码字总数 33266
作品 0
深圳
架构师
私信 提问
加载中
此博客有 3 条评论,请先登录后再查看。
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
XLSX读写库--EPPlus

EPPlus 是使用Open Office XML格式(xlsx)读写Excel 2007 / 2010文件的.net开发库。 EPPlus 支持: 单元格范围 单元格样式(Border, Color, Fill, Font, Number, Alignments) Charts 图片 形状...

匿名
2013/02/01
1W
2
Java™ 编译器--Janino

Janino是一个超级小但又超级快的Java™ 编译器. 它不仅能像javac工具那样讲一组源文件编译成字节码文件,还可以对一些Java表达式,代码块,类中的文本(class body)或者内存中源文件进行编译,...

匿名
2013/04/02
4.1K
0
Python数据分析工具包--Pandas

Python Data Analysis Library 或 pandas 是连接 SciPy 和 NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。Pandas 纳入了大量库和一些标准的数据模型,提供了高效地操作大型数据集...

匿名
2012/10/30
2.1W
2
PHP OAuth API

PHP OAuth API可以授权访问的用户一个API使用OAuth协议。它抽象OAuth 1.0,1.0和2.0在同一个类,因此您可以使用相同的“获得一个令牌”授权访问代表当前用户的任何API支持任何版本的OAuth协议。...

匿名
2012/11/01
4K
0

没有更多内容

加载失败,请刷新页面

加载更多

开源FPGA单板iCESugar

随着产业的发展,近年来FPGA越来越得到市场的重视,5G、矿机、人工智能、图像识别、risc-v、通信等众多领域均可见到FPGA的身影,目前比较知名的FPGA厂商有xilinx、altera、lattice等,其中x...

whoisliang
23分钟前
6
0
合并记录帮助文档

合并记录步骤用于将两个不同来源的数据合并,这两个来源的数据分别为旧数据和新数据;该步骤将旧数据和新数据按照指定的关键字匹配、比较、合并,并显示差异信息。接下来就详细介绍一下该步骤...

osc_slnrw1du
23分钟前
8
0
Spark之RDD转换算子(transformation)大全

前面已经给大家讲过RDD原理,今天就给大家说说RDD的转换算子有哪些,以便大家理解。 对于转换操作,RDD的所有转换都不会直接计算结果,仅记录作用于RDD上的操作,当遇到动作算子(Action)时...

osc_3nr2bq5w
24分钟前
11
0
自定义常量数据帮助文档

自定义常量数据步骤主要用于增加自定义字段和行集数据到流中,可增加多个字段并为每个字段赋予行集的值。步骤配置信息如图1所示。 图1 自定义常量数据步骤配置信息 下文详细解释各控件的含义...

osc_r9wwwi0j
25分钟前
3
0
Linux安装配置ftp(Ceonts 7)

1、安装vsftpd yum -y install vsftpd (我这里已经安装好了,只要不报错即安装成功) 安装完成后可以在/etc/vsftpd目录下看到vsftpd.conf 文件,这是vsftp的配置文件。 2、 添加一个ftp用户...

osc_tko37abm
26分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部