聊聊rocketmq-client-go的transactionProducer

原创
2020/07/06 23:23
阅读数 148

本文主要研究一下rocketmq-client-go的transactionProducer

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type transactionProducer struct {
	producer *defaultProducer
	listener primitive.TransactionListener
}
  • transactionProducer定义了producer及listener属性

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
	producer, err := NewDefaultProducer(opts...)
	if err != nil {
		return nil, errors.Wrap(err, "NewDefaultProducer failed.")
	}
	return &transactionProducer{
		producer: producer,
		listener: listener,
	}, nil
}
  • NewTransactionProducer方法实例化transactionProducer

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Start() error {
	go primitive.WithRecover(func() {
		tp.checkTransactionState()
	})
	return tp.producer.Start()
}
  • Start方法先异步执行tp.checkTransactionState(),然后执行tp.producer.Start()

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) checkTransactionState() {
	for ch := range tp.producer.callbackCh {
		switch callback := ch.(type) {
		case *internal.CheckTransactionStateCallback:
			localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
			uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if uniqueKey == "" {
				uniqueKey = callback.Msg.MsgId
			}
			header := &internal.EndTransactionRequestHeader{
				CommitLogOffset:      callback.Header.CommitLogOffset,
				ProducerGroup:        tp.producer.group,
				TranStateTableOffset: callback.Header.TranStateTableOffset,
				FromTransactionCheck: true,
				MsgID:                uniqueKey,
				TransactionId:        callback.Header.TransactionId,
				CommitOrRollback:     tp.transactionState(localTransactionState),
			}

			req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
			req.Remark = tp.errRemark(nil)

			err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
				tp.producer.options.SendMsgTimeout)
			if err != nil {
				rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
					"callback":               callback.Addr.String(),
					"request":                req.String(),
					rlog.LogKeyUnderlayError: err,
				})
			}
		default:
			rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
		}
	}
}
  • checkTransactionState方法遍历tp.producer.callbackCh,根据type来不同处理,目前支持CheckTransactionStateCallback,它会构造EndTransactionRequestHeader执行tp.producer.client.InvokeOneWay

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Shutdown() error {
	return tp.producer.Shutdown()
}
  • Shutdown方法执行tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
	msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
	msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

	rsp, err := tp.producer.SendSync(ctx, msg)
	if err != nil {
		return nil, err
	}
	localTransactionState := primitive.UnknowState
	switch rsp.Status {
	case primitive.SendOK:
		if len(rsp.TransactionID) > 0 {
			msg.WithProperty("__transactionId__", rsp.TransactionID)
		}
		transactionId := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		if len(transactionId) > 0 {
			msg.TransactionId = transactionId
		}
		localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
		if localTransactionState != primitive.CommitMessageState {
			rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
				"localState": localTransactionState,
				"message":    msg,
			})
		}

	case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
		localTransactionState = primitive.RollbackMessageState
	default:
	}

	tp.endTransaction(*rsp, err, localTransactionState)

	transactionSendResult := &primitive.TransactionSendResult{
		SendResult: rsp,
		State:      localTransactionState,
	}

	return transactionSendResult, nil
}
  • SendMessageInTransaction方法先执行tp.producer.SendSync(ctx, msg),然后根据rsp.Status来做不同处理;对于primitive.SendOK执行tp.listener.ExecuteLocalTransaction来更新localTransactionState;对于primitive.SendFlushDiskTimeout、primitive.SendFlushSlaveTimeout、primitive.SendSlaveNotAvailable则更新localTransactionState为primitive.RollbackMessageState;最后执行tp.endTransaction

小结

transactionProducer定义了producer及listener属性;它提供了NewTransactionProducer、Start、Shutdown、SendMessageInTransaction方法

doc

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部