文档章节

剖析Kubernetes EnableEquivalenceClassCache提升Scheduler吞吐量的工作机制

WaltonWang
 WaltonWang
发布于 05/17 00:48
字数 4852
阅读 317
收藏 3

Author: xidianwangtao@gmail.com

Equivalence Class概念及其意义

2015年,google发表的关于Borg的论文“Large-scale cluster management at Google with Borg”中对Equivalence Class的描述如下:

Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.

Equivalence Class目前是用来在Kubernetes Scheduler加速Predicate,提升Scheduler的吞吐性能。Kubernetes scheduler及时维护着Equivalence Cache的数据,当某些情况发生时(比如delete node、bind pod等事件),需要立刻invalid相关的Equivalence Cache中的缓存数据。

一个Equivalence Class是用来定义一组具有相同Requirements和Constraints的Pods的相关信息的集合,在Scheduler进行Predicate阶段时可以只需对Equivalence Class中一个Pod进行Predicate,并把Predicate的结果放到Equivalence Cache中以供该Equivalence Class中其他Pods(成为Equivalent Pods)重用该结果。只有当Equivalence Cache中没有可以重用的Predicate Result才会进行正常的Predicate流程。

什么样的Pods会被归类到同一Equivalence Class呢?按照其定义,其实只要Pods有某些相同的field,比如resources requirement、label、affinity等,它们都被认为是Equivalent Pods,属于同一Equivalence Class。但是考虑到用户可能随时修改Pods的fields,会导致Scheduler需要及时更新该Pod所属的Equivalence Class变动,从而导致可能正在进行的Predicate需要感知这一变化并作出变更,这使得问题变得异常复杂。因此,目前Scheduler只把那些属于同一OwnerReference(包括RC,RS,Job, StatefulSet)的Pods归类到同一Equivalence Class,比如某个RS定义了N个副本,那么这N个副本Pods就对应一个Equivalence Class。Scheduler会为每个Equivalence Class中的Equivalent Pods计算出一个uint64 EquivalenceHash值。

注意,截止Kubernetes 1.10,即使有两个一样Pod Template的RS,也会对应两个Equivalence Class。

Equivalence Class工作原理

要想使用Equivalence Class需要启用EnableEquivalenceClassCache Feature Gate,截止Kubernetes 1.10,该Feature还是Alpha阶段。

前期我的几遍关于scheduler的博客中对Predicate的分析中提到,所有注册成功的Predicate Policy都会在scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...)过程中按照一定的并行数对每个node调用scheduler.podFitsOnNode(pod, node, predicateFuncs ...)进行注册的Predicate Policys检查。

podFitsOnNode的输入是一个pod,一个node和一系列注册成功的predicateFuncs,用来检查node是否满足该pod的预选条件。加入了Equivalence Class之后,预选阶段会发生了如下变化:

  • 预选之前,先检查该pod是否有对应的Equivalence Class。
  • 如果有对应的Equivalence Class,那么接下来检查Equivalence Cache中是否有可用的Predicate Result,否则触发完整的正常预选。
  • 如果有可用的Predicate Result,那么直接使用该Cached Predicate Result完成预选,否则触发完整的正常预选。

Equivalence Cache会存储每个node的Predicates Results,是一个3层Map对象:

  • 第一层key是node name,表示节点名称;
  • 第二层key是predicateKey,表示预选策略,因此该node对应的algorithmCache Entries数量最多不超过Scheduler注册的Predicate Policies数量,这用来保证Cache大小,防止查找Equivalence Cache时性能太差。
  • 第三层key是Equivalence Hash,前面已经提到过。

比如,algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash]表示$equivalenceHash对应的Pods在$nodeName节点上进行$predicateKey进行预选是否成功。

截止Kubernetes 1.10,predicateKey支持列表如下(20个):

  • MatchInterPodAffinity
  • CheckVolumeBinding
  • CheckNodeCondition
  • GeneralPredicates
  • HostName
  • PodFitsHostPorts
  • MatchNodeSelector
  • PodFitsResources
  • NoDiskConflict
  • PodToleratesNodeTaints
  • CheckNodeUnschedulable
  • PodToleratesNodeNoExecuteTaints
  • CheckNodeLabelPresence
  • CheckServiceAffinity
  • MaxEBSVolumeCount
  • MaxGCEPDVolumeCount
  • MaxAzureDiskVolumeCount
  • NoVolumeZoneConflict
  • CheckNodeMemoryPressure
  • CheckNodeDiskPressure

注意,即使该Pod找到对应的Equivalence Class,Equivalence Cache中也有可能没有可用的Predicate Result,或者对应的Predicate Result已经失效。这时就会触发正常的Predicate,并把Result写到Equivalence Cache中。

如何维护和更新Equivalence Cache呢?如果频繁的更新整个node对应的Equivalence Cache,这违背了Equivalence Cache设计的初衷,并不能提升Predicate的效率。

前面提到过Equivalence Cache的三层Map结构设计,第二层Key是predicateKey,因此Scheduler能做到只invalid单个Predicate Result,而不是盲目的invalid整个node的algorithmCache。

Scheduler会Watch相关API Objects Add/Update/Delete Event,并根据相关策略invalid对应的Equivalence Cache数据,具体的逻辑请看下面的源码分析部分。

Equivalence Class源码分析

Equivalence Cache数据结构

Equivalence Cache结构定义如下:

// EquivalenceCache holds:
// 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod
type EquivalenceCache struct {
	sync.RWMutex
	getEquivalencePod algorithm.GetEquivalencePodFunc
	algorithmCache    map[string]AlgorithmCache
}

// The AlgorithmCache stores PredicateMap with predicate name as key
type AlgorithmCache struct {
	// Only consider predicates for now
	predicatesCache *lru.Cache
}
  • Equivalence Cache真正的缓存数据是通过algorithmCache Map存储,其key为nodeName。

  • 每个node上的Predicate Result Cache通过AlgorithmCache.predicateCache存储,predicateCache是LRU(Least Recently Used,最少最近使用算法)Cache,只能存储一定数量的Entries,Kubernetes中指定最大值为100(Kubernetes 1.10默认实现的Predicate Funcs一共有20个)。

    LRU Cache是一个Cache置换算法,含义是“最近最少使用”,当Cache满(没有空闲的cache块)时,把满足“最近最少使用”的数据从Cache中置换出去,并且保证Cache中第一个数据是最近刚刚访问的。由“局部性原理”,这样的数据更有可能被接下来的程序访问,提升性能。

  • predicateCache也是k-v存储,key为predicateKey,value为PredicateMap。

  • predicateMap的key为uint64的Equivalence Hash,value为HostPredicate。

  • HostPredicate用来表示Pod使用Predicate Policy与某个node的匹配结果,结构如下:

    // HostPredicate is the cached predicate result
    type HostPredicate struct {
    	Fit         bool
    	FailReasons []algorithm.PredicateFailureReason
    }
    

输入图片说明

Equivalence Cache的核心操作

  • InvalidateCachedPredicateItem:用来从Equivalence Cache中删除某个node上某个predicate policy的所有EquivalenceHash(对应Equivalent Pods)的Predicate Result缓存数据。
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		for predicateKey := range predicateKeys {
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
  • InvalidateCachedPredicateItemOfAllNodes:用来删除所有node上指定predicate policy集合对应的所有EquivalenceHash(对应Equivalent Pods)的Predicate Result缓存数据。
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
	...
	// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
	for _, algorithmCache := range ec.algorithmCache {
		for predicateKey := range predicateKeys {
			// just use keys is enough
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
  • PredicateWithECache:检查Equivalence Cache中的Predicate Result缓存数据是否有可用的数据,如果命中缓存,则直接根据缓存中的Predicate Result作为该pod在该node上该Predicate policy的预选结果返回。如果没命中,则返回false和失败原因。
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
	podName, nodeName, predicateKey string,
	equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
			predicateMap := cachePredicate.(PredicateMap)
			// TODO(resouer) Is it possible a race that cache failed to update immediately?
			if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
				if hostPredicate.Fit {
					return true, []algorithm.PredicateFailureReason{}, false
				}
				return false, hostPredicate.FailReasons, false
			}
			// is invalid
			return false, []algorithm.PredicateFailureReason{}, true
		}
	}
	return false, []algorithm.PredicateFailureReason{}, true
}
  • UpdateCachedPredicateItem:当PredicateWithECache使用Predicate Result Cache数据命中失败时,scheduler会调用对应的Predicate Funcs触发真正的预选逻辑,完成之后,就通过UpdateCachedPredicateItem将刚预选的结果更新到Equivalence Cache缓存中。每个node的predicateCache的初始化也是在这里完成的。
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
	podName, nodeName, predicateKey string,
	fit bool,
	reasons []algorithm.PredicateFailureReason,
	equivalenceHash uint64,
	needLock bool,
) {
	...
	if _, exist := ec.algorithmCache[nodeName]; !exist {
		ec.algorithmCache[nodeName] = newAlgorithmCache()
	}
	predicateItem := HostPredicate{
		Fit:         fit,
		FailReasons: reasons,
	}
	// if cached predicate map already exists, just update the predicate by key
	if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
		predicateMap := v.(PredicateMap)
		// maps in golang are references, no need to add them back
		predicateMap[equivalenceHash] = predicateItem
	} else {
		ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
			PredicateMap{
				equivalenceHash: predicateItem,
			})
	}
}

Equivalence Cache的初始化

Kubernetes在注册predicates、priorities、scheduler extenders时,同时也会进行Equivalence Cache的初始化,并将其传入scheduler config中。

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
	...
	// Init equivalence class cache
	if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
		pluginArgs, err := c.getPluginArgs()
		if err != nil {
			return nil, err
		}
		c.equivalencePodCache = core.NewEquivalenceCache(
			getEquivalencePodFuncFactory(*pluginArgs),
		)
		glog.Info("Created equivalence class cache")
	}
	...
}

// NewEquivalenceCache creates a EquivalenceCache object.
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
	return &EquivalenceCache{
		getEquivalencePod: getEquivalencePodFunc,
		algorithmCache:    make(map[string]AlgorithmCache),
	}
}

NewEquivalenceCache负责Equivalence Cache的初始化工作,那么getEquivalencePod又是在哪完成注册的呢?defualt algorithm provider初始化时完成注册GetEquivalencePodFunc(只能使用defualt provider?通过configfile就不行吗?),注意这里factory.PluginFactoryArgs只传入了PVCInfo。

GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.

pkg/scheduler/algorithmprovider/defaults/defaults.go:38

func init() {
	...
	// Use equivalence class to speed up heavy predicates phase.
	factory.RegisterGetEquivalencePodFunction(
		func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
			return predicates.NewEquivalencePodGenerator(args.PVCInfo)
		},
	)
	...
}	

为什么只传入PVCInfo呢?或者为什么需要PVCInfo呢?要回答这个问题,我们先来看看EquivalencePod和getEquivalencePod的定义。

// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
	ControllerRef metav1.OwnerReference
	PVCSet        sets.String
}

EquivalencePod定义了具备哪些相同属性的Pods属于Equivalent Pods,Equivalence Hash就是根据Pod的EquivalencePod中指定的两个属性来计算的,这两个属性分别是:

  • ControllerRef:对应Pod的meta.OwnerReference,对应Pod所属的Controller Object,可以是RS,RC,Job,StatefulSet类型之一。
  • PVCSet:是Pod所引用的所有PVCs IDs集合。

因此,只有两个Pod属于同一个Controller并且引用可同样的PVCs对象才被认为是EquivalentPod,对应同一个Equivalence Hash。

getEquivalencePod根据Pod Object中的OwnerReference和PVC信息获取它所属的EquivalencePod对象。

func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
	for _, ref := range pod.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			pvcSet, err := e.getPVCSet(pod)
			if err == nil {
				// A pod can only belongs to one controller, so let's return.
				return &EquivalencePod{
					ControllerRef: ref,
					PVCSet:        pvcSet,
				}
			}
			return nil
		}
	}
	return nil
}

何时生成Pod对应的Equivalence Hash

预选的入口是findNodesThatFit,也就是在findNodesThatFit中调用了getEquivalenceClassInfo计算Pod的EquivalenceHash,然后把该hash值传入podFitsOnNode中进行后续的Equivalence Class功能。

func findNodesThatFit(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*v1.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.PredicateMetadataProducer,
	ecache *EquivalenceCache,
	schedulingQueue SchedulingQueue,
	alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
	...

		var equivCacheInfo *equivalenceClassInfo
		if ecache != nil {
			// getEquivalenceClassInfo will return immediately if no equivalence pod found
			equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
		}

		checkNode := func(i int) {
			nodeName := nodes[i].Name
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				nodeNameToInfo[nodeName],
				predicateFuncs,
				ecache,
				schedulingQueue,
				alwaysCheckAllPredicates,
				equivCacheInfo,
			)
			...
		}
		...
	}

getEquivalenceClassInfo计算pod的EquivalenceHash的原理如下:

// getEquivalenceClassInfo returns the equivalence class of given pod.
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
	equivalencePod := ec.getEquivalencePod(pod)
	if equivalencePod != nil {
		hash := fnv.New32a()
		hashutil.DeepHashObject(hash, equivalencePod)
		return &equivalenceClassInfo{
			hash: uint64(hash.Sum32()),
		}
	}
	return nil
}

可见,EquivalenceHash就是对getEquivalencePod利用FNV算法进行哈希的。

Equivalent Pod的Predicate Result何时加到PredicateCache中

我们先看看podFitsOnNode的相关实现:

func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	...
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				// Use an in-line function to guarantee invocation of ecache.Unlock()
				// when the in-line function returns.
				func() {
					var invalid bool
					if eCacheAvailable {
						// Lock ecache here to avoid a race condition against cache invalidation invoked
						// in event handlers. This race has existed despite locks in equivClassCacheimplementation.
						ecache.Lock()
						defer ecache.Unlock()
						// PredicateWithECache will return its cached predicate results.
						fit, reasons, invalid = ecache.PredicateWithECache(
							pod.GetName(), info.Node().GetName(),
							predicateKey, equivCacheInfo.hash, false)
					}

					if !eCacheAvailable || invalid {
						// we need to execute predicate functions since equivalence cache does not work
						fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
						if err != nil {
							return
						}

						if eCacheAvailable {
							// Store data to update equivClassCacheafter this loop.
							if res, exists := predicateResults[predicateKey]; exists {
								res.Fit = res.Fit && fit
								res.FailReasons = append(res.FailReasons, reasons...)
								predicateResults[predicateKey] = res
							} else {
								predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
							}
							result := predicateResults[predicateKey]
							ecache.UpdateCachedPredicateItem(
								pod.GetName(), info.Node().GetName(),
								predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
						}
					}
				}()

				...
}

podFitsOnNode时会先通过PredicateWithECache检查是否Equivalence Cache中有该缓存命中:

  • 如果有命中数据可用,则对应的Predicate Policy就算处理完成。
  • 如果没有命中数据才会触发调用predicate,然后将predicate的结果通过UpdateCachedPredicateItem添加/更新到缓存中。

维护Equivalence Cache

我们回到Scheduler Config Factory,看看Scheduler中podInformer、nodeInformer、serviceInformer、pvcInformer等注册的EventHandler中对Equivalence Cache的操作。

Assume Pod

当完成pod的调度后,在Bind Node之前,会先进行Pod Assume,在Assume过程中,会对Equivalence Cache有操作。

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	...

	// Optimistically assume that the binding will succeed, so we need to invalidate affected
	// predicates in equivalence cache.
	// If the binding fails, these invalidated item will not break anything.
	if sched.config.Ecache != nil {
		sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
	}
	return nil
}

Assume Pod时调用InvalidateCachedPredicateItemForPodAdd对Equivalence Cache进行操作。

func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
	// GeneralPredicates: will always be affected by adding a new pod
	invalidPredicates := sets.NewString("GeneralPredicates")

	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim != nil {
			invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
		} else {
			if vol.AWSElasticBlockStore != nil {
				invalidPredicates.Insert("MaxEBSVolumeCount")
			}
			if vol.GCEPersistentDisk != nil {
				invalidPredicates.Insert("MaxGCEPDVolumeCount")
			}
			if vol.AzureDisk != nil {
				invalidPredicates.Insert("MaxAzureDiskVolumeCount")
			}
		}
	}
	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}

InvalidateCachedPredicateItemForPodAdd中可以看出,Assume Pod会删除该node上以下predicateKey对应的predicateCache:

  • GeneralPredicates;
  • 如果该pod中引用了PVCs,则会删除"MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount"这些PredicateCaches;
  • 如果pod volume中使用了AWSElasticBlockStore,则会删除MaxEBSVolumeCount PredicateCache;
  • 如果pod volume中使用了GCEPersistentDisk,则会删除MaxGCEPDVolumeCount PredicateCache;
  • 如果pod volume中使用了AzureDisk,则会删除MaxAzureDiskVolumeCount PredicateCache;

Update Pod in Scheduled Pod Cache

在scheduler进行NewConfigFactory时,注册Update assignedNonTerminatedPod Event Handler为updatePodInCache。

func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	...
	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// if the pod does not have bound node, updating equivalence cache is meaningless;
		// if pod's bound node has been changed, that case should be handled by pod add & delete.
		if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
			if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
				// MatchInterPodAffinity need to be reconsidered for this node,
				// as well as all nodes in its same failure domain.
				c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
					matchInterPodAffinitySet)
			}
			// if requested container resource changed, invalidate GeneralPredicates of this node
			if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
				predicates.GetResourceRequest(oldPod)) {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					newPod.Spec.NodeName, generalPredicatesSets)
			}
		}
	}
}

updatePodInCache调用invalidateCachedPredicatesOnUpdatePod对Equivalence Cache做了如下处理:

  • 如果pod Labels做了更新,那么会删除所有nodes上Equivalence Cache中的MatchInterPodAffinity PredicateCache;
  • 如果pod的resource request做了更新,那么会删除该node上Equivalence Cache中的GeneralPredicates PredicateCache;

Delete Pod in Scheduled Pod Cache

同样的,当发生删除assignedNonTerminatedPod时,对应会调用invalidateCachedPredicatesOnDeletePod更新Equivalence Cache。

func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// part of this case is the same as pod add.
		c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
		// MatchInterPodAffinity need to be reconsidered for this node,
		// as well as all nodes in its same failure domain.
		// TODO(resouer) can we just do this for nodes in the same failure domain
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
			matchInterPodAffinitySet)

		// if this pod have these PV, cached result of disk conflict will become invalid.
		for _, volume := range pod.Spec.Volumes {
			if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
				volume.RBD != nil || volume.ISCSI != nil {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					pod.Spec.NodeName, noDiskConflictSet)
			}
		}
	}
}

invalidateCachedPredicatesOnDeletePod更新Equivalence Cache的处理总结为:

  • 删除该node上Equivalence Cache中的GeneralPredicates PredicateCache;
  • 如果该pod中引用了PVCs,则会删除该node上Equivalence Cache中的"MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount"这些PredicateCaches;
  • 如果pod volume中使用了AWSElasticBlockStore,则会删除该node上Equivalence Cache中的MaxEBSVolumeCount PredicateCache;
  • 如果pod volume中使用了GCEPersistentDisk,则会删除该node上Equivalence Cache中的MaxGCEPDVolumeCount PredicateCache;
  • 如果pod volume中使用了AzureDisk,则会删除该node上Equivalence Cache中的MaxAzureDiskVolumeCount PredicateCache;
  • 删除所有nodes上Equivalence Cache中的MatchInterPodAffinity PredicateCache;
  • 如果pod的resource request做了更新,那么会删除该node上Equivalence Cache中的GeneralPredicates PredicateCache;
  • 如果pod volume中引用了GCEPersistentDisk、AWSElasticBlockStore、RBD、ISCSI之一,则删除该node上Equivalence Cache中的NoDiskConflict PredicateCache。

Update Node

当发生node update event时,对应会调用invalidateCachedPredicatesOnNodeUpdate更新Equivalence Cache。

func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
	if c.enableEquivalenceClassCache {
		// Begin to update equivalence cache based on node update
		// TODO(resouer): think about lazily initialize this set
		invalidPredicates := sets.NewString()

		if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
			invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
		}
		if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
			invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
			for k, v := range oldNode.GetLabels() {
				// any label can be topology key of pod, we have to invalidate in all cases
				if v != newNode.GetLabels()[k] {
					invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
				}
				// NoVolumeZoneConflict will only be affected by zone related label change
				if isZoneRegionLabel(k) {
					if v != newNode.GetLabels()[k] {
						invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
					}
				}
			}
		}

		oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
		if oldErr != nil {
			glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
		}
		newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
		if newErr != nil {
			glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
		}
		if !reflect.DeepEqual(oldTaints, newTaints) ||
			!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
			invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
		}

		if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
			oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			for _, cond := range oldNode.Status.Conditions {
				oldConditions[cond.Type] = cond.Status
			}
			for _, cond := range newNode.Status.Conditions {
				newConditions[cond.Type] = cond.Status
			}
			if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
				invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
			}
			if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
				invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
			}
			if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
				oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
				oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
				invalidPredicates.Insert(predicates.CheckNodeConditionPred)
			}
		}
		if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
			invalidPredicates.Insert(predicates.CheckNodeConditionPred)
		}
		c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
	}
}

因此,node update时,会删除该node对应的Equivalence Cache中如下PredicateKey的PredicateCache:

  • GeneralPredicates, 前提:node.Status.Allocatable或node labels发生变更.
  • ServiceAffinity, 前提:node labels发生变更。
  • MatchInterPodAffinity, 前提:node labels发生变更。
  • NoVolumeZoneConflict, 前提:failure-domain.beta.kubernetes.io/zone或failure-domain.beta.kubernetes.io/region Annotation发生变更;
  • PodToleratesNodeTaints, 前提: Node的Taints(对应scheduler.alpha.kubernetes.io/taints Annotation)发生变更.
  • CheckNodeMemoryPressure, CheckNodeDiskPressure, CheckNodeCondition, 前提:如果对应的Node Condition发生变更。

Delete Node

当发生node delete event时,对应会调用InvalidateAllCachedPredicateItemOfNode更新Equivalence Cache。

// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
	ec.Lock()
	defer ec.Unlock()
	delete(ec.algorithmCache, nodeName)
	glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}

因此,node delete时,则会从Equivalence Cache中删除整个node对应的algorthmCache。

Add or Delete PV

当发生pv add或者delete event时,对应会调用invalidatePredicatesForPv更新Equivalence Cache。

func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
	// You could have a PVC that points to a PV, but the PV object doesn't exist.
	// So when the PV object gets added, we can recount.
	invalidPredicates := sets.NewString()

	// PV types which impact MaxPDVolumeCountPredicate
	if pv.Spec.AWSElasticBlockStore != nil {
		invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
	}
	if pv.Spec.GCEPersistentDisk != nil {
		invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
	}
	if pv.Spec.AzureDisk != nil {
		invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
	}

	// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
	for k := range pv.Labels {
		if isZoneRegionLabel(k) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

因此,当add或者delete PV时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • MaxEBSVolumeCount, MaxGCEPDVolumeCount, MaxAzureDiskVolumeCount,前提:PV类型是这三者的范围内;

Update PV

当发生pv update event时,对应会调用invalidatePredicatesForPvUpdate更新Equivalence Cache。

func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
	invalidPredicates := sets.NewString()
	for k, v := range newPV.Labels {
		// If PV update modifies the zone/region labels.
		if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

因此,当update PV时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • NoVolumeZoneConflict, 前提:PV的failure-domain.beta.kubernetes.io/zone或failure-domain.beta.kubernetes.io/region Annotation发生变更;

Add or Delete PVC

func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
	// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod

	// The bound volume type may change
	invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)

	// The bound volume's label may change
	invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

当发生pvc add或者delete event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount" PredicateCaches;
  • NoVolumeZoneConflict PredicateCaches;
  • CheckVolumeBinding,前提,VolumeScheduling这个Feature Gate是启用状态;

Update PVC

func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
	invalidPredicates := sets.NewString()

	if old.Spec.VolumeName != new.Spec.VolumeName {
		if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
			// PVC volume binding has changed
			invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
		}
		// The bound volume type may change
		invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

当发生pvc update event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • CheckVolumeBinding,前提:VolumeScheduling这个Feature Gate是启用状态,并且PVC对应的PV发生变更;
  • "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount" PredicateCaches,前提:PVC对应的PV发生变更;

Add or Delete Service

func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

当发生Service Add或Delete event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • CheckServiceAffinity;

Update Service

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	if c.enableEquivalenceClassCache {
		// TODO(resouer) We may need to invalidate this for specified group of pods only
		oldService := oldObj.(*v1.Service)
		newService := newObj.(*v1.Service)
		if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
			c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
		}
	}
	c.podQueue.MoveAllToActiveQueue()
}

当发生Service Update event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

  • CheckServiceAffinity,前提:Service的Selector发生变更。

Equivalence Class的不足

  • Equivalence Class Feature最困难的就是如何最优的维护和更新Equivalence Cache,做到每次更新都是最小粒度的、准确无误的,目前这方面还需优化。

  • Equivalence Cache只缓存Predicate Result,并不支持Priority Result数据的缓存和维护(社区正在实现基于Map-Reduce方式优化),通常情况下,Priority Funcs的处理逻辑要比Predicate Funcs复杂,支持的意义就更大。

  • Equivalence Class目前只能根据Pod对应的OwnerReference和PVC信息进行Equivalence Hash,如果能摒弃OwnerReference的考虑,充分考虑Pod spec中那些核心的field,比如resource request, Labels,Affinity等,缓存命中的几率可能会大的多,Predicate的性能就能得到更显著的提升。

总结

Equivalence Class是用来给Kubernetes Scheduler加速Predicate,从而提升Scheduler的吞吐性能。当然,普通用户其实无需关注Equivalence Class Feature,因为目前的scheduler性能对大部分用户来说已经足够了,但对于有大规模AI训练场景的用户,可以多关注它。

© 著作权归作者所有

共有 人打赏支持
WaltonWang
粉丝 166
博文 92
码字总数 187622
作品 0
深圳
程序员
Kubernetes核心组件解析

众所周知,Kubernetes是目前最为火热的容器编排工具之一,其背后有如此多的追随者必然是有原因的。首先Kubernetes非常轻量,通常Kubernetes都是以容器作为载体,而容器本来就具有轻量级秒级部...

Docker
08/05
0
0
Kubernetes内部组件工作原理介绍

本篇文章讲述了Kubernetes内部组件的工作原理,及创建Pod的流程。如果你是运维人员或者是Kubernetes的使用者,你可以不需要知道Kubernetes的内部工作原理,但是如果你想理解Kubernetes内部的...

Docker
04/25
0
0
伸缩Kubernetes到2500个节点中遇到的问题和解决方法

Kubernetes自从1.6起便号称可以承载5000个以上的节点,但是从数十到5000的路上,难免会遇到问题。 本片文章即分享Open API在kubernetes 5000之路上的经验,包括遇到的问题、尝试解决问题以及...

好雨云帮
04/24
0
0
容器化RDS:PersistentLocalVolumes和VolumeScheduling

容器化RDS系列文章: 容器化RDS:计算存储分离架构下的“Split-Brain” 容器化RDS:计算存储分离还是本地存储? 容器化RDS:你需要了解数据是如何被写"坏"的 数据库的高可用方案非常依赖底层...

Docker
04/28
0
0
Kubernetes系统架构简介[转]

1. 前言 Together we will ensure that Kubernetes is a strong and open container management framework for any application and in any environment, whether in a private, public or ......

长征2号
2017/07/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

java并发备忘

不安全的“先检查后执行”,代码形式如下: if(条件满足){ //这里容易出现线程安全问题//doSomething}else{//doOther} 读取-修改-写入 原子操作:使用CAS技术,即首先从V中读取...

Funcy1122
今天
0
0
SpringBoot2.0 停机

最近新建了个SpringBoot2.0的项目,因为原来一直使用的是传统的Tomcat部署war包的形式,所以这次SpringBoot内置Tomcat部署jar包的时候遇到了很多问题。其中一个就是因为没有外置的Tomcat容器...

Canaan_
昨天
0
1
Confluence 6 外部参考

一个外部参考的意思是任何站点链接到你 Confluence 的实例。任何时候当 Confluence 的用户单击这个外部链接的时候,Confluence 可以记录这次单击为参考。 在默认的情况下,外部链接的参考链接...

honeymose
昨天
0
0
Android中的设计模式之抽象工厂模式

参考 《设计模式解析》 第十一章 Abstract Factory模式 《设计模式:可复用面向对象软件的基础 》3.1 Abstract Factory 抽象工厂 对象创建型模式 《Android源码设计模式解析与实战》第6章 创...

newtrek
昨天
0
0
Redis | 地理空间(GEO)的一个坑

Redis的地理空间(Geo)是个好东西,轻轻松松的就可以把地图描点的问题处理了, 最近却遇到一个坑...Redis采用的Msater-Slave模式, 运用GEORADIUS在salve读取对应的数据,新增了从节点但是从不返...

云迹
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部