聊聊dubbo-go的forkingCluster

原创
08/07 21:51
阅读数 46

本文主要研究一下dubbo-go的forkingCluster

forkingCluster

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go

// forkingCluster is the cluster.Cluster implementation registered under the
// "forking" key; it has no fields of its own.
type forkingCluster struct{}

// forking is the key under which this cluster is registered with the
// extension package.
const forking = "forking"

// init registers the NewForkingCluster constructor with the extension package
// under the "forking" key.
func init() {
	extension.SetCluster(forking, NewForkingCluster)
}

// NewForkingCluster returns a newly allocated forkingCluster as a
// cluster.Cluster.
func NewForkingCluster() cluster.Cluster {
	return new(forkingCluster)
}

// Join wraps the given directory in a forking cluster invoker and returns it.
func (c *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
	forkingInvoker := newForkingClusterInvoker(directory)
	return forkingInvoker
}
  • forkingCluster的Join方法执行newForkingClusterInvoker

newForkingClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// forkingClusterInvoker is the invoker produced by forkingCluster.Join. It
// embeds baseClusterInvoker and adds no fields of its own.
type forkingClusterInvoker struct {
	baseClusterInvoker
}

// newForkingClusterInvoker creates a forkingClusterInvoker whose embedded
// baseClusterInvoker is built from the given directory.
func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
	forkingInvoker := &forkingClusterInvoker{}
	forkingInvoker.baseClusterInvoker = newBaseClusterInvoker(directory)
	return forkingInvoker
}
  • newForkingClusterInvoker创建了forkingClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// Invoke sends the invocation to several invokers concurrently and returns
// the first result that is put on the result queue, whether or not that
// result itself carries an error.
//
// It first runs checkWhetherDestroyed, then checkInvokers on the invokers
// listed by the directory; a non-nil error from either is returned wrapped in
// an RPCResult. The number of parallel calls is read from the URL parameter
// constant.FORKS_KEY (default constant.DEFAULT_FORKS) and the wait time, in
// milliseconds, from constant.TIMEOUT_KEY (default constant.DEFAULT_TIMEOUT).
//
// If polling the queue fails (for example on timeout), yields nothing, or
// yields a value that is not a protocol.Result, an RPCResult carrying a
// descriptive error is returned instead.
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
	if err := invoker.checkWhetherDestroyed(); err != nil {
		return &protocol.RPCResult{Err: err}
	}

	invokers := invoker.directory.List(invocation)
	if err := invoker.checkInvokers(invokers, invocation); err != nil {
		return &protocol.RPCResult{Err: err}
	}

	forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))
	timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)

	var selected []protocol.Invoker
	if forks <= 0 || forks > len(invokers) {
		// A fork count of zero would select nobody and leave the Poll below
		// waiting for a result that can never arrive, so any non-positive
		// value is handled like an oversized one: every invoker is called.
		selected = invokers
	} else {
		selected = make([]protocol.Invoker, 0)
		loadbalance := getLoadBalance(invokers[0], invocation)
		for i := 0; i < forks; i++ {
			// The invokers picked so far are handed to doSelect together
			// with the full candidate list.
			ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)
			if ivk != nil {
				selected = append(selected, ivk)
			}
		}
	}

	// Every selected invoker runs in its own goroutine and pushes its result
	// onto the shared queue; only the first queued item is consumed below.
	resultQ := queue.New(1)
	for _, ivk := range selected {
		go func(k protocol.Invoker) {
			result := k.Invoke(ctx, invocation)
			if err := resultQ.Put(result); err != nil {
				logger.Errorf("resultQ put failed with exception: %v.\n", err)
			}
		}(ivk)
	}

	rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
	if err != nil {
		return &protocol.RPCResult{
			Err: fmt.Errorf("failed to forking invoke provider %v, "+
				"but no luck to perform the invocation. Last error is: %v", selected, err),
		}
	}
	if len(rsps) == 0 {
		return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
	}

	result, ok := rsps[0].(protocol.Result)
	if !ok {
		return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
	}

	return result
}
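  • Invoke方法先调用checkWhetherDestroyed进行检查,再通过invoker.directory.List(invocation)获取invokers并用checkInvokers校验,任一步出错都直接返回带Err的RPCResult;之后从invoker.GetUrl()获取forks及timeouts参数:当forks超出有效范围(例如大于invokers的数量)时直接选中全部invokers,否则循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;接着遍历selected,为每个invoker启动一个goroutine异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))取出最先放入队列的结果并返回,若Poll出错(例如超时)、没有取到结果或结果类型不合法,则返回带Err的RPCResult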

小结

forkingCluster的Join方法执行newForkingClusterInvoker创建forkingClusterInvoker;其Invoke方法根据forks参数选出要调用的invokers(forks超出有效范围时选中全部invokers,否则循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)逐个选出);之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))取出最先放入队列的结果并返回,超时或结果不合法时返回带Err的RPCResult

doc

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
在线直播报名
返回顶部
顶部