文档章节

聊聊rocketmq-client-go的TraceInterceptor

go4it
 go4it
发布于 07/07 21:07
字数 275
阅读 26
收藏 0

「深度学习福利」大神带你进阶工程师,立即查看>>>

本文主要研究一下rocketmq-client-go的TraceInterceptor

TraceInterceptor

rocketmq-client-go-v2.0.0/producer/interceptor.go

// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
	return func(options *producerOptions) {

		ori := options.Interceptors
		options.Interceptors = make([]primitive.Interceptor, 0)
		options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
		options.Interceptors = append(options.Interceptors, ori...)
	}
}
  • WithTrace方法在options.Interceptors后追加TraceInterceptor

newTraceInterceptor

rocketmq-client-go-v2.0.0/producer/interceptor.go

func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
	dispatcher := internal.NewTraceDispatcher(traceCfg)
	dispatcher.Start()

	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		beginT := time.Now()
		err := next(ctx, req, reply)

		producerCtx := primitive.GetProducerCtx(ctx)
		if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
			return next(ctx, req, reply)
		}

		// SendOneway && SendAsync has no reply.
		if reply == nil {
			return err
		}

		result := reply.(*primitive.SendResult)
		if result.RegionID == "" || !result.TraceOn {
			return err
		}

		sendSuccess := result.Status == primitive.SendOK
		costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)
		storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2

		traceBean := internal.TraceBean{
			Topic:       producerCtx.Message.Topic,
			Tags:        producerCtx.Message.GetTags(),
			Keys:        producerCtx.Message.GetKeys(),
			StoreHost:   producerCtx.BrokerAddr,
			ClientHost:  utils.LocalIP,
			BodyLength:  len(producerCtx.Message.Body),
			MsgType:     producerCtx.MsgType,
			MsgId:       result.MsgID,
			OffsetMsgId: result.OffsetMsgID,
			StoreTime:   storeT,
		}

		traceCtx := internal.TraceContext{
			RequestId: primitive.CreateUniqID(), // set id
			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),

			TraceType:  internal.Pub,
			GroupName:  producerCtx.ProducerGroup,
			RegionId:   result.RegionID,
			TraceBeans: []internal.TraceBean{traceBean},
			CostTime:   costT,
			IsSuccess:  sendSuccess,
		}
		dispatcher.Append(traceCtx)
		return err
	}
}
  • newTraceInterceptor方法首先通过internal.NewTraceDispatcher(traceCfg)创建dispatcher,然后执行dispatcher.Start方法,之后返回一个func,该func会构造traceCtx,然后执行dispatcher.Append(traceCtx)

小结

WithTrace方法在options.Interceptors后追加TraceInterceptor;而newTraceInterceptor方法则创建TraceInterceptor

doc

go4it
粉丝 95
博文 1424
码字总数 1298645
作品 0
深圳
私信 提问
加载中
请先登录后再评论。
StreetPass

StreetPass,最初是想模拟任天堂NDS掌机中的StreetPass开发的(但未完成,大体框架已可行)。适用于记录每天在街上偶遇的纸妹Or帅锅的信息。 原理是想采用Wifi hot技术,由此可以在搜索到附近...

口米巴
2013/03/18
1.6K
0
即时通讯工具--easytalker

Donate捐助 EasyTalker 是开源的集群聊与私聊为一体的即时通讯工具,你可以同时下载服务器和客户端程序,开启你与朋友、舍友、同学、同事之间的聊天之旅。 软件特色,你可以在群聊窗口直接@...

刘学炜
2013/01/16
6.8K
1
PHP Client for Mysql Binlog

一个PHP扩展,PHP Client for Mysql Binlog,功能类似于异步触发器。php-binlog能够连接到生产Binlog的Mysql服务器,实时解析出Query语句和Row数据,项目已托管在GitHub:https://github.co...

顾伟刚
2013/04/18
1K
0
Alfresco Explorer客户化定制配置

有几种不同的方法定制Explorer配置选项,Explorer 配置文件是web-client-config-custom.xml   一、在目录修改 Explorer配置文件   1、打开 web-client-config-custom.xml 文件。   2、...

liubang
2012/07/19
833
0
全站 HTTPS 来了

各位使用百度、谷歌或淘宝的时候,有没有注意浏览器左上角已经全部出现了一把绿色锁,这把锁表明该网站已经使用了 HTTPS 进行保护。仔细观察,会发现这些网站已经全站使用 HTTPS。同时,iOS...

腾讯Bugly
2015/12/25
2.1K
5

没有更多内容

加载失败,请刷新页面

加载更多

Model S被18轮重卡撞烂 乘客在车辆保护下幸存

日前,国外一位名为quarm813的网友在社交媒体分享了“Model S救他和女儿性命”的经历。 据该用户描述,当地时间7月31日,他驾驶Model S在高速公路快车道上行驶时,一辆18轮重卡突然实线并线闯...

osc_fipgtxy8
58分钟前
7
0
Redis-cluster5.x集群搭建

1.下载redis5.0.2 wget http://download.redis.io/releases/redis-5.0.2.tar.gz #官网下载 tar xzf redis-5.0.2.tar.gz #解压cd redis-5.0.2 yum install gcc #需要gcc来编......

osc_zzg7fpke
今天
11
0
CGB2004-京淘项目Day12

1.还原系统配置 1.1 释放Linux资源 1.1.1 停止数据库主从服务 1.1.2 关闭数据库服务 说明:关闭数据库服务器. 1.1.3 关闭tomcat/mycat服务器 1.1.4关闭nginx服务器 1.2 修改代码中的配置 1.2....

osc_3361hjxk
今天
16
0
【北京迅为】初识i.MX6ULL终结者开发板

目录 一、 开发板初体验 1. 初识i.MX6ULL终结者开发板 一、 开发板初体验 i.MX6ULL终结者开发板是北京迅为电子推出的一款Cortex-A7架构的开发板。采用核心板+底板的方式,如下图所示: 经典蓝...

osc_0esgtdby
今天
8
0
如何利用基于PXI的下一代ATE系统测试平台进行军事/航天/卫星电子设备测试

前言 自动测试设备(ATE)系统用于在生产产品或产品使用过程中测试电子组件,子组件或完整系统的功能和性能,以确保他们可操作性。对设备、电路板、子组件或系统的测试要求从简单到复杂,设计...

osc_mxz6aybo
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部