文档章节

go监控方案(7) -- 实现

solate
 solate
发布于 07/19 15:12
字数 1062
阅读 32
收藏 4

metrics 客户端

数据采集使用go-metrics

传输使用UDP, 仿StatsD上传采集数据, InfluxDB进行数据存储, Grafana进行展示。

实现github 地址

https://github.com/solate/metrics

该地址有已经改好的配置文件可以直接使用。

使用的all-in-one :

git docker-statsd-influxdb-grafana

docker hub 地址

数据封装

//挂载配置文件,已修改statsd模版
docker run --ulimit nofile=66000:66000  -v /root/telegraf.conf:/etc/telegraf/telegraf.conf   -d   --name docker-statsd-influxdb-grafana   -p 3003:3003   -p 3004:8888   -p 8086:8086   -p 8125:8125/udp   samuelebistoletti/docker-statsd-influxdb-grafana:latest

register

register 使用的name 必须是不同的

telegraf 配置修改

[[inputs.statsd]] 部分配置打开, 修改templates为:

   templates = [
      "* measurement.measurement.field"
   ]

表示传值prefix.name.field 最好表示为prefix_name field

代码实现

package client

import (
	"bufio"
	"bytes"
	"github.com/rcrowley/go-metrics"
	"log"
	"net"
	"strconv"
	"strings"
	"time"
)

const (
	// UDP packet limit, see
	// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
	UDP_MAX_PACKET_SIZE int = 64 * 1024
)

// Config provides a container with configuration parameters for
// the StatsD exporter
type Config struct {
	Network       string           // Network: tcp, udp.
	Addr          string           // Network address to connect to | 地址
	Registry      metrics.Registry // Registry to be exported | metrics注册
	FlushInterval time.Duration    // Flush interval | 刷新间隔时间
	Prefix        string           // Prefix to be prepended to metric names | 前缀名字
	Rate          float32          // Rate
	Tags          string           // tag //TODO

	conn net.Conn
}

func StatsD(r metrics.Registry, d time.Duration, prefix string, network string, addr string, rate float32) {

	conn, err := net.Dial(network, addr)
	if err != nil {
		panic("conn remote err!")
	}

	StatsDWithConfig(Config{
		Network:       network,
		Addr:          addr,
		Registry:      r,
		FlushInterval: d,
		Prefix:        prefix,
		Rate:          rate,
		conn:          conn,
	})

}

// WithConfig is a blocking exporter function
func StatsDWithConfig(c Config) {
	for _ = range time.Tick(c.FlushInterval) {
		if err := statsd(&c); err != nil {
			log.Println(err)
			c.conn.Close()
		}
	}
}

func statsd(c *Config) (err error) {

	w := bufio.NewWriter(c.conn)

	c.Registry.Each(func(name string, i interface{}) {
		switch metric := i.(type) {
		case metrics.Counter:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Count(), "|c", c.Rate))
		case metrics.Gauge:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate))
		case metrics.GaugeFloat64:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate))
		case metrics.Histogram:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

			fields := make([][]byte, 12)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})
			w.Write(buf)
		case metrics.Meter:
			ms := metric.Snapshot()
			fields := make([][]byte, 5)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})
			w.Write(buf)

		case metrics.Timer:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

			fields := make([][]byte, 12)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})

			w.Write(buf)

			//case metrics.Healthcheck:
			//	metric.Check()
			//	log.Printf("healthcheck %s\n", name)
			//	log.Printf("  error:       %v\n", metric.Error())
			//
			//case metrics.EWMA:
			//case metrics.Sample:

		}

		w.Flush()
	})

	return
}

//构造发送line
func statsdLine(prefix, name, field string, value interface{}, suffix string, rate float32) []byte {

	//<metricname>:<value>|<type>|@<rate>
	var buffer bytes.Buffer

	//buf := make([]byte, UDP_MAX_PACKET_SIZE)

	//添加前缀
	if prefix != "" {
		//buf = append(buf, prefix...)
		//buf = append(buf, '.')
		buffer.WriteString(prefix)
		buffer.WriteString(".")
	} else {
		buffer.WriteString("statsd")
		buffer.WriteString(".")
	}

	////添加名称
	//buf = append(buf, name...)
	//buf = append(buf, ':')

	//将name注册中的'.'替换成'_', 配合telegraf修改模版,防止将数据库名字改为属性
	if strings.Contains(name, ".") {
		name = strings.ReplaceAll(name, ".", "_")
	}
	//添加名称
	buffer.WriteString(name)

	if field != "" {
		buffer.WriteString(".")
		buffer.WriteString(field)
	}
	buffer.WriteString(":")

	buf := buffer.Bytes()

	switch v := value.(type) {
	case string:
		buf = append(buf, v...)
	case int64:
		buf = strconv.AppendInt(buf, v, 10)
	case float64:
		buf = strconv.AppendFloat(buf, v, 'f', -1, 64)
	default:
		return nil
	}

	if suffix != "" {
		buf = append(buf, suffix...)
	}

	if rate != 0 && rate < 1 {
		buf = append(buf, "|@"...)
		buf = strconv.AppendFloat(buf, float64(rate), 'f', 6, 32)
	}

	buf = append(buf, "\n"...) //每一行打一个回车,telegraf 是使用回车进行读取的
	return buf

}

参考

multiple field values issues

© 著作权归作者所有

上一篇: influxdb 学习
solate
粉丝 10
博文 134
码字总数 119760
作品 0
成都
程序员
私信 提问
百度、阿里、腾讯、京东、大型互联网分布式架构必备技能

分布式架构 迎接高并发大数据的挑战,从深度到广度完善知识体系,成为下一个互联网高薪人才。 理论结合实战,透彻理解分布式架构及其解决方案。 面向人群 1、工作1-5年需要突破瓶颈; 2、传统...

Java高级架构
2017/12/21
0
0
斌哥的 Docker 进阶指南—监控方案的实现

过去的一年中,关于 [Docker][1] 的话题从未断过,而如今,从尝试 Docker 到最终决定使用 Docker 的转化率依然在逐步升高,关于 Docker 的讨论更是有增无减。另一方面,大家的注意力也渐渐从...

OneAPM蓝海讯通
2016/05/04
117
0
架构设计:系统间通信(33)——其他消息中间件及场景应用(下3)

=================================== (接上文:《架构设计:系统间通信(32)——其他消息中间件及场景应用(下2)》) 5-7、解决方案三:非侵入式方案 以上两种方案中为了让业务系统能够集...

yinwenjie
2016/05/31
0
0
视频监控业务上云方案解析

行业痛点 由于视频监控能最大的记录和还原当被监控的场景,近年来,视频监控逐步从专业领域的应用普及到了各个民用、家用领域,各个摄像相机厂家也纷纷推出各种型号的摄像机和解决方案。由于...

little09
2017/12/22
0
0
其他消息中间件及场景应用(下3)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51516329 目...

yunlielai
2018/04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
昨天
59
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
昨天
28
0
全世界到底有多少软件开发人员?

埃文斯数据公司(Evans Data Corporation) 2019 最新的统计数据(原文)显示,2018 年全球共有 2300 万软件开发人员,预计到 2019 年底这个数字将达到 2640万,到 2023 年达到 2770万。 而来自...

红薯
昨天
65
0
Go 语言基础—— 通道(channel)

通过通信来共享内存(Java是通过共享内存来通信的) 定义 func service() string {time.Sleep(time.Millisecond * 50)return "Done"}func AsyncService() chan string {retCh := mak......

刘一草
昨天
58
0
Apache Flink 零基础入门(一):基础概念解析

Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速...

Vincent-Duan
昨天
60
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部