聊聊rocketmq-client-go的defaultProducer

原创
2020/07/05 10:51
阅读数 79

本文主要研究一下rocketmq-client-go的defaultProducer

defaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// defaultProducer is the producer implementation returned by
// NewDefaultProducer. It sends messages through an internal.RMQClient and
// optionally routes every send through an interceptor chain.
type defaultProducer struct {
	// group is the producer group name, taken from the GroupName option.
	group       string
	// client is the RocketMQ client used for all remote invocations.
	client      internal.RMQClient
	// state holds the lifecycle state; it is written atomically by Start
	// (StateRunning) and Shutdown (StateShutdown).
	state       int32
	// options is the producer configuration after all Option funcs were applied.
	options     producerOptions
	// NOTE(review): publishInfo is not referenced by the methods shown here;
	// presumably it caches per-topic publish/route information — confirm.
	publishInfo sync.Map
	// callbackCh is the channel handed to internal.GetOrNewRocketMQClient
	// when the client is obtained.
	callbackCh  chan interface{}

	// interceptor is the chain built from options.Interceptors by
	// primitive.ChainInterceptors; the send methods skip it when it is nil.
	interceptor primitive.Interceptor
}
  • defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor

NewDefaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// NewDefaultProducer builds a defaultProducer from the default producer
// options overridden by opts.
//
// It creates a Namesrv from the configured name server addresses (attaching
// credentials when they are not empty), obtains the RocketMQ client and
// chains the configured interceptors. An error is returned only when the
// Namesrv cannot be created.
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
	cfg := defaultProducerOptions()
	for i := range opts {
		opts[i](&cfg)
	}

	namesrv, err := internal.NewNamesrv(cfg.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !cfg.Credentials.IsEmpty() {
		namesrv.SetCredentials(cfg.Credentials)
	}
	cfg.Namesrv = namesrv

	p := new(defaultProducer)
	p.group = cfg.GroupName
	p.callbackCh = make(chan interface{})
	p.options = cfg

	// The client receives the same callback channel the producer keeps.
	p.client = internal.GetOrNewRocketMQClient(cfg.ClientOptions, p.callbackCh)
	p.interceptor = primitive.ChainInterceptors(p.options.Interceptors...)

	return p, nil
}
  • NewDefaultProducer方法先依次应用opts,再通过internal.NewNamesrv根据NameServerAddrs创建Namesrv(Credentials非空时设置到Namesrv上),之后实例化defaultProducer,然后通过internal.GetOrNewRocketMQClient获取client,并通过primitive.ChainInterceptors构建interceptor

Start

rocketmq-client-go-v2.0.0/producer/producer.go

// Start marks the producer as running, registers it with the client under
// its group name and starts the client. It always returns nil.
func (p *defaultProducer) Start() error {
	running := int32(internal.StateRunning)
	atomic.StoreInt32(&p.state, running)

	// With no explicit name server addresses configured, ask the Namesrv to
	// update its address from the configured domain and instance name.
	opts := &p.options
	if len(opts.NameServerAddrs) == 0 {
		opts.Namesrv.UpdateNameServerAddress(opts.NameServerDomain, opts.InstanceName)
	}

	p.client.RegisterProducer(p.group, p)
	p.client.Start()

	return nil
}
  • Start方法先将state设置为StateRunning,在NameServerAddrs为空时执行UpdateNameServerAddress,之后执行p.client.RegisterProducer及p.client.Start()

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

// Shutdown marks the producer as shut down, unregisters its group from the
// client and shuts the client down. It always returns nil.
func (p *defaultProducer) Shutdown() error {
	stopped := int32(internal.StateShutdown)
	atomic.StoreInt32(&p.state, stopped)

	c := p.client
	c.UnregisterProducer(p.group)
	c.Shutdown()

	return nil
}
  • Shutdown方法执行p.client.UnregisterProducer及p.client.Shutdown()

SendSync

rocketmq-client-go-v2.0.0/producer/producer.go

