Kubernetes Node Controller Source Code Analysis: The Execution Phase

Posted by WaltonWang 5 months ago
Abstract: In my view, the Node Controller is one of the most important of the dozens of controllers in Kubernetes (top 3 in importance), yet it is probably also the most complex one. I will therefore cover its source code in a series of articles, hoping to build a thorough yet approachable understanding. This post takes NodeController's Run method as the entry point and traces through how it works.

> Author: xidianwangtao@gmail.com

The Execution of the Node Controller

The Run method of the Node Controller is shown below. It is the entry point for all of the Node Controller's actual processing logic.

pkg/controller/node/nodecontroller.go:550

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
	go func() {
		defer utilruntime.HandleCrash()

		if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
			utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
			return
		}

		// Incorporate the results of node status pushed from kubelet to master.
		go wait.Until(func() {
			if err := nc.monitorNodeStatus(); err != nil {
				glog.Errorf("Error monitoring node status: %v", err)
			}
		}, nc.nodeMonitorPeriod, wait.NeverStop)

		if nc.runTaintManager {
			go nc.taintManager.Run(wait.NeverStop)
		}

		if nc.useTaintBasedEvictions {
			// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
			// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
			go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
		} else {
			// Managing eviction of nodes:
			// When we delete pods off a node, if the node was not empty at the time we then
			// queue an eviction watcher. If we hit an error, retry deletion.
			go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
		}
	}()
}

WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)

  • The Node Controller first calls WaitForCacheSync and waits until HasSynced of the PodInformer, NodeInformer, and DaemonSetInformer all return true, i.e. all three API objects have finished syncing.
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    
    // Every 100ms (syncedPollPeriod), invoke each InformerSynced function in cacheSyncs;
    // once all of them return true, meaning all required caches have synced,
    // WaitForCacheSync returns true. Otherwise keep polling.
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}

The logic of WaitForCacheSync is:

  • Every 100ms (syncedPollPeriod), invoke each InformerSynced function in cacheSyncs; once all of them return true, meaning all required caches have synced, WaitForCacheSync returns true.

  • Otherwise keep polling on the same 100ms period until they all return true or a stop signal is received.

Next, a goroutine is started that runs monitorNodeStatus every 5s (nodeMonitorPeriod) to monitor node status

pkg/controller/node/nodecontroller.go:586

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {

	// We are listing nodes from local cache as we can tolerate some small delays
	// comparing to state from etcd and there is eventual consistency anyway.
	nodes, err := nc.nodeLister.List(labels.Everything())
	if err != nil {
		return err
	}
	
	// Compare knownNodeSet against the listed nodes to get the corresponding added and deleted node lists
	added, deleted := nc.checkForNodeAddedDeleted(nodes)
	
	// Iterate over the added list; each entry is a new node the Node Controller has observed joining the cluster
	for i := range added {
		...
		
		// Add the new node to knownNodeSet
		nc.knownNodeSet[added[i].Name] = added[i]
		
		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
		zone := utilnode.GetZoneKey(added[i])
		if _, found := nc.zoneStates[zone]; !found {
		
			// Mark the node's new zone state as "Initial"
			nc.zoneStates[zone] = stateInitial
			
			// If useTaintBasedEvictions is false (set via --feature-gates; TaintBasedEvictions defaults to false),
			// create the zone's zonePodEvictor with evictionLimiterQPS (--node-eviction-rate, default 0.1)
			if !nc.useTaintBasedEvictions {
				nc.zonePodEvictor[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			} else {
			 
				// If useTaintBasedEvictions is true,
				// create the zone's zoneNotReadyOrUnreachableTainer with the same evictionLimiterQPS
				nc.zoneNotReadyOrUnreachableTainer[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			}
			...
		}
		
		// If useTaintBasedEvictions is true, markNodeAsHealthy calls RemoveTaintOffNode to clear the node's
		// taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable)
		// and removes it from the zoneNotReadyOrUnreachableTainer queue (if present)
		if nc.useTaintBasedEvictions {
			nc.markNodeAsHealthy(added[i])
		} else {
		
			// If useTaintBasedEvictions is false, i.e. zonePodEvictor is used,
			// remove the node from its zone's zonePodEvictor queue
			nc.cancelPodEviction(added[i])
		}
	}

	// Iterate over the deleted list and remove each node from knownNodeSet
	for i := range deleted {
	   ...
		delete(nc.knownNodeSet, deleted[i].Name)
	}
    
    
	zoneToNodeConditions := map[string][]*v1.NodeCondition{}
	for i := range nodes {
		...
		// PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
		// retrySleepTime is 20ms; the timeout is retrySleepTime*nodeStatusUpdateRetry = 100ms.
		if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
		
		  // nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it. 
		  // Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
			if err == nil {
				return true, nil
			}
			...
		}); 
        ...
		}

		// For non-master nodes, append the node's NodeCondition to the zoneToNodeConditions map.
		// We do not treat a master node as a part of the cluster for network disruption checking.
		if !system.IsMasterNode(node.Name) {
			zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
		}

        
		decisionTimestamp := nc.now()
		if currentReadyCondition != nil {
			
			// When the observed node condition is NotReady, handle it according to useTaintBasedEvictions
			// Check eviction timeout against decisionTimestamp
			if observedReadyCondition.Status == v1.ConditionFalse {
			     
				// When useTaintBasedEvictions is true:
				if nc.useTaintBasedEvictions {
					
					// If the node is already tainted with UnreachableTaint, swap it for NotReadyTaint
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
						taintToAdd := *NotReadyTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
							...
						}
					
					// Otherwise add the node to the Tainter queue, to be processed by doTaintingPass
					} else if nc.markNodeForTainting(node) {
						...
					}
					
				// When useTaintBasedEvictions is false, pod eviction is used instead.
				} else {
				
					// Evict only once decisionTimestamp (now) is after readyTransitionTimestamp + podEvictionTimeout (default 5min)
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
					
						// Add the node to the zone's pod evictor queue, to be processed by the pod evictor
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			
			// Likewise, when the observed node condition is Unknown, handle it according to useTaintBasedEvictions
			if observedReadyCondition.Status == v1.ConditionUnknown {
			
				// When useTaintBasedEvictions is true:
				if nc.useTaintBasedEvictions {
				
					// If the node is already tainted with NotReadyTaint, swap it for UnreachableTaint
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
						taintToAdd := *UnreachableTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
                        ...
						}
						
					// Otherwise add the node to the Tainter queue, to be processed by doTaintingPass
					} else if nc.markNodeForTainting(node) {
						...
					}
				
				// When useTaintBasedEvictions is false, pod eviction is used instead.
				} else {
				
					// Evict only once decisionTimestamp (now) is after probeTimestamp + podEvictionTimeout (default 5min)
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
						
						// Add the node to the zone's pod evictor queue
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			
			// Likewise, when the observed node condition is True, handle it according to useTaintBasedEvictions
			if observedReadyCondition.Status == v1.ConditionTrue {
			
				// When useTaintBasedEvictions is true,
				if nc.useTaintBasedEvictions {
				
					// remove the node from the zoneNotReadyOrUnreachableTainer queue (if present)
					removed, err := nc.markNodeAsHealthy(node)
					...
				} else {
				
					// When useTaintBasedEvictions is false, remove the node from its zone's zonePodEvictor queue
					if nc.cancelPodEviction(node) {
						...
					}
				}
			}

			// If the node's status transitioned from Ready to NotReady, mark all pods on the node as NotReady
			// Report node event.
			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
				if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
				}
			}
			

			// Check with the cloud provider to see if the node still exists. If it
			// doesn't, delete the node immediately.
			...
		}
	}
	
	// Handle disruption
	nc.handleDisruption(zoneToNodeConditions, nodes)

	return nil
}
  • Compare knownNodeSet against the listed nodes to get the corresponding added and deleted node lists.
  • Iterate over the added list; each entry is a new node the Node Controller has observed joining the cluster:
    • Add the node to knownNodeSet.
    • When adding new nodes we need to check whether a new zone appeared, and if so add a new evictor:
      • Mark the node's new zone state as "Initial".
      • If useTaintBasedEvictions is false (set via --feature-gates; TaintBasedEvictions defaults to false), create the zone's zonePodEvictor with evictionLimiterQPS (--node-eviction-rate, default 0.1).
      • If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer with the same evictionLimiterQPS.
    • If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable) and remove the node from the zoneNotReadyOrUnreachableTainer queue (if present).
    • If useTaintBasedEvictions is false, i.e. zonePodEvictor is used, remove the node from its zone's zonePodEvictor queue.
  • Iterate over the deleted list and remove each node from knownNodeSet.
  • Iterate over all nodes:
    • Update each node's status via tryUpdateNodeStatus, obtaining the last observed and the current NodeCondition.
    • For non-master nodes, append the node's NodeCondition to the zoneToNodeConditions map.
    • When the observed condition is NotReady, handle it according to useTaintBasedEvictions:
      • When useTaintBasedEvictions is true:
        • If the node is already tainted with UnreachableTaint, swap it for NotReadyTaint.
        • Otherwise add the node to the Tainter queue.
      • When useTaintBasedEvictions is false, pod eviction is used instead:
        • Evict only once decisionTimestamp (now) is after readyTransitionTimestamp + podEvictionTimeout (default 5min).
        • Add the node to the pod evictor queue, to be processed by the pod evictor.
    • Likewise, when the observed condition is Unknown:
      • When useTaintBasedEvictions is true:
        • If the node is already tainted with NotReadyTaint, swap it for UnreachableTaint.
        • Otherwise add the node to the Tainter queue.
      • When useTaintBasedEvictions is false, pod eviction is used instead:
        • Evict only once decisionTimestamp (now) is after probeTimestamp + podEvictionTimeout (default 5min).
        • Add the node to the pod evictor queue.
    • Likewise, when the observed condition is True:
      • When useTaintBasedEvictions is true, remove the node from the zoneNotReadyOrUnreachableTainer queue (if present).
      • When useTaintBasedEvictions is false, remove the node from its zone's zonePodEvictor queue.
    • If the node's status transitioned from Ready to NotReady, mark all pods on the node as NotReady.
  • Call handleDisruption.

Next, let's look at the logic of handleDisruption:

pkg/controller/node/nodecontroller.go:772


func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]zoneState{}
	
	// From zoneToNodeConditions, compute allAreFullyDisrupted (whether, based on the currently
	// observed node conditions, every zone in the cluster is now in the FullDisruption state)
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		ZoneSize.WithLabelValues(k).Set(float64(len(v)))
		unhealthy, newState := nc.computeZoneStateFunc(v)
		ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
		newZoneStates[k] = newState
		if _, had := nc.zoneStates[k]; !had {
			glog.Errorf("Setting initial state for unseen zone: %v", k)
			nc.zoneStates[k] = stateInitial
		}
	}

	// From nc.zoneStates, compute allWasFullyDisrupted (whether, based on the previous
	// observation, every zone in the cluster was in the FullDisruption state)
	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			ZoneSize.WithLabelValues(k).Set(0)
			ZoneHealth.WithLabelValues(k).Set(100)
			UnhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
		if v != stateFullDisruption {
			allWasFullyDisrupted = false
			break
		}
	}

	// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
	// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
	// - if new state is "fullDisruption" we restore normal eviction rate,
	//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
	
		// If allAreFullyDisrupted is true and allWasFullyDisrupted is false, i.e. the cluster
		// just switched from non-FullDisruption to FullDisruption, iterate over all nodes:
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes {
				// If useTaintBasedEvictions is true, mark the node healthy (remove its taints and drop it from the Tainter queue)
				if nc.useTaintBasedEvictions {
					_, err := nc.markNodeAsHealthy(nodes[i])
					if err != nil {
						glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
					}
				} else {
					// If useTaintBasedEvictions is false, cancel pod eviction for the node (drop it from the zone's pod evictor queue)
					nc.cancelPodEviction(nodes[i])
				}
			}
			
			// Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0, i.e. stop all evictions.
			// We stop all evictions.
			for k := range nc.zoneStates {
				if nc.useTaintBasedEvictions {
					nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
				} else {
					nc.zonePodEvictor[k].SwapLimiter(0)
				}
			}
			
			// Mark every zone's state (nc.zoneStates) as FullDisruption
			for k := range nc.zoneStates {
				nc.zoneStates[k] = stateFullDisruption
			}
			// All rate limiters are updated, so we can return early here.
			return
		}
		
		// If allWasFullyDisrupted is true and allAreFullyDisrupted is false, i.e. the cluster
		// just switched from FullDisruption back to non-FullDisruption, iterate over all nodes:
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
			
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes {
				v := nc.nodeStatusMap[nodes[i].Name]
				v.probeTimestamp = now
				v.readyTransitionTimestamp = now
				nc.nodeStatusMap[nodes[i].Name] = v
			}
			
			
			// Reset each zone's disruption rate limiter according to its size and state.
			// We reset all rate limiters to settings appropriate for the given state.
			for k := range nc.zoneStates {
				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
				nc.zoneStates[k] = newZoneStates[k]
			}
			return
		}
		
		
		// If both allWasFullyDisrupted and allAreFullyDisrupted are false, i.e. the cluster stays out of
		// full disruption, reset each zone's rate limiter according to its size and state.
		// We know that there's at least one not-fully disrupted so,
		// we can use default behavior for rate limiters
		for k, v := range nc.zoneStates {
			newState := newZoneStates[k]
			if v == newState {
				continue
			}
			glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}

The logic of handleDisruption is therefore:

  • From zoneToNodeConditions, compute allAreFullyDisrupted (whether, based on the currently observed node conditions, every zone in the cluster is now in the FullDisruption state).
  • From nc.zoneStates, compute allWasFullyDisrupted (whether, based on the previous observation, every zone was in the FullDisruption state).
  • If allAreFullyDisrupted is true and allWasFullyDisrupted is false, i.e. the cluster switched from non-FullDisruption to FullDisruption, we are entering full disruption mode; iterate over all nodes:
    • If useTaintBasedEvictions is true, mark the node healthy (remove its taints and drop it from the Tainter queue).
    • If useTaintBasedEvictions is false, cancel pod eviction for the node (drop it from the zone's pod evictor queue).
    • Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0, stopping all evictions.
    • Mark every zone's state (nc.zoneStates) as FullDisruption.
  • If allWasFullyDisrupted is true and allAreFullyDisrupted is false, i.e. the cluster switched from FullDisruption back to non-FullDisruption, we are exiting disruption mode; iterate over all nodes:
    • Update the probe timestamps on all nodes.
    • Call **setLimiterInZone** to reset each zone's disruption rate limiter according to its size and state.
  • If both allWasFullyDisrupted and allAreFullyDisrupted are false, i.e. the cluster stays in non-FullDisruption, call **setLimiterInZone** to reset each zone's rate limiter according to its size and state.
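The three branches above can be condensed into a small decision function. This is only a sketch of the control flow; the returned labels are for illustration and do not appear in the real code:

```go
package main

import "fmt"

// disruptionAction names which branch handleDisruption takes, given whether
// all zones are fully disrupted now (allAre) and were on the previous pass (allWas).
func disruptionAction(allAre, allWas bool) string {
	switch {
	case allAre && allWas:
		return "no-op" // the outer `!allAre || !allWas` guard fails; nothing changes
	case allAre:
		return "enter full disruption: stop all evictions (limiters to 0)"
	case allWas:
		return "exit full disruption: refresh probe timestamps, reset limiters"
	default:
		return "steady state: adjust limiters only for zones whose state changed"
	}
}

func main() {
	fmt.Println(disruptionAction(true, false))
	fmt.Println(disruptionAction(false, true))
	fmt.Println(disruptionAction(false, false))
	fmt.Println(disruptionAction(true, true))
}
```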

Next let's look at setLimiterInZone, to see how it maps zone size and zone state to different rate limiters.

pkg/controller/node/nodecontroller.go:870

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
	switch state {
	
	// If the zone state is Normal, set the zone's rate limiter to evictionLimiterQPS (default 0.1)
	case stateNormal:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
		}
		
	// If the zone state is PartialDisruption, set the limiter via nc.enterPartialDisruptionFunc.
	case statePartialDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		}
		
	// If the zone state is FullDisruption, set the limiter via nc.enterFullDisruptionFunc.
	case stateFullDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		}
	}
}

nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are registered when NewNodeController creates the Node Controller:

pkg/controller/node/nodecontroller.go:270
func NewNodeController(...) (*NodeController, error) {
    ...
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    ...
}    

pkg/controller/node/nodecontroller.go:1132
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
	return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) > nc.largeClusterThreshold {
		return nc.secondaryEvictionLimiterQPS
	}
	return 0
}

The logic of setLimiterInZone is therefore:

  • When the zone state is PartialDisruption, set the Tainter queue's or pod evictor queue's rate limiter to:

    • secondaryEvictionLimiterQPS (default 0.01) if the zone size exceeds nc.largeClusterThreshold (default 50),
    • 0 otherwise.
  • When the zone state is FullDisruption, set the Tainter queue's or pod evictor queue's rate limiter to evictionLimiterQPS (default 0.1).

  • When the zone state is Normal, likewise set the Tainter queue's or pod evictor queue's rate limiter to evictionLimiterQPS (default 0.1).
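The state-to-QPS mapping can be sketched as a pure function. This is illustrative only (`evictionQPS` is not a real NodeController method); the default values mirror the flags documented above (--node-eviction-rate=0.1, --secondary-node-eviction-rate=0.01, --large-cluster-size-threshold=50):

```go
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// evictionQPS sketches how setLimiterInZone picks the rate limiter QPS from
// the zone state and size: PartialDisruption throttles (or stops) evictions,
// while Normal and FullDisruption both use the primary eviction rate.
func evictionQPS(state zoneState, zoneSize int, evictionLimiterQPS, secondaryQPS float32, largeClusterThreshold int) float32 {
	switch state {
	case statePartialDisruption:
		if zoneSize > largeClusterThreshold {
			return secondaryQPS // large zone: slow evictions down
		}
		return 0 // small zone: stop evictions altogether
	default: // stateNormal and stateFullDisruption share the primary rate
		return evictionLimiterQPS
	}
}

func main() {
	fmt.Println(evictionQPS(statePartialDisruption, 100, 0.1, 0.01, 50)) // prints "0.01"
	fmt.Println(evictionQPS(statePartialDisruption, 10, 0.1, 0.01, 50))  // prints "0"
	fmt.Println(evictionQPS(stateFullDisruption, 10, 0.1, 0.01, 50))     // prints "0.1"
}
```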

Run TaintManager

In the Node Controller's Run method, the Taint Manager is started next:

if nc.runTaintManager {
	go nc.taintManager.Run(wait.NeverStop)
}

nc.runTaintManager is set via --enable-taint-manager and defaults to true, so the Taint Manager is started by default.

Next, let's look at what the Taint Manager's Run method does.

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			break
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannel:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}
  • Start a goroutine that pops nodeUpdate items from the NoExecuteTaintManager's nodeUpdateQueue and pushes them into tc.nodeUpdateChannel.
  • Start a goroutine that pops podUpdate items from the NoExecuteTaintManager's podUpdateQueue and pushes them into tc.podUpdateChannel.
  • In the main loop, node updates take priority: tc.handleNodeUpdate is called until the nodeUpdate channel is drained before tc.handlePodUpdate is called for a podUpdate.
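The drain-before-handling trick in the main loop uses a labeled break out of a select with a default case. Here is a stand-alone sketch of just that pattern (names are illustrative, not the real NoExecuteTaintManager types):

```go
package main

import "fmt"

// drainNodeUpdates sketches the priority pattern in NoExecuteTaintManager.Run:
// before a pod update is handled, the node-update channel is emptied with a
// select/default loop so node updates are always processed first.
func drainNodeUpdates(nodeCh chan string, handleNode func(string)) {
priority:
	for {
		select {
		case n := <-nodeCh:
			handleNode(n) // a node update was pending; handle it first
		default:
			break priority // channel empty: safe to process the pod update now
		}
	}
}

func main() {
	nodeCh := make(chan string, 3)
	nodeCh <- "node-a"
	nodeCh <- "node-b"
	var handled []string
	drainNodeUpdates(nodeCh, func(n string) { handled = append(handled, n) })
	fmt.Println(handled) // prints "[node-a node-b]"
}
```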

I will cover NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a dedicated post on the Taint Manager, so I won't go deeper here.

doTaintingPass

If useTaintBasedEvictions is true, i.e. TaintBasedEvictions is enabled in --feature-gates (it defaults to false), doTaintingPass is invoked every 100ms (nodeEvictionPeriod).

doTaintingPass checks whether each node's condition is NotReady or Unknown and calls the apiserver to apply the corresponding taint to the node: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable.

if nc.useTaintBasedEvictions {
	// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
	// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
	go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}

doEvictionPass

If useTaintBasedEvictions is false (the default, TaintBasedEvictions=false), doEvictionPass is invoked every 100ms instead.

else {
	// Managing eviction of nodes:
	// When we delete pods off a node, if the node was not empty at the time we then
	// queue an eviction watcher. If we hit an error, retry deletion.
	go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}

Let's look at the code of doEvictionPass:

pkg/controller/node/nodecontroller.go:481

func (nc *NodeController) doEvictionPass() {
	...
	
	// Iterate over every zone's pod evictor; pop node names from its queue and call
	// deletePods to delete all pods on the node (except DaemonSet pods)
	for k := range nc.zonePodEvictor {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			...
			nodeUid, _ := value.UID.(string)
			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
			...
			return true, 0
		})
	}
}

The logic of doEvictionPass:

  • Iterate over every zone's pod evictor, popping node names from its queue,
  • then call deletePods to delete all pods on the node (except DaemonSet pods).

The code of deletePods is as follows:

pkg/controller/node/controller_utils.go:49

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
	...
	// List all pod objects from the apiserver.
	pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
	...

	// Iterate over the pods, selecting those running on the given node
	for _, pod := range pods.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		...
		
		// if the pod has already been marked for deletion, we still return true that there are remaining pods.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		
		// if the pod is managed by a daemonset, ignore it
		_, err := daemonStore.GetPodDaemonSets(&pod)
		if err == nil { // No error means at least one daemonset was found
			continue
		}

		...
		
		// Delete the pod via the apiserver
		if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
			return false, err
		}
		remaining = true
	}

	...
	return remaining, nil
}

The main logic of deletePods:

  • List all pod objects from the apiserver.
  • Iterate over the pods, selecting those running on the given node.
  • If a pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it, but record that pods remain.
  • If a pod is managed by a DaemonSet, skip it.
  • Otherwise, delete the pod via the apiserver.
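The filtering above can be sketched without any client-go dependencies. Everything here is a stand-in for illustration: `podInfo` is a stripped-down substitute for v1.Pod, `Terminating` stands in for `pod.DeletionGracePeriodSeconds != nil`, and `OwnedByDaemonSet` stands in for a successful `GetPodDaemonSets` lookup:

```go
package main

import "fmt"

// podInfo carries only the facts the deletePods filter looks at.
type podInfo struct {
	Name             string
	NodeName         string
	Terminating      bool // stands in for pod.DeletionGracePeriodSeconds != nil
	OwnedByDaemonSet bool // stands in for a successful GetPodDaemonSets lookup
}

// podsToDelete sketches the selection logic of deletePods: pods on the target
// node that are neither already terminating nor managed by a DaemonSet.
// It also reports whether any pods remain on the node (deleted or terminating).
func podsToDelete(nodeName string, pods []podInfo) (toDelete []string, remaining bool) {
	for _, p := range pods {
		if p.NodeName != nodeName {
			continue // defensive check: pod belongs to another node
		}
		if p.Terminating {
			remaining = true // already being deleted, but still counts as remaining
			continue
		}
		if p.OwnedByDaemonSet {
			continue // DaemonSet pods are never evicted here
		}
		toDelete = append(toDelete, p.Name)
		remaining = true
	}
	return toDelete, remaining
}

func main() {
	pods := []podInfo{
		{Name: "web-1", NodeName: "node-a"},
		{Name: "ds-1", NodeName: "node-a", OwnedByDaemonSet: true},
		{Name: "web-2", NodeName: "node-b"},
		{Name: "old-1", NodeName: "node-a", Terminating: true},
	}
	names, remaining := podsToDelete("node-a", pods)
	fmt.Println(names, remaining) // prints "[web-1] true"
}
```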

This completes the analysis of all the main logic in the Node Controller's Run method.

The Taint Manager's Run logic touched on here was deliberately not analyzed in depth; I will dedicate a follow-up post to the Taint Manager.
