
Kubernetes ReplicationController Source Code Analysis

WaltonWang
Published 2017/03/16 18:42

For more in-depth Kubernetes articles, see my blog homepage on CSDN or OSChina.

Although Kubernetes introduced the Deployment feature in v1.2, and a Deployment manages Pods by creating ReplicaSets, with ReplicaSet positioned as the next-generation ReplicationController, in practice the only difference between ReplicaSet and ReplicationController is the type of selector requirements each supports (a small Go illustration follows the list below).

  • ReplicaSet supports both equality-based and set-based selector requirements.
  • ReplicationController supports only equality-based selector requirements.
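
To make the difference concrete, here is a minimal sketch using the labels package (import path k8s.io/kubernetes/pkg/labels in the v1.5 tree); the pod labels and selector expressions are made up for illustration:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/labels"
)

func main() {
	podLabels := labels.Set{"app": "web", "env": "prod"}

	// Equality-based: the only form an RC's spec.selector can express.
	eq := labels.SelectorFromSet(labels.Set{"app": "web"})
	fmt.Println(eq.Matches(podLabels)) // true

	// Set-based: additionally supported by ReplicaSet (matchExpressions in the API).
	setBased, err := labels.Parse("app=web,env in (prod,staging)")
	if err != nil {
		panic(err)
	}
	fmt.Println(setBased.Matches(podLabels)) // true
}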

Deployments are of course still very useful, since they support users' rolling-update needs. I will cover Deployments in a separate post; in this article we only look at the ReplicationController.

One point to clarify: the ReplicationController discussed here is the RC controller, not the RC resource.

This analysis is based on the Kubernetes v1.5 code.

ReplicationManager

ReplicationManager is the ReplicationController controller object; the name makes it easy to distinguish it from the ReplicationController resource API object in the code. Its structure definition is shown below.

pkg/controller/replication/replication_controller.go:75

// ReplicationManager is responsible for synchronizing ReplicationController objects stored in the system with actual running pods.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer.  If we're using
	// a normal shared informer, then the informer will be started for us.  If
	// we have a personal informer, we must start it ourselves.   If you start
	// the controller using NewReplicationManager(passing SharedInformer), this
	// will be null
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *cache.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController cache.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}

The following fields deserve particular attention:

  • podControl: provides the interface for creating/deleting Pods.
  • burstReplicas: the maximum number of Pods that can be created/deleted concurrently in a single batch.
  • syncHandler: the function that actually performs the replica sync.
  • expectations: a cache of the Pod UIDs expected in the desired state, with interfaces to adjust that cache (see the short lifecycle sketch after this list).
  • rcStore: the Indexer of ReplicationController resource objects, populated and maintained by rcController.
  • rcController: watches all ReplicationController resources and writes observed changes into rcStore.
  • podStore: the Indexer of Pods, populated and maintained by podController.
  • podController: watches all Pod resources and writes observed changes into podStore.
  • queue: a rate-limited queue holding the RCs waiting to be synced.
  • lookupCache: a cache of Pod-to-RC matching information, used to speed up lookups.
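
To make the expectations mechanism more concrete, here is a hedged walkthrough of its lifecycle that only uses methods appearing later in this article (rcKey is assumed to be the "namespace/name" key produced by controller.KeyFunc; this is illustrative, not actual controller code):

// Assumes k8s.io/kubernetes/pkg/controller is imported (v1.5 path).
exp := controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations())

// Before issuing 3 pod creations, manageReplicas records the expectation.
exp.ExpectCreations(rcKey, 3)

// Every observed pod-add watch event then decrements the counter.
exp.CreationObserved(rcKey)

// SatisfiedExpectations returns true only once all expected events have been
// observed (or the expectation record has expired), which is what allows the
// next sync to call manageReplicas again.
needsSync := exp.SatisfiedExpectations(rcKey)
_ = needsSync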

Where Is the ReplicationController Started?

Readers of my earlier post, Kubernetes ResourceQuota Controller内部实现原理及源码分析, may recall how controller manager starts the ResourceQuotaController; the ReplicationController is started the same way. When kube-controller-manager calls newControllerInitializers to initialize its controllers, startReplicationController is registered there to start the ReplicationController controller.

cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController

	return controllers
}
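
For context, here is a hedged sketch of how such an initializer map can be consumed; the real StartControllers function in controllermanager.go has considerably more plumbing, so this only shows the shape of the loop (the function name startControllersSketch is made up):

func startControllersSketch(ctx ControllerContext, initializers map[string]InitFunc) error {
	for name, initFn := range initializers {
		started, err := initFn(ctx)
		if err != nil {
			return fmt.Errorf("error starting controller %q: %v", name, err)
		}
		if !started {
			glog.Warningf("controller %q was not started", name)
		}
	}
	return nil
}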

Following the code into startReplicationController: it is very simple. It starts a goroutine that calls replicationcontroller.NewReplicationManager to create a ReplicationManager and then invokes its Run method to begin working.

cmd/kube-controller-manager/app/core.go:55

func startReplicationController(ctx ControllerContext) (bool, error) {
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Pods().Informer(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		ResyncPeriod(&ctx.Options),
		replicationcontroller.BurstReplicas,
		int(ctx.Options.LookupCacheSizeForRC),
		ctx.Options.EnableGarbageCollector,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}

Creating the ReplicationManager

As analyzed above, controller-manager creates a ReplicationManager object through NewReplicationManager; this object is the ReplicationController controller.

pkg/controller/replication/replication_controller.go:122

// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}



pkg/controller/replication/replication_controller.go:132
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}

newReplicationManager mainly configures the ReplicationManager, for example:

  • Configuring queue via workqueue.NewNamedRateLimitingQueue.
  • Configuring expectations via controller.NewUIDTrackingControllerExpectations.
  • Configuring rcStore, podStore, rcController, and podController; the rc event handlers (AddFunc/DeleteFunc) simply enqueue the rc's key (see the sketch after this list).
  • Setting syncHandler to rm.syncReplicationController. This is important enough to call out on its own: as discussed later, syncReplicationController is the method that does the core work, and the automatic maintenance of replicas is essentially all handled by it.
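
For reference, here is a sketch of what an enqueue handler like rm.enqueueController essentially boils down to (written as if inside the replication package; the helper name is made up):

func (rm *ReplicationManager) enqueueControllerSketch(obj interface{}) {
	// controller.KeyFunc produces a "namespace/name" key and also copes with
	// tombstone objects handed over by deletions.
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	rm.queue.Add(key)
}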

Running ReplicationManager.Run to Start the Work

With the ReplicationManager created, it is time to get to work. The Run method is the starting point: it begins watching and syncing.

pkg/controller/replication/replication_controller.go:217

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}

  • watching
    • go rm.rcController.Run(stopCh) is responsible for watching all rcs.
    • go rm.podController.Run(stopCh) is responsible for watching all pods.
  • syncing
    • workers goroutines are started.
    • Each goroutine keeps running rm.worker in a loop, pausing 1s between iterations (see the short wait.Until illustration below); rm.worker is responsible for taking an rc from the queue and calling syncHandler to sync it.
    • Each goroutine only exits when it receives the stopCh signal.
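
As a small aside, here is a standalone illustration of the wait.Until loop that drives the workers (import path k8s.io/kubernetes/pkg/util/wait in the v1.5 tree): the function runs immediately and then once per period until stopCh is closed.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go wait.Until(func() {
		fmt.Println("worker tick") // stands in for rm.worker
	}, time.Second, stopCh)

	time.Sleep(3 * time.Second)
	close(stopCh) // stops every wait.Until loop listening on stopCh
}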

Below are the Run implementations of rcController and podController; their job is to perform the rc/pod watch.

pkg/client/cache/controller.go:84

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}

The key part of syncing lives in ReplicationManager's worker method, shown below.

pkg/controller/replication/replication_controller.go:488

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}

The main logic in worker is:

  • Take an rc key from rm's rate-limited queue.
  • Call the syncHandler interface to sync that rc.

In newReplicationManager, rm.syncHandler = rm.syncReplicationController registered syncReplicationController as the syncHandler, so the logic for syncing an rc lives in syncReplicationController.

pkg/controller/replication/replication_controller.go:639

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked concurrently with the same key.

func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but has the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again otherwise they will stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing.  Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}

The main flow of syncReplicationController is:

  1. If podStore has not yet been synced at least once, re-add the rc's key to the queue to wait for podStore to sync, and stop here; otherwise continue.
  2. Using the rc's key, fetch the corresponding rc object from rcStore. If it does not exist, the rc has already been deleted: delete its record from expectations by key and return, ending the flow. If it does exist, continue.
  3. Check the add and del counters in expectations, and whether more than 5 minutes have passed since the last timestamp, to decide whether this rc needs a sync.
  4. If GC is enabled, list all pods in the namespace from podStore and take the matchesAndControlled plus matchesNeedsController pods as the filtered set filteredPods to sync. If GC is not enabled, take the Active pods in the namespace from podStore that match rc.Spec.Selector as filteredPods. (For matchesAndControlled and matchesNeedsController, see the PodControllerRefManager.Classify function defined at pkg/controller/controller_ref_manager.go:57.)
  5. If step 3 determined that this rc needs a sync and its DeletionTimestamp is nil, call manageReplicas so that the number of active pods managed by this rc matches the desired count.
  6. After manageReplicas, immediately recompute the rc's status, updating the Conditions, Replicas, FullyLabeledReplicas, ReadyReplicas, and AvailableReplicas fields.
  7. Via updateReplicationControllerStatus, call the kube-apiserver API to update the rc's status to the newly computed one; the flow ends.

In the syncReplicationController flow described above, a key step is the manageReplicas call in step 5, which is responsible for repairing (adding or deleting) the rc's replicas.

pkg/controller/replication/replication_controller.go:516

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil

}

The main logic of the manageReplicas code above is:

  • First compute diff, the difference between the number of pods in filteredPods and the desired count in rc.Spec.Replicas.
  • If diff is 0, the current state already matches the desired state; return immediately and the flow ends.
  • If diff is negative, there are too few active pods, and the following happens:
    • Compare |diff| with burstReplicas so that at most burstReplicas pods are created in this round.
    • Call expectations.ExpectCreations to set the add counter in expectations to |diff|, meaning |diff| new pods need to be created to reach the desired state.
    • Use a sync.WaitGroup to start |diff| goroutines, each of which calls podControl.CreatePods to create one pod from the rc's spec template in the rc's namespace.
    • After all goroutines finish, return an err if one or more pod creations failed, otherwise nil; the flow ends.
  • If diff is positive, there are more active pods than desired, and the following happens:
    • Compare |diff| with burstReplicas so that at most burstReplicas pods are deleted in this round.
    • Sort filteredPods so that not-ready < ready, unscheduled < scheduled, and pending < running, ensuring that pods in earlier stages are deleted first (a simplified sketch of this ordering follows this list).
    • After sorting, take the first |diff| pods as the pods to delete.
    • Call expectations.ExpectDeletions to set the del counter in expectations to |diff|, meaning |diff| pods need to be deleted to reach the desired state.
    • Use a sync.WaitGroup to start |diff| goroutines, each of which calls podControl.DeletePod to delete one of the pods selected for deletion.
    • After all goroutines finish, return an err if one or more pod deletions failed, otherwise nil; the flow ends.
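
A simplified sketch of the ordering that controller.ActivePods implements follows; this is not the actual implementation (which may apply further tie-breakers), and the type name is made up. "Less" here means "delete first".

// Assumes the v1 pod API types are imported.
type byDeletionPriority []*v1.Pod

var phaseOrder = map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}

func (s byDeletionPriority) Len() int      { return len(s) }
func (s byDeletionPriority) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byDeletionPriority) Less(i, j int) bool {
	// Unscheduled pods sort before scheduled ones.
	if (s[i].Spec.NodeName == "") != (s[j].Spec.NodeName == "") {
		return s[i].Spec.NodeName == ""
	}
	// Pending < Unknown < Running.
	if phaseOrder[s[i].Status.Phase] != phaseOrder[s[j].Status.Phase] {
		return phaseOrder[s[i].Status.Phase] < phaseOrder[s[j].Status.Phase]
	}
	// Not-ready pods sort before ready ones.
	return !isPodReady(s[i]) && isPodReady(s[j])
}

func isPodReady(p *v1.Pod) bool {
	for _, c := range p.Status.Conditions {
		if c.Type == v1.PodReady {
			return c.Status == v1.ConditionTrue
		}
	}
	return false
}

Calling sort.Sort(byDeletionPriority(filteredPods)) would put the preferred deletion candidates at the front, mirroring the sort.Sort(controller.ActivePods(filteredPods)) call in the quoted code.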

At this point I consider all of the key code covered. calculateStatus and updateReplicationControllerStatus are comparatively simple; take a look yourself if interested. A rough sketch of what calculateStatus needs to compute is shown below.
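
This is a hedged sketch only: the real calculateStatus also manages the rc's ReplicaFailure condition based on manageReplicasErr and honors MinReadySeconds when counting AvailableReplicas. It assumes the v1 API types and the labels package are imported, and the function name is made up.

func calculateStatusSketch(rc v1.ReplicationController, filteredPods []*v1.Pod) v1.ReplicationControllerStatus {
	newStatus := rc.Status
	templateSelector := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()

	fullyLabeled, ready := 0, 0
	for _, pod := range filteredPods {
		// FullyLabeledReplicas: pods whose labels match the rc's pod template labels.
		if templateSelector.Matches(labels.Set(pod.Labels)) {
			fullyLabeled++
		}
		// ReadyReplicas: pods whose PodReady condition is True.
		for _, c := range pod.Status.Conditions {
			if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
				ready++
				break
			}
		}
	}

	newStatus.Replicas = int32(len(filteredPods))
	newStatus.FullyLabeledReplicas = int32(fullyLabeled)
	newStatus.ReadyReplicas = int32(ready)
	newStatus.AvailableReplicas = int32(ready) // simplified: ignores MinReadySeconds
	return newStatus
}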

Summary

  • ReplicationManager is the code implementation of the ReplicationController controller, named to distinguish it from the ReplicationController resource object.

  • When kube-controller-manager calls newControllerInitializers to initialize its controllers, startReplicationController is registered there to start the ReplicationController controller.

  • newReplicationManager performs the key configuration of the ReplicationManager:

    • Configuring queue via workqueue.NewNamedRateLimitingQueue.
    • Configuring expectations via controller.NewUIDTrackingControllerExpectations.
    • Configuring rcStore, podStore, rcController, and podController.
    • Setting syncHandler to rm.syncReplicationController; syncReplicationController is the method that does the core work, and the automatic maintenance of replicas is essentially all handled by it.
  • The Run method is the starting point for the actual work of watching and syncing:

    • watching
      • go rm.rcController.Run(stopCh) is responsible for watching all rcs.
      • go rm.podController.Run(stopCh) is responsible for watching all pods.
    • syncing
      • workers goroutines are started.
      • Each goroutine keeps running rm.worker in a loop, pausing 1s between iterations; rm.worker takes an rc from the queue and calls syncHandler to sync it.
      • Each goroutine only exits when it receives the stopCh signal.
  • In the syncReplicationController flow, the core step is the manageReplicas call, which repairs the rc's replicas (add or delete).

For more in-depth Kubernetes articles, see my blog homepage on CSDN or OSChina.

© Copyright belongs to the author.