// SendSync validates msgs with checkMsg, combines them with encodeBatch and
// sends the resulting message synchronously.
//
// When an interceptor chain is configured, the send runs as the terminal
// handler of that chain, with the communication mode and a ProducerCtx
// attached to the context; otherwise sendSync is called directly.
//
// It returns the SendResult populated by the send together with any error
// from validation, the interceptor chain or the send. On a validation error
// the result is nil.
func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) {
	if err := p.checkMsg(msgs...); err != nil {
		return nil, err
	}

	msg := p.encodeBatch(msgs...)

	resp := new(primitive.SendResult)
	if p.interceptor != nil {
		// WithMethod returns a derived context. The result must be kept,
		// otherwise interceptors never see the communication mode.
		ctx = primitive.WithMethod(ctx, primitive.SendSync)
		producerCtx := &primitive.ProducerCtx{
			ProducerGroup:     p.group,
			CommunicationMode: primitive.SendSync,
			BornHost:          utils.LocalIP,
			Message:           *msg,
			SendResult:        resp,
		}
		ctx = primitive.WithProducerCtx(ctx, producerCtx)

		err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
			realReq := req.(*primitive.Message)
			realReply := reply.(*primitive.SendResult)
			return p.sendSync(ctx, realReq, realReply)
		})
		return resp, err
	}

	err := p.sendSync(ctx, msg, resp)
	return resp, err
}
  • SendSync方法首先通过p.checkMsg校验消息,然后通过p.encodeBatch编码;若p.interceptor不为nil,则构造ProducerCtx并在p.interceptor的回调中执行p.sendSync,否则直接执行p.sendSync(ctx, msg, resp)

sendSync

rocketmq-client-go-v2.0.0/producer/producer.go

// sendSync delivers msg to a broker and fills resp from the broker response.
//
// The topic is prefixed with the configured namespace (joined by "%") when
// one is set. Up to 1 + options.RetryTimes attempts are made: each attempt
// selects a message queue, resolves the broker address and invokes the
// broker with a 3 second timeout. A missing queue or a failed invocation
// moves on to the next attempt; a missing broker address aborts immediately.
// The error of the last failed attempt is returned when every attempt fails.
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
	if ns := p.options.Namespace; ns != "" {
		msg.Topic = ns + "%" + msg.Topic
	}

	var lastErr error
	for attempts := 1 + p.options.RetryTimes; attempts > 0; attempts-- {
		mq := p.selectMessageQueue(msg)
		if mq == nil {
			lastErr = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
		if addr == "" {
			return fmt.Errorf("topic=%s route info not found", mq.Topic)
		}

		// Record the chosen broker and queue on the ProducerCtx carried by ctx.
		if p.interceptor != nil {
			pctx := primitive.GetProducerCtx(ctx)
			pctx.BrokerAddr = addr
			pctx.MQ = *mq
		}

		res, invokeErr := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
		if invokeErr == nil {
			return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
		}
		lastErr = invokeErr
	}

	return lastErr
}
  • sendSync最多尝试1 + p.options.RetryTimes次,每次先通过p.selectMessageQueue(msg)选择mq,然后通过p.options.Namesrv.FindBrokerAddrByName寻找addr,最后执行p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second),成功后通过p.client.ProcessSendResponse处理响应

SendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

// SendAsync validates msgs with checkMsg, combines them with encodeBatch and
// sends the resulting message asynchronously. The outcome of the send is
// reported to f.
//
// When an interceptor chain is configured, the send runs as the terminal
// handler of that chain with the communication mode attached to the context;
// otherwise sendAsync is called directly.
//
// The returned error covers validation, the interceptor chain and issuing
// the request.
func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ...*primitive.Message) error {
	if err := p.checkMsg(msgs...); err != nil {
		return err
	}

	msg := p.encodeBatch(msgs...)

	if p.interceptor != nil {
		// WithMethod returns a derived context. The result must be kept,
		// otherwise interceptors never see the communication mode.
		ctx = primitive.WithMethod(ctx, primitive.SendAsync)

		return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
			return p.sendAsync(ctx, msg, f)
		})
	}
	return p.sendAsync(ctx, msg, f)
}
  • SendAsync方法主要是执行p.sendAsync(ctx, msg, f)

sendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

// sendAsync issues a single asynchronous send of msg and reports the outcome
// to h.
//
// The topic is prefixed with the configured namespace (joined by "%") when
// one is set. A message queue is selected and its broker address resolved;
// if either is missing an error is returned and h is never called. The
// request is issued under a context with a 3 second timeout, and h receives
// that context.
//
// h is called with (nil, err) when the invocation fails or the response
// cannot be processed, and with (result, nil) otherwise. The returned error
// is the one from issuing the request.
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
	if p.options.Namespace != "" {
		msg.Topic = p.options.Namespace + "%" + msg.Topic
	}
	mq := p.selectMessageQueue(msg)
	if mq == nil {
		return errors.Errorf("the topic=%s route info not found", msg.Topic)
	}

	addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		return errors.Errorf("topic=%s route info not found", mq.Topic)
	}

	// Keep the cancel func: dropping it leaks the timeout context's
	// resources until the deadline fires.
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
		// Release the context only after h has run, so h sees a live ctx.
		defer cancel()
		if err != nil {
			h(ctx, nil, err)
			return
		}
		resp := new(primitive.SendResult)
		// Forward a response-processing failure instead of reporting success.
		if perr := p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg); perr != nil {
			h(ctx, nil, perr)
			return
		}
		h(ctx, resp, nil)
	})
	if err != nil {
		// The request was not issued, so the callback may never run;
		// cancel is safe to call more than once.
		cancel()
	}
	return err
}
  • sendAsync主要是执行p.client.InvokeAsync

SendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

// SendOneWay validates msgs with checkMsg, combines them with encodeBatch
// and sends the resulting message in one-way mode.
//
// When an interceptor chain is configured, the send runs as the terminal
// handler of that chain with the communication mode attached to the context;
// otherwise sendOneWay is called directly.
//
// The returned error covers validation, the interceptor chain and the send.
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {
	if err := p.checkMsg(msgs...); err != nil {
		return err
	}

	msg := p.encodeBatch(msgs...)

	if p.interceptor != nil {
		// WithMethod returns a derived context. The result must be kept,
		// otherwise interceptors never see the communication mode.
		ctx = primitive.WithMethod(ctx, primitive.SendOneway)
		return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
			// The terminal handler must call the private sendOneWay.
			// Calling the public SendOneWay here would re-enter this
			// interceptor branch and recurse without bound.
			return p.sendOneWay(ctx, msg)
		})
	}

	return p.sendOneWay(ctx, msg)
}
  • SendOneWay主要是执行p.sendOneWay(ctx, msg)

sendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

// sendOneWay delivers msg to a broker in one-way mode.
//
// The topic is prefixed with the configured namespace (joined by "%") when
// one is set. Up to 1 + options.RetryTimes attempts are made: each attempt
// selects a message queue, resolves the broker address and invokes the
// broker with a 3 second timeout. A missing queue or a failed invocation
// moves on to the next attempt; a missing broker address aborts immediately.
// The error of the last failed attempt is returned when every attempt fails.
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
	if ns := p.options.Namespace; ns != "" {
		msg.Topic = ns + "%" + msg.Topic
	}

	var lastErr error
	for attempts := 1 + p.options.RetryTimes; attempts > 0; attempts-- {
		mq := p.selectMessageQueue(msg)
		if mq == nil {
			lastErr = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
		if addr == "" {
			return fmt.Errorf("topic=%s route info not found", mq.Topic)
		}

		invokeErr := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
		if invokeErr == nil {
			return nil
		}
		lastErr = invokeErr
	}

	return lastErr
}
  • sendOneWay主要是重试执行p.client.InvokeOneWay

小结

defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor;它提供了NewDefaultProducer、Start、Shutdown、SendSync、SendAsync、SendOneWay方法

doc

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部