文档章节

OpenTSDB的读写API

林中漫步
 林中漫步
发布于 2016/06/26 23:05
字数 1957
阅读 1657
收藏 5

      OpenTSDB提供三种方式的读写操作:telnet、http、post,但官方并没提供JAVA版的API。

      多亏有开源贡献者“shifeng258”,他用java编写了 opentsdb-client ,才使得我能对openTSDB的读写操作进行封装,从而分享至此:

import org.opentsdb.client.ExpectResponse;
import org.opentsdb.client.HttpClient;
import org.opentsdb.client.HttpClientImpl;
import org.opentsdb.client.builder.MetricBuilder;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.request.Query;
import org.opentsdb.client.request.QueryBuilder;
import org.opentsdb.client.request.SubQueries;
import org.opentsdb.client.response.Response;
import org.opentsdb.client.response.SimpleHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * Opentsdb读写工具类
 * <p/>
 */
public class OpentsdbClient {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClient.class);


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,大小写敏感
     */
    public static String FILTER_TYPE_LITERAL_OR = "literal_or";

    /**
     * tagv的过滤规则: 通配符匹配,大小写敏感
     */
    public static String FILTER_TYPE_WILDCARD = "wildcard";

    /**
     * tagv的过滤规则: 正则表达式匹配
     */
    public static String FILTER_TYPE_REGEXP = "regexp";


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,忽略大小写
     */
    public static String FILTER_TYPE_ILITERAL_OR = "iliteral_or";


    /**
     * tagv的过滤规则: 通配符匹配,忽略大小写
     */
    public static String FILTER_TYPE_IWILDCARD = "iwildcard";


    /**
     * tagv的过滤规则: 通配符取非匹配,大小写敏感
     */
    public static String FILTER_TYPE_NOT_LITERAL_OR = "not_literal_or";

    /**
     * tagv的过滤规则: 通配符取非匹配,忽略大小写
     */
    public static String FILTER_TYPE_NOT_ILITERAL_OR = "not_iliteral_or";


    /**
     * tagv的过滤规则:
     * <p/>
     * Skips any time series with the given tag key, regardless of the value.
     * This can be useful for situations where a metric has inconsistent tag sets.
     * NOTE: The filter value must be null or an empty string
     */
    public static String FILTER_TYPE_NOT_KEY = "not_key";


    private HttpClient httpClient;


    public OpentsdbClient() {
        this.httpClient = new HttpClientImpl(ConfigLoader.getProperty("opentsdb.url"));
    }

    public OpentsdbClient(String opentsdbUrl) {
        this.httpClient = new HttpClientImpl(opentsdbUrl);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Long value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Double value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Long value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Double value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }



    /**
     * 查询数据,返回的数据为json格式,结构为:
     * "[
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web01,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716527: 1234,
     * "      1435716529: 2345
     * "    }
     * "  },
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web02,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716627: 3456
     * "    }
     * "  }
     * "]";
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @return
     */
    public String getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.setMetric(metric);
        sq.setAggregator(aggregator);

        List<Filter> filters = new ArrayList<Filter>();
        Filter filter = new Filter();
        filter.setTagk(tagk);
        filter.setType(tagvFtype);
        filter.setFilter(tagvFilter);
        filter.setGroupBy(Boolean.TRUE);
        filters.add(filter);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);

        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.debug("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.debug("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回的数据为json格式。
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     */
    public String getData(String metric, List<Filter> filters, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.addMetric(metric);
        sq.addAggregator(aggregator);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);
        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.info("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.info("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     * @throws java.io.IOException
     */
    public Map<String, Map<String, Object>> getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, tagk, tagvFtype, tagvFilter, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     */
    public Map<String, Map<String, Object>> getData(String metric, List<Filter> filters, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, filters, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }


    public Map<String, Map<String, Object>> convertContentToMap(String resContent, String retTimeFmt) {

        // Map<tags, Map<时间点, value>>
        Map<String, Map<String, Object>> tagsValuesMap = new HashMap<String, Map<String, Object>>();

        if (resContent == null || "".equals(resContent.trim())) {
            return tagsValuesMap;
        }

        JSONArray array = (JSONArray) JSONObject.parse(resContent);
        if (array != null) {
            for (int i = 0; i < array.size(); i++) {
                JSONObject obj = (JSONObject) array.get(i);
                JSONObject tags = (JSONObject) obj.get("tags");
                JSONObject dps = (JSONObject) obj.get("dps");

                //timeValueMap.putAll(dps);
                Map<String, Object> timeValueMap = new HashMap<String, Object>();
                for (Iterator<String> it = dps.keySet().iterator(); it.hasNext(); ) {
                    String timstamp = it.next();
                    Date datetime = new Date(Long.parseLong(timstamp) * 1000);
                    timeValueMap.put(DateTimeUtil.format(datetime, retTimeFmt), dps.get(timstamp));
                }
                tagsValuesMap.put(tags.toString(), timeValueMap);
            }
        }
        return tagsValuesMap;
    }
}

下而是测试类:

mport org.junit.Test;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.util.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;


public class OpentsdbClientTest {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClientTest.class);

    @Test
    public void testPutData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdApp001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 210l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 180l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 180l, tagMap);

            tagMap.clear();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 100l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 150l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 150l, tagMap);

            tagMap.clear();
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 40l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 50l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 60l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testPutData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("chl", "hqdWechat");

            client.putData("metric-t", DateTimeUtil.parse("20160627 12:25", "yyyyMMdd HH:mm"), 120l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 12:27", "yyyyMMdd HH:mm"), 810l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 880l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Filter filter = new Filter();
            String tagk = "chl";
            String tagvFtype = OpentsdbClient.FILTER_TYPE_WILDCARD;
            String tagvFilter = "hqdapp*";

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("metric-t", tagk, tagvFtype, tagvFilter, Aggregator.avg.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            List<Filter> filters = new ArrayList<Filter>();
            Filter filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("actv4");
            filter.setFilter("union618");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("ylff4");
            filter.setFilter("hqdApp001|hqdWeixin001");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("pv", filters, Aggregator.sum.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

© 著作权归作者所有

共有 人打赏支持
林中漫步
粉丝 97
博文 55
码字总数 33266
作品 0
深圳
架构师
私信 提问
加载中

评论(3)

我有一头小毛驴
我有一头小毛驴
我也遇到了多个tags的时候返回结果不匹配的问题,请问是怎么解决的?麻烦指点一下
林中漫步
林中漫步
这个代码已经是完整的了,依赖的主要的包在:https://github.com/shifeng258/opentsdb-client

我找不这个代码了, 好像没啥依赖了呀
我有一头小毛驴
我有一头小毛驴
你好,能麻烦您把完整的代码,给我一份吗?
OpenTSDB 生产应用与思考

作者:陈杰,欢聚时代YY 基础架构部,数据库技术组,专注于HBase、Kafka,MySQL 等技术。 OpenTSDB 官方介绍 这里就不翻译了。http://opentsdb.net/overview.html How does OpenTSDB work? O...

pursue5956
2018/08/07
0
0
OpenTSDB 监控系统的研究和介绍

一、背景介绍 此次航天局为了让天宫一号与神舟九号载人交会顺利对接成功,采用了新一代数值天气预报系统为 神九保驾护航。新一代数值天气预报系统是中国国内技术最先进、分辨率最高、预报时效...

红薯
2012/07/07
4.3K
2
MySQL 记一次修改root密码

1、正常关闭MySQL服务 [root@mysqlnode02 opentsdb-2.3.0]# systemctl stop mysqld.service 2、修改配置文件my.cnf增加跳过权限认证 [root@mysqlnode02 opentsdb-2.3.0]# vi /etc/my.cnf [m......

PeakFang-BOK
2018/11/14
0
0
OpenTSDB 数据会乱套问题

OpenTSDB运行nohup日志: java.lang.IllegalStateException: id=[0, 0, 0, -9] => name=4029, already mapped to 有线宽带开户 java.lang.IllegalStateException: id=[0, 0, 2, -81] => name......

涛涛已经有人使用
2017/02/15
2
0
开源监控系统 - OpenTSDB

开源监控系统OpenTSDB,用hbase存储所有的时序(无须 采样)来构建一个分布式、可伸缩的时间序列数据库。它支持秒级数据采集所有metrics,支持永久存储,可以做容量规划,并很容易的接入到现...

匿名
2012/07/07
0
3

没有更多内容

加载失败,请刷新页面

加载更多

大数据反欺诈技术架构

一年多以前,有朋友让我聊一下你们的大数据反欺诈架构是怎么实现的,以及我们途中踩了哪些坑,怎么做到从30min延迟优化到1s内完成实时反欺诈。当时呢第一是觉得不合适,第二也是觉得场景比较...

微笑向暖wx
8分钟前
0
0
flink-系统内部消息传递的exactly once语义

At Most once,At Least once和Exactly once 在分布式系统中,组成系统的各个计算机是独立的。这些计算机有可能fail。 一个sender发送一条message到receiver。根据receiver出现fail时sender如...

xtof
15分钟前
0
0
iOS程序执行顺序和UIViewController 的生命周期(整理)

说明:此文是自己的总结笔记,主要参考: iOS程序的启动执行顺序 AppDelegate 及 UIViewController 的生命周期 UIView的生命周期 言叶之庭.jpeg 一. iOS程序的启动执行顺序 程序启动顺序图 iO...

壹峰
17分钟前
0
0
配置网络、远程登录、Linux秘钥认证

配置网络 一台服务器安装完系统之后不管是为了方便管理还是业务需要,我们都要给它配置ip地址。让机器能够联网。在现实的生产环境的当中,往往我们给服务器配置的ip都是提前规划好的,但是在...

李超小牛子
21分钟前
0
0
dotConnect for Oracle入门指南(五):检索和修改数据

【下载dotConnect for Oracle最新版本】 dotConnect for Oracle(原名OraDirect.NET)建立在ADO.NET技术上,为基于Oracle数据库的应用程序提供完整的解决方案。它为设计应用程序结构带来了新的...

电池盒
21分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部