文档章节

Kubernetes Scheduler源码分析

WaltonWang
 WaltonWang
发布于 2017/01/17 20:06
字数 3023
阅读 906
收藏 4

本文是对Kubernetes 1.5的Scheduler源码层面的剖析,包括对应的源码目录结构分析、kube-scheduler运行机制分析、整体代码流程图、核心代码走读分析等内容。阅读本文前,请先了解kubernetes scheduler原理解析

Kubernetes源码目录结构分析

Kubernetes Scheduler是作为kubernetes的一个plugin来设计的,这种可插拔的设计极大方便用户自定义调度算法,在不同的公司,通常大家对调度的需求是不同的,自定义调度是很常见的。

Scheduler的源码主要在k8s.io/kubernetes/plugin/目录下,其中两个目录cmd/scheduler和pkg/scheduler分别定义了kube-scheduler command的参数封装和app启动运行和scheduler的具体内部实现。具体的目录结构分析如下所示。

k8s.io/kubernetes/plugin/
.
├── cmd
│   └── kube-scheduler          // kube-scheduler command的相关代码
│       ├── app                 // kube-scheduler app的启动
│       │   ├── options         
│       │   │   └── options.go  // 封装SchedulerServer对象和AddFlags方法
│       │   └── server.go       // 定义SchedulerServer的config封装和Run方法
│       └── scheduler.go        // kube-scheduler main方法入口
└── pkg
    ├── scheduler               // scheduler后端核心代码
    │   ├── algorithm
    │   │   ├── doc.go
    │   │   ├── listers.go      // 定义NodeLister和PodLister等Interface
    │   │   ├── predicates      // 定义kubernetes自带的Predicates Policies的Function实现
    │   │   │   ├── error.go
    │   │   │   ├── metadata.go
    │   │   │   ├── predicates.go   // 自带Predicates Policies的主要实现
    │   │   │   ├── predicates_test.go
    │   │   │   ├── utils.go
    │   │   │   └── utils_test.go
    │   │   ├── priorities      // 定义kubernetes自带的Priorities Policies的Function实现
    │   │   │   ├── balanced_resource_allocation.go    // defaultProvider - BalancedResourceAllocation
    │   │   │   ├── balanced_resource_allocation_test.go
    │   │   │   ├── image_locality.go    // defaultProvider - ImageLocalityPriority
    │   │   │   ├── image_locality_test.go
    │   │   │   ├── interpod_affinity.go   // defaultProvider - InterPodAffinityPriority
    │   │   │   ├── interpod_affinity_test.go
    │   │   │   ├── least_requested.go  // defaultProvider - LeastRequestedPriority
    │   │   │   ├── least_requested_test.go 
    │   │   │   ├── metadata.go         // priorityMetadata定义
    │   │   │   ├── most_requested.go   // defaultProvider - MostRequestedPriority
    │   │   │   ├── most_requested_test.go
    │   │   │   ├── node_affinity.go    // defaultProvider - NodeAffinityPriority
    │   │   │   ├── node_affinity_test.go
    │   │   │   ├── node_label.go       // 当policy.Argument.LabelPreference != nil时,会注册该Policy
    │   │   │   ├── node_label_test.go
    │   │   │   ├── node_prefer_avoid_pods.go  // defaultProvider - NodePreferAvoidPodsPriority 
    │   │   │   ├── node_prefer_avoid_pods_test.go
    │   │   │   ├── selector_spreading.go     // defaultProvider - SelectorSpreadPriority
    │   │   │   ├── selector_spreading_test.go
    │   │   │   ├── taint_toleration.go      // defaultProvider - TaintTolerationPriority
    │   │   │   ├── taint_toleration_test.go
    │   │   │   ├── test_util.go
    │   │   │   └── util                // 工具类
    │   │   │       ├── non_zero.go
    │   │   │       ├── topologies.go
    │   │   │       └── util.go
    │   │   ├── scheduler_interface.go    // 定义SchedulerExtender和ScheduleAlgorithm Interface
    │   │   ├── scheduler_interface_test.go
    │   │   └── types.go               // 定义了Predicates和Priorities Algorithm要实现的方法类型(FitPredicate, PriorityMapFunction)
    │   ├── algorithmprovider          // algorithm-provider参数配置的项
    │   │   ├── defaults    
    │   │   │   ├── compatibility_test.go
    │   │   │   └── defaults.go         // "DefaultProvider"的实现
    │   │   ├── plugins.go            // 空,预留自定义
    │   │   └── plugins_test.go
    │   ├── api                       // 定义Scheduelr API接口和对象,用于SchedulerExtender处理来自HTTPExtender的请求。
    │   │   ├── latest
    │   │   │   └── latest.go
    │   │   ├── register.go
    │   │   ├── types.go              // 定义Policy, PredicatePolicy,PriorityPolicy等
    │   │   ├── v1
    │   │   │   ├── register.go
    │   │   │   └── types.go
    │   │   └── validation
    │   │       ├── validation.go    // 验证Policy的定义是否合法
    │   │       └── validation_test.go
    │   ├── equivalence_cache.go    // 
    │   ├── extender.go               // 定义HTTPExtender的新建以及对应的Filter和Prioritize方法来干预预选和优选
    │   ├── extender_test.go
    │   ├── factory                    // 根据配置的Policies注册和匹配到对应的预选(FitPredicateFactory)和优选(PriorityFunctionFactory2)函数
    │   │   ├── factory.go             // 核心是定义ConfigFactory来工具配置完成scheduler的封装函数,最关键的CreateFromConfig和CreateFromKeys
    │   │   ├── factory_test.go
    │   │   ├── plugins.go             // 核心是定义注册自定义预选和优选Policy的方法
    │   │   └── plugins_test.go
    │   ├── generic_scheduler.go        // 定义genericScheduler,其Schedule(...)方法作为调度执行的真正开始的地方
    │   ├── generic_scheduler_test.go
    │   ├── metrics                    // 支持注册metrics到Prometheus
    │   │   └── metrics.go
    │   ├── scheduler.go                // 定义Scheduler及Run(),核心的scheduleOne()方法也在此,scheduleOne()一个完成的调度流程,包括或许待调度Pod、调度、Bind等
    │   ├── scheduler_test.go
    │   ├── schedulercache       
    │   │   ├── cache.go               // 定义schedulerCache对Pod,Node,以及Bind的CURD,以及超时维护等工作
    │   │   ├── cache_test.go
    │   │   ├── interface.go           // schedulerCache要实现的Interface
    │   │   ├── node_info.go          // 定义NodeInfo及其相关Opertation
    │   │   └── util.go
    │   └── testing
    │       ├── fake_cache.go
    │       └── pods_to_cache.go

Kube-scheduler运行机制分析

  1. kube-scheduler作为kubernetes master上一个单独的进程提供调度服务,通过--master指定kube-api-server的地址,用来watch pod和node和调用api server bind接口完成node和pod的Bind操作。

  2. kube-scheduler中维护了一个FIFO类型的PodQueue cache,新创建的Pod都会被ConfigFactory watch到,被添加到该PodQueue中,每次调度都从该PodQueue中getNextPod作为即将调度的Pod。

  3. 获取到待调度的Pod后,就执行AlgorithmProvider配置Algorithm的Schedule方法进行调度,整个调度过程分两个关键步骤:Predicates和Priorities,最终选出一个最适合该Pod借宿的Node返回。

  4. 更新SchedulerCache中Pod的状态(AssumePod),标志该Pod为scheduled,并更新到最有NodeInfo中。

  5. 调用api server的Bind接口,完成node和pod的Bind操作,如果Bind失败,从SchedulerCache中删除上一步中已经Assumed的Pod。

Kubernetes Scheduler代码流程图

由于图片布局较大,请下载到本地放大查看。 这里写图片描述

Kubernetes Scheduler核心代码走读分析

Scheduler的main入口如下,负责创建SchedulerServer和启动。

plugin/cmd/kube-scheduler/scheduler.go

func main() {
	s := options.NewSchedulerServer()
	s.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s); err != nil {
		glog.Fatalf("scheduler app failed to run: %v", err)
	}
}

kuber-scheduler的参数说明在options中定义如下:

plugin/cmd/kube-scheduler/app/options/options.go

// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
	fs.Int32Var(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
	fs.StringVar(&s.Address, "address", s.Address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
	fs.StringVar(&s.AlgorithmProvider, "algorithm-provider", s.AlgorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
	fs.StringVar(&s.PolicyConfigFile, "policy-config-file", s.PolicyConfigFile, "File with scheduler policy configuration")
	fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
	fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")
	fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
	fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
	fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
	fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
	fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
	fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
	fs.IntVar(&s.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", api.DefaultHardPodAffinitySymmetricWeight,
		"RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
			"to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
	fs.StringVar(&s.FailureDomains, "failure-domains", api.DefaultFailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
	leaderelection.BindFlags(&s.LeaderElection, fs)
	config.DefaultFeatureGate.AddFlag(fs)
}

server.Run方法是cmd/kube-scheduler中最重要的方法:

  • 负责config的生成。
  • 并根据config创建sheduler对象。
  • 启动HTTP服务,提供/debug/pprof http接口方便进行性能数据收集调优,提供/metrics http接口以供prometheus收集监控数据。
  • kube-scheduler自选举完成后立刻开始循环执行scheduler.Run进行调度。
plugin/cmd/kube-scheduler/app/server.go:75

// Run runs the specified SchedulerServer.  This should never exit.
func Run(s *options.SchedulerServer) error {
	...
	config, err := createConfig(s, kubecli)
	...
	sched := scheduler.New(config)

	go startHTTP(s)

	run := func(_ <-chan struct{}) {
		sched.Run()
		select {}
	}

	...
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("lost master")
			},
		},
	})
	...
}

开始进入Scheduler.Run的逻辑,启动goroutine,循环反复执行Scheduler.scheduleOne方法,直到收到shut down scheduler的信号。

Scheduler.scheduleOne开始真正的调度逻辑,每次负责一个Pod的调度:

  • 从PodQueue中获取一个Pod。
  • 执行对应Algorithm的Schedule,进行预选和优选。
  • AssumePod
  • Bind Pod, 如果Bind Failed,ForgetPod。
plugin/pkg/scheduler/scheduler.go:86

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
	go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}

func (s *Scheduler) scheduleOne() {
	pod := s.config.NextPod()
	...
	dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)
	...
	assumed := *pod
	assumed.Spec.NodeName = dest
	if err := s.config.SchedulerCache.AssumePod(&assumed); err != nil {
		...
		return
	}

	go func() {
		...

		b := &v1.Binding{
			ObjectMeta: v1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
			Target: v1.ObjectReference{
				Kind: "Node",
				Name: dest,
			},
		}

		...
		err := s.config.Binder.Bind(b)
		if err != nil {
			glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
			if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {
				...
			return
		}
	
	}()
}

下面是Schedule Algorithm要实现的Schedule接口:

plugin/pkg/scheduler/algorithm/scheduler_interface.go:41

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods onto machines.
type ScheduleAlgorithm interface {
	Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
}

genericScheduler作为一个默认Scheduler,当然也必须实现上述接口:

plugin/pkg/scheduler/generic_scheduler.go:89

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {

	// 从cache中获取可被调度的Nodes
	...
	nodes, err := nodeLister.List()
	...

	// 开始预选
	trace.Step("Computing predicates")
	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
	
	...

	// 开始优选打分
	trace.Step("Prioritizing")
	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
	...

	// 如果优选出多个Node,则随机选择一个Node作为最佳Node返回
	trace.Step("Selecting host")
	return g.selectHost(priorityList)
}


// findNodesThatFit是预选的入口
func findNodesThatFit(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*v1.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.MetadataProducer,
) ([]*v1.Node, FailedPredicateMap, error) {
	var filtered []*v1.Node
	failedPredicateMap := FailedPredicateMap{}

	if len(predicateFuncs) == 0 {
		filtered = nodes
	} else {
		...
		// checkNode会调用podFitsOnNode完成配置的所有Predicates Policies对该Node的检查。
		checkNode := func(i int) {
			nodeName := nodes[i].Name
			fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
			...
		}
		
		// 根据nodes数量,启动最多16个个goroutine worker执行checkNode方法
		workqueue.Parallelize(16, len(nodes), checkNode)
		filtered = filtered[:filteredLen]
		if len(errs) > 0 {
			return []*v1.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
		}
	}

	// 如果配置了Extender,则执行Extender的Filter逻辑再次进行甩选。
	if len(filtered) > 0 && len(extenders) != 0 {
		for _, extender := range extenders {
			filteredList, failedMap, err := extender.Filter(pod, filtered)
			...
		}
	}
	return filtered, failedPredicateMap, nil
}

// 循环执行所有配置的Predicates Polic对应的predicateFunc。
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
	var failedPredicates []algorithm.PredicateFailureReason
	for _, predicate := range predicateFuncs {
		fit, reasons, err := predicate(pod, meta, info)
		...
	}
	return len(failedPredicates) == 0, failedPredicates, nil
}


// 根据所有配置到Priorities Policies对所有预选后的Nodes进行优选打分
// 每个Priorities policy对每个node打分范围为0-10分,分越高表示越合适
func PrioritizeNodes(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	meta interface{},
	priorityConfigs []algorithm.PriorityConfig,
	nodes []*v1.Node,
	extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
	
	...
	// 对单个node遍历所有的Priorities Policies,得到每个node每个policy打分的二维数据数据
	processNode := func(index int) {
		nodeInfo := nodeNameToInfo[nodes[index].Name]
		var err error
		for i := range priorityConfigs {
			if priorityConfigs[i].Function != nil {
				continue
			}
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				return
			}
		}
	}
	
	// 根据nodes数量,启动最多16个goroutine worker执行processNode方法
	workqueue.Parallelize(16, len(nodes), processNode)
	
	// 遍历所有配置的Priorities policies,如果某个policy配置了Reduce,则执行对应的Reduce,更新result[node][policy]得分
	for i, priorityConfig := range priorityConfigs {
		if priorityConfig.Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int, config algorithm.PriorityConfig) {
			defer wg.Done()
			if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
				appendError(err)
			}
		}(i, priorityConfig)
	}
	
	// Wait for all computations to be finished.
	wg.Wait()
	...

	// 对得分进行加权求和得到最终分数
	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
	// TODO: Consider parallelizing it.
	for i := range nodes {
		result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
		for j := range priorityConfigs {
			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
		}
	}

	// 如果配置了Extender,则再执行Extender的优选打分方法Extender.Prioritize
	if len(extenders) != 0 && nodes != nil {
		combinedScores := make(map[string]int, len(nodeNameToInfo))
		for _, extender := range extenders {
			wg.Add(1)
			go func(ext algorithm.SchedulerExtender) {
				defer wg.Done()
				prioritizedList, weight, err := ext.Prioritize(pod, nodes)
				...
			}(extender)
		}
		
		
		// wait for all go routines to finish
		wg.Wait()
		
		// 执行combinedScores,将非Extender优选后的node得分再次经过Extender的优选打分排序
		for i := range result {
			result[i].Score += combinedScores[result[i].Host]
		}
	}

	...
}

具体的Predicate Policy对应的PredicateFunc都定义在plugin/pkg/scheduler/algorithm/predicates/predicates.go中,下面是CheckNodeMemoryPressurePredicate的定义。

plugin/pkg/scheduler/algorithm/predicates/predicates.go:1202

// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	var podBestEffort bool
	if predicateMeta, ok := meta.(*predicateMetadata); ok {
		podBestEffort = predicateMeta.podBestEffort
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podBestEffort = isPodBestEffort(pod)
	}
	// pod is not BestEffort pod
	if !podBestEffort {
		return true, nil, nil
	}

	// is node under presure?
	if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
		return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
	}
	return true, nil, nil
}

具体的Priorities Policy对应的PriorityFunc都定义在plugin/pkg/scheduler/algorithm/priorities/*.go中,下面是MostRequestedPriority的定义。

plugin/pkg/scheduler/algorithm/priorities/most_requested.go:33

// MostRequestedPriority is a priority function that favors nodes with most requested resources.
// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
// based on the maximum of the average of the fraction of requested to capacity.
// Details: (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
func MostRequestedPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	var nonZeroRequest *schedulercache.Resource
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		nonZeroRequest = priorityMeta.nonZeroRequest
	} else {
		// We couldn't parse metadatat - fallback to computing it.
		nonZeroRequest = getNonZeroRequests(pod)
	}
	return calculateUsedPriority(pod, nonZeroRequest, nodeInfo)
}

kubernetes默认给kube-scheduler配置了DefaultProvider。DefaultProvider配置了哪些Predicates和Priorities Policies呢?这些都定义在plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go中,如下所示:

plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go:205

// DefaultProvider配置的默认Predicates Policies
func defaultPredicates() sets.String {
	return sets.NewString(
		// Fit is determined by volume zone requirements.
		factory.RegisterFitPredicateFactory(
			"NoVolumeZoneConflict",
			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
				return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
			},
		),
		...
		// Fit is determined by non-conflicting disk volumes.
		factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),

		// GeneralPredicates are the predicates that are enforced by all Kubernetes components
		// (e.g. kubelet and all schedulers)
		factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),

		// Fit is determined based on whether a pod can tolerate all of the node's taints
		factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),

		// Fit is determined by node memory pressure condition.
		factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),

		// Fit is determined by node disk pressure condition.
		factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
	)
}

// DefaultProvider配置的默认Priorities Policies
func defaultPriorities() sets.String {
	return sets.NewString(
		// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
		factory.RegisterPriorityConfigFactory(
			"SelectorSpreadPriority",
			factory.PriorityConfigFactory{
				Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
					return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
				},
				Weight: 1,
			},
		),
		...

		// TODO: explain what it does.
		factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
	)
}

上面核心代码的走读分析,请结合上一节Kubernetes Scheduler代码流程图进行阅读。相信读到这里,你对整个scheduler的代码已经有一定的理解了。

总结

  • kube-scheduler作为kubernetes master上一个单独的进程提供调度服务,通过–master指定kube-api-server的地址,用来watch pod和node和调用api server bind接口完成node和pod的Bind操作。

  • kube-scheduler中维护了一个FIFO类型的PodQueue cache,新创建的Pod都会被ConfigFactory watch到,被添加到该PodQueue中,每次调度都从该PodQueue中getNextPod作为即将调度的Pod。

  • 获取到待调度的Pod后,就执行AlgorithmProvider配置Algorithm的Schedule方法进行调度,整个调度过程分两个关键步骤:Predicates和Priorities,最终选出一个最适合该Pod借宿的Node返回。

  • 更新SchedulerCache中Pod的状态(AssumePod),标志该Pod为scheduled,并更新到最有NodeInfo中。

  • 调用api server的Bind接口,完成node和pod的Bind操作,如果Bind失败,从SchedulerCache中删除上一步中已经Assumed的Pod。

© 著作权归作者所有

共有 人打赏支持
WaltonWang
粉丝 199
博文 101
码字总数 212210
作品 0
深圳
程序员
私信 提问
加载中

评论(3)

heymybuddy
heymybuddy
WaltonWang
WaltonWang

引用来自“heymybuddy”的评论

分析的很棒,很赞!有个问题想问您一下,您的这个调度器的函数执行流程图是通过调试k8s调度器部分的源码,使用glog打印输出得出来的结论吗?我也在研究调度器源码,但是使用
/usr/bin/kube-scheduler --master=http://localhost:8080 --v=4
查看源码调试的信息,发现不能在标准输出上面打印在源码中添加的glog调试信息,您有遇到这种问题吗?谢谢答复哈~
不是看日志的,直接分析代码流程慢慢画出来的。
heymybuddy
heymybuddy
分析的很棒,很赞!有个问题想问您一下,您的这个调度器的函数执行流程图是通过调试k8s调度器部分的源码,使用glog打印输出得出来的结论吗?我也在研究调度器源码,但是使用
/usr/bin/kube-scheduler --master=http://localhost:8080 --v=4
查看源码调试的信息,发现不能在标准输出上面打印在源码中添加的glog调试信息,您有遇到这种问题吗?谢谢答复哈~
Kubernetes之scheduler模块源码分析

传送门 哈哈,隔了太长时间,网上已经有对应的分析,而且我看了以后觉得写的还真的挺好的,基本想要写的他都写的。 Kubernetes Scheduler原理解析 Kubernetes Scheduler源码分析 如何对kuber...

weixin_38975685
2017/09/29
0
0
Kubernetes 1.8 kube-scheduler的源码分析

很长时间没有写文章,一直在啃kubernetes文档,本来立志一定要读完所有的文档。还有它的最佳实践openshift的文档。但目前为止,我并没有读完kubernetes的文档。当前,我们有需求需要客制化k...

店家小二
2018/12/14
0
0
Kubernetes权威指南精彩语录

上述代码的风格和逻辑再也熟悉不过了:创建一个SchedulerServer对象,将命令行参数传入,并且进入SchedulerServer的Run方法,不死不休。——《Kubernetes权威指南》 源码导读 Kubernetes Sch...

mycat
2015/07/30
0
0
解析Kubernetes 1.8中的基于Pod优先级的抢占式调度

Author: xidianwangtao@gmail.com Kubernetes 1.8中对scheduler的更新 【Alpha】支持定义PriorityClass,并指定给Pod来定义Pod Priority; 【Alpha】支持基于Pod Priority的抢占式调度; 【A...

WaltonWang
2017/11/02
0
2
Kubernetes Local Persistent Volume源码分析

Author: xidianwangtao@gmail.com 摘要:上一篇博客”深度解析Kubernetes Local Persistent Volume“对local volume的基本原理和注意事项进行了分析,本文将进行源码分析,涉及scheduler、p...

WaltonWang
2018/09/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

聊聊flink的Table API及SQL Programs

序 本文主要研究一下flink的Table API及SQL Programs 实例 // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironmentStreamExecutionEnvironment env = Stre......

go4it
22分钟前
0
0
mysqldump应用

备份单个库/表数据或库/表结构 命令行下具体用法如下: mysqldump -u用戶名 -p密码 -d 数据库名 表名 > 备份文件名 1、导出数据库为dbname的表结构(其中用戶名為root,密码为dbpasswd,生成的...

阿dai
30分钟前
0
0
shell脚本与Python的交互

1、Python针对shell获取传入,输出参数 传入:"$num" 例如: $0表示文件名,$1表示shell获取的第一个参数 输出:通过打印shell结果的方式,输出参数给Python。 例如: echo "{$iplist}",Python调...

一口今心
32分钟前
0
0
Euler 今日问世!国内首个工业级的图深度学习开源框架,阿里妈妈造

阿里妹导读:千呼万唤始出来!阿里妈妈正式公布重磅开源项目——图深度学习框架Euler。这是国内首个在核心业务大规模应用后开源的图深度学习框架。此次开源,Euler内置了大量的算法供用户直接...

阿里云官方博客
39分钟前
0
0
TiDB 3.0 Beta Release Notes

2019 年 1 月 19 日,TiDB 发布 3.0 Beta 版,对应 master branch 的 TiDB-Ansible。相比 2.1 版本,该版本对系统稳定性、优化器、统计信息以及执行引擎做了很多改进。 TiDB 新特性 支持 Vi...

TiDB
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部