文档章节

OpenTSDB的读写API

林中漫步
 林中漫步
发布于 2016/06/26 23:05
字数 1957
阅读 1549
收藏 5

      OpenTSDB提供三种方式的读写操作:telnet、http、post,但官方并没提供JAVA版的API。

      多亏有开源贡献者“shifeng258”,他用java编写了 opentsdb-client ,才使得我能对openTSDB的读写操作进行封装,从而分享至此:

import org.opentsdb.client.ExpectResponse;
import org.opentsdb.client.HttpClient;
import org.opentsdb.client.HttpClientImpl;
import org.opentsdb.client.builder.MetricBuilder;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.request.Query;
import org.opentsdb.client.request.QueryBuilder;
import org.opentsdb.client.request.SubQueries;
import org.opentsdb.client.response.Response;
import org.opentsdb.client.response.SimpleHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * Opentsdb读写工具类
 * <p/>
 */
public class OpentsdbClient {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClient.class);


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,大小写敏感
     */
    public static String FILTER_TYPE_LITERAL_OR = "literal_or";

    /**
     * tagv的过滤规则: 通配符匹配,大小写敏感
     */
    public static String FILTER_TYPE_WILDCARD = "wildcard";

    /**
     * tagv的过滤规则: 正则表达式匹配
     */
    public static String FILTER_TYPE_REGEXP = "regexp";


    /**
     * tagv的过滤规则: 精确匹配多项迭代值,多项迭代值以'|'分隔,忽略大小写
     */
    public static String FILTER_TYPE_ILITERAL_OR = "iliteral_or";


    /**
     * tagv的过滤规则: 通配符匹配,忽略大小写
     */
    public static String FILTER_TYPE_IWILDCARD = "iwildcard";


    /**
     * tagv的过滤规则: 通配符取非匹配,大小写敏感
     */
    public static String FILTER_TYPE_NOT_LITERAL_OR = "not_literal_or";

    /**
     * tagv的过滤规则: 通配符取非匹配,忽略大小写
     */
    public static String FILTER_TYPE_NOT_ILITERAL_OR = "not_iliteral_or";


    /**
     * tagv的过滤规则:
     * <p/>
     * Skips any time series with the given tag key, regardless of the value.
     * This can be useful for situations where a metric has inconsistent tag sets.
     * NOTE: The filter value must be null or an empty string
     */
    public static String FILTER_TYPE_NOT_KEY = "not_key";


    private HttpClient httpClient;


    public OpentsdbClient() {
        this.httpClient = new HttpClientImpl(ConfigLoader.getProperty("opentsdb.url"));
    }

    public OpentsdbClient(String opentsdbUrl) {
        this.httpClient = new HttpClientImpl(opentsdbUrl);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Long value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, Date timestamp, Double value, Map<String, String> tagMap) throws Exception {
        long timsSecs = timestamp.getTime();
        return this.putData(metric, timsSecs, value, tagMap);
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Long value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 写入数据
     *
     * @param metric    指标
     * @param timestamp 转化为秒的时间点
     * @param value
     * @param tagMap
     * @return
     * @throws Exception
     */
    public boolean putData(String metric, long timestamp, Double value, Map<String, String> tagMap) throws Exception {
        MetricBuilder builder = MetricBuilder.getInstance();
        builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap);
        try {
            log.debug("write quest:{}", builder.build());
            Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY);
            log.debug("response.statusCode: {}", response.getStatusCode());
            return response.isSuccess();
        } catch (Exception e) {
            log.error("put data to opentsdb error: ", e);
            throw e;
        }
    }



    /**
     * 查询数据,返回的数据为json格式,结构为:
     * "[
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web01,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716527: 1234,
     * "      1435716529: 2345
     * "    }
     * "  },
     * "  {
     * "    metric: mysql.innodb.row_lock_time,
     * "    tags: {
     * "      host: web02,
     * "      dc: beijing
     * "    },
     * "    aggregateTags: [],
     * "    dps: {
     * "      1435716627: 3456
     * "    }
     * "  }
     * "]";
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @return
     */
    public String getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.setMetric(metric);
        sq.setAggregator(aggregator);

        List<Filter> filters = new ArrayList<Filter>();
        Filter filter = new Filter();
        filter.setTagk(tagk);
        filter.setType(tagvFtype);
        filter.setFilter(tagvFilter);
        filter.setGroupBy(Boolean.TRUE);
        filters.add(filter);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);

        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.debug("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.debug("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回的数据为json格式。
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     */
    public String getData(String metric, List<Filter> filters, String aggregator, String downsample,
                          Date startTime, Date endTime) throws IOException {

        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        Query query = queryBuilder.getQuery();

        query.setStart(startTime.getTime() / 1000);
        query.setEnd(endTime.getTime() / 1000);

        List<SubQueries> sqList = new ArrayList<SubQueries>();
        SubQueries sq = new SubQueries();
        sq.addMetric(metric);
        sq.addAggregator(aggregator);

        sq.setFilters(filters);

        sq.setDownsample(downsample + "-" + aggregator);
        sqList.add(sq);

        query.setQueries(sqList);

        try {
            log.info("query request:{}", queryBuilder.build()); //这行起到校验作用
            SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL);
            log.info("response.content: {}", spHttpResponse.getContent());

            if (spHttpResponse.isSuccess()) {
                return spHttpResponse.getContent();
            }
            return null;
        } catch (IOException e) {
            log.error("get data from opentsdb error: ", e);
            throw e;
        }
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param tagk       tagk
     * @param tagvFtype  tagv的过滤规则
     * @param tagvFilter tagv的匹配字符
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     * @throws java.io.IOException
     */
    public Map<String, Map<String, Object>> getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, tagk, tagvFtype, tagvFilter, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }

    /**
     * 查询数据,返回tags与时序值的映射: Map<tags, Map<时间点, value>>
     *
     * @param metric     要查询的指标
     * @param filters     查询过滤的条件, 原来使用的tags在v2.2后已不适用
     *                   filter.setType(): 设置过滤类型, 如: wildcard, regexp
     *                   filter.setTagk(): 设置tag
     *                   filter.setFilter(): 根据type设置tagv的过滤表达式, 如: hqdApp|hqdWechat
     *                   filter.setGroupBy():设置成true, 不设置或设置成false会导致读超时
     * @param aggregator 查询的聚合类型, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM
     * @param downsample 采样的时间粒度, 如: 1s,2m,1h,1d,2d
     * @param startTime  开始时间
     * @param endTime    结束时间
     * @param retTimeFmt 返回的结果集中,时间点的格式, 如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等
     * @return Map<tags, Map<时间点, value>>
     */
    public Map<String, Map<String, Object>> getData(String metric, List<Filter> filters, String aggregator, String downsample,
                                                    Date startTime, Date endTime, String retTimeFmt) throws IOException {
        String resContent = this.getData(metric, filters, aggregator, downsample, startTime, endTime);
        return this.convertContentToMap(resContent, retTimeFmt);
    }


    public Map<String, Map<String, Object>> convertContentToMap(String resContent, String retTimeFmt) {

        // Map<tags, Map<时间点, value>>
        Map<String, Map<String, Object>> tagsValuesMap = new HashMap<String, Map<String, Object>>();

        if (resContent == null || "".equals(resContent.trim())) {
            return tagsValuesMap;
        }

        JSONArray array = (JSONArray) JSONObject.parse(resContent);
        if (array != null) {
            for (int i = 0; i < array.size(); i++) {
                JSONObject obj = (JSONObject) array.get(i);
                JSONObject tags = (JSONObject) obj.get("tags");
                JSONObject dps = (JSONObject) obj.get("dps");

                //timeValueMap.putAll(dps);
                Map<String, Object> timeValueMap = new HashMap<String, Object>();
                for (Iterator<String> it = dps.keySet().iterator(); it.hasNext(); ) {
                    String timstamp = it.next();
                    Date datetime = new Date(Long.parseLong(timstamp) * 1000);
                    timeValueMap.put(DateTimeUtil.format(datetime, retTimeFmt), dps.get(timstamp));
                }
                tagsValuesMap.put(tags.toString(), timeValueMap);
            }
        }
        return tagsValuesMap;
    }
}

下而是测试类:

mport org.junit.Test;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.util.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;


public class OpentsdbClientTest {

    private static Logger log = LoggerFactory.getLogger(OpentsdbClientTest.class);

    @Test
    public void testPutData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdApp001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 210l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 180l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 180l, tagMap);

            tagMap.clear();
            tagMap.put("actv4", "union618");
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 100l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 150l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 150l, tagMap);

            tagMap.clear();
            tagMap.put("ylff4", "hqdWeixin001");
            client.putData("pv", DateTimeUtil.parse("20160627 12:15", "yyyyMMdd HH:mm"), 40l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 12:17", "yyyyMMdd HH:mm"), 50l, tagMap);
            client.putData("pv", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 60l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testPutData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Map<String, String> tagMap = new HashMap<String, String>();
            tagMap.put("chl", "hqdWechat");

            client.putData("metric-t", DateTimeUtil.parse("20160627 12:25", "yyyyMMdd HH:mm"), 120l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 12:27", "yyyyMMdd HH:mm"), 810l, tagMap);
            client.putData("metric-t", DateTimeUtil.parse("20160627 13:20", "yyyyMMdd HH:mm"), 880l, tagMap);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            Filter filter = new Filter();
            String tagk = "chl";
            String tagvFtype = OpentsdbClient.FILTER_TYPE_WILDCARD;
            String tagvFilter = "hqdapp*";

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("metric-t", tagk, tagvFtype, tagvFilter, Aggregator.avg.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testGetData2() {
        OpentsdbClient client = new OpentsdbClient(ConfigLoader.getProperty("opentsdb.url"));
        try {
            List<Filter> filters = new ArrayList<Filter>();
            Filter filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("actv4");
            filter.setFilter("union618");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            filter = new Filter();
            filter.setType(OpentsdbClient.FILTER_TYPE_LITERAL_OR);
            filter.setTagk("ylff4");
            filter.setFilter("hqdApp001|hqdWeixin001");
            filter.setGroupBy(Boolean.TRUE);
            filters.add(filter);

            Map<String, Map<String, Object>> tagsValuesMap = client.getData("pv", filters, Aggregator.sum.name(), "1m",
                    DateTimeUtil.parse("2016-06-27 12:00:00", "yyyy-MM-dd HH:mm:ss"), DateTimeUtil.parse("2016-06-30 11:00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd HHmm");

            for (Iterator<String> it = tagsValuesMap.keySet().iterator(); it.hasNext(); ) {
                String tags = it.next();
                System.out.println(">> tags: " + tags);
                Map<String, Object> tvMap = tagsValuesMap.get(tags);
                for (Iterator<String> it2 = tvMap.keySet().iterator(); it2.hasNext(); ) {
                    String time = it2.next();
                    System.out.println("    >> " + time + " <-> " + tvMap.get(time));
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

© 著作权归作者所有

共有 人打赏支持
林中漫步
粉丝 97
博文 55
码字总数 33323
作品 0
深圳
架构师
私信 提问
加载中

评论(3)

我有一头小毛驴
我有一头小毛驴
我也遇到了多个tags的时候返回结果不匹配的问题,请问是怎么解决的?麻烦指点一下
林中漫步
林中漫步
这个代码已经是完整的了,依赖的主要的包在:https://github.com/shifeng258/opentsdb-client

我找不这个代码了, 好像没啥依赖了呀
我有一头小毛驴
我有一头小毛驴
你好,能麻烦您把完整的代码,给我一份吗?
OpenTSDB 生产应用与思考

作者:陈杰,欢聚时代YY 基础架构部,数据库技术组,专注于HBase、Kafka,MySQL 等技术。 OpenTSDB 官方介绍 这里就不翻译了。http://opentsdb.net/overview.html How does OpenTSDB work? O...

pursue5956
08/07
0
0
MySQL 记一次修改root密码

1、正常关闭MySQL服务 [root@mysqlnode02 opentsdb-2.3.0]# systemctl stop mysqld.service 2、修改配置文件my.cnf增加跳过权限认证 [root@mysqlnode02 opentsdb-2.3.0]# vi /etc/my.cnf [m......

PeakFang-BOK
11/14
0
0
OpenTSDB中的一个奇怪现象

功能: 有个采集点,后端会周期性的采集指标数据,以及指标实例(TimeSeries)对应的元数据信息。 因此也就是要定期的更新TSMeta。 程序刚启动运行都很OK, 可是第二天看日志发现,很多Interal...

莫问viva
2015/05/26
0
0
开源监控系统 - OpenTSDB

开源监控系统OpenTSDB,用hbase存储所有的时序(无须 采样)来构建一个分布式、可伸缩的时间序列数据库。它支持秒级数据采集所有metrics,支持永久存储,可以做容量规划,并很容易的接入到现...

匿名
2012/07/07
0
3
openTSDB数据结构的设计优化咨询

既有状态:openTSDB安装完成但我没有使用过TSDB 期望:根据我这边的中小规模物联网远程监控数据存储的应用场景,进行较为合理的TSDB数据存储结构的设计和定义和我这边物联网数据采集服务器的...

伍工
08/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MariaDB 服务器在 MySQL Workbench 备份数据的时候出错如何解决

服务器是运行在 MariaDB 10.2 上面的,在使用 MySQL Workbench 出现错误: mysqldump: Couldn't execute 'SELECT COLUMN_NAME, JSON_EXTRACT(HISTOGRAM, '$."number-of-buckets-specified"'......

honeymose
今天
3
0
apache顶级项目(二) - B~C

apache顶级项目(二) - B~C https://www.apache.org/ Bahir Apache Bahir provides extensions to multiple distributed analytic platforms, extending their reach with a diversity of s......

晨猫
今天
6
0
day152-2018-11-19-英语流利阅读

“超级食物”竟然是营销噱头? Daniel 2018-11-19 1.今日导读 近几年来,超级食物 superfoods 开始逐渐走红。不难发现,越来越多的轻食餐厅也在不断推出以超级食物为主打食材的健康料理,像是...

飞鱼说编程
今天
12
0
SpringBoot源码:启动过程分析(二)

接着上篇继续分析 SpringBoot 的启动过程。 SpringBoot的版本为:2.1.0 release,最新版本。 一.时序图 一样的,我们先把时序图贴上来,方便理解: 二.源码分析 回顾一下,前面我们分析到了下...

Jacktanger
昨天
4
0
Apache防盗链配置,Directory访问控制,FilesMatch进行访问控制

防盗链配置 通过限制referer来实现防盗链的功能 配置前,使用curl -e 指定referer [root@test-a test-webroot]# curl -e "http://www.test.com/1.html" -x127.0.0.1:80 "www.test.com/1.jpg......

野雪球
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部