Kubernetes 1.12.0 kube-controller-manager: a walkthrough of the deployment-controller source code

2018/12/01 22:51

Preface

The kube-controller-manager component ultimately starts many controllers. This article walks through the code of one of them, the deployment-controller.

Starting the DeploymentController

The startDeploymentController function is the entry point through which kube-controller-manager starts the DeploymentController.

  • Check whether the Deployment API (apps/v1 deployments) is available
  • Call deployment.NewDeploymentController to create the DeploymentController instance dc
  • Start DeploymentController.Run in a goroutine, passing in the value of the --concurrent-deployment-syncs flag (default: 5)
k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:82

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
   if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
      return nil, false, nil
   }
   dc, err := deployment.NewDeploymentController(
      ctx.InformerFactory.Apps().V1().Deployments(),
      ctx.InformerFactory.Apps().V1().ReplicaSets(),
      ctx.InformerFactory.Core().V1().Pods(),
      ctx.ClientBuilder.ClientOrDie("deployment-controller"),
   )
   if err != nil {
      return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
   }
   go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
   return nil, true, nil
}

Creating the DeploymentController instance

startDeploymentController calls deployment.NewDeploymentController to create the DeploymentController instance. The logic of NewDeploymentController is as follows:

  • Register the Deployment informer's event handlers: AddFunc is dc.addDeployment, UpdateFunc is dc.updateDeployment, and DeleteFunc is dc.deleteDeployment
  • Register the ReplicaSet informer's event handlers: AddFunc is dc.addReplicaSet, UpdateFunc is dc.updateReplicaSet, and DeleteFunc is dc.deleteReplicaSet
  • Register the Pod informer's event handler: only DeleteFunc is registered, as dc.deletePod
  • Set dc.syncHandler to dc.syncDeployment; dc.syncHandler carries the DeploymentController's main logic, so it is analyzed in detail later, at the point where it is invoked
  • Set dc.enqueueDeployment to dc.enqueue, which adds a Deployment to the rate-limiting queue maintained by the DeploymentController
  • Finally, set the Deployment, ReplicaSet, and Pod listers and their corresponding HasSynced functions

As we can see, the DeploymentController list-watches the cluster's Deployment, ReplicaSet, and Pod objects. Next, let's look at what each of the registered Deployment, ReplicaSet, and Pod event-handler methods does.

k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:100

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartLogging(glog.Infof)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

   if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
      if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
         return nil, err
      }
   }
   dc := &DeploymentController{
      client:        client,
      eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
      queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
   }
   dc.rsControl = controller.RealRSControl{
      KubeClient: client,
      Recorder:   dc.eventRecorder,
   }

   dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    dc.addDeployment,
      UpdateFunc: dc.updateDeployment,
      // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
      DeleteFunc: dc.deleteDeployment,
   })
   rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    dc.addReplicaSet,
      UpdateFunc: dc.updateReplicaSet,
      DeleteFunc: dc.deleteReplicaSet,
   })
   podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      DeleteFunc: dc.deletePod,
   })

   dc.syncHandler = dc.syncDeployment
   dc.enqueueDeployment = dc.enqueue

   dc.dLister = dInformer.Lister()
   dc.rsLister = rsInformer.Lister()
   dc.podLister = podInformer.Lister()
   dc.dListerSynced = dInformer.Informer().HasSynced
   dc.rsListerSynced = rsInformer.Informer().HasSynced
   dc.podListerSynced = podInformer.Informer().HasSynced
   return dc, nil
}

Deployment EventHandler

The three funcs registered for the Deployment event handler are straightforward: each ends up calling dc.enqueueDeployment (i.e. dc.enqueue) to add the Deployment to the DeploymentController's queue for later processing.

k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:166

func (dc *DeploymentController) addDeployment(obj interface{}) {
   d := obj.(*apps.Deployment)
   glog.V(4).Infof("Adding deployment %s", d.Name)
   dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
   oldD := old.(*apps.Deployment)
   curD := cur.(*apps.Deployment)
   glog.V(4).Infof("Updating deployment %s", oldD.Name)
   dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
   d, ok := obj.(*apps.Deployment)
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
         return
      }
      d, ok = tombstone.Obj.(*apps.Deployment)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
         return
      }
   }
   glog.V(4).Infof("Deleting deployment %s", d.Name)
   dc.enqueueDeployment(d)
}

ReplicaSet EventHandler

The event handlers registered for ReplicaSets perform a few more checks than the Deployment ones, but in the end they also add a Deployment to the DeploymentController's queue for later processing. Let's look at the corresponding code in detail.

AddFunc

The ReplicaSet event handler's AddFunc is dc.addReplicaSet.

  • Check whether the rs's DeletionTimestamp is non-nil, i.e. whether the rs has been deleted; if so, call dc.deleteReplicaSet and return
  • Check whether the rs has a ControllerRef; if it does, resolve the owning Deployment and call dc.enqueueDeployment to add it to the DeploymentController's queue
  • If the previous step found no ControllerRef, the rs is an orphan: call dc.getDeploymentsForReplicaSet to scan all Deployments, find those whose Selector matches the rs's labels, and call dc.enqueueDeployment on each match (note: Kubernetes best practice is to manage ReplicaSets through Deployments rather than creating them directly, so under normal circumstances the ReplicaSet AddFunc should not see orphaned ReplicaSets)
k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:198

func (dc *DeploymentController) addReplicaSet(obj interface{}) {
   rs := obj.(*apps.ReplicaSet)

   // Check whether the rs has already been deleted.
   if rs.DeletionTimestamp != nil {
      // On a restart of the controller manager, it's possible for an object to
      // show up in a state that is already pending deletion.
      dc.deleteReplicaSet(rs)
      return
   }

   // Check whether the rs has a ControllerRef; if it does, resolve the owning Deployment and call dc.enqueueDeployment to add it to the DeploymentController's queue.
   // If it has a ControllerRef, that's all that matters.
   if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
      d := dc.resolveControllerRef(rs.Namespace, controllerRef)
      if d == nil {
         return
      }
      glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
      dc.enqueueDeployment(d)
      return
   }

   // If the previous step found no ControllerRef, the rs is an orphan: call dc.getDeploymentsForReplicaSet to find all Deployments whose Selector matches the rs's labels, and call dc.enqueueDeployment on each match.
   // Otherwise, it's an orphan. Get a list of all matching Deployments and sync
   // them to see if anyone wants to adopt it.
   ds := dc.getDeploymentsForReplicaSet(rs)
   if len(ds) == 0 {
      return
   }
   glog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
   for _, d := range ds {
      dc.enqueueDeployment(d)
   }
}
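The orphan branch above hinges on matching the ReplicaSet's labels against each Deployment's selector. The following is a minimal sketch of that matching idea, using a hypothetical selectorMatches helper with matchLabels-style subset semantics (the real dc.getDeploymentsForReplicaSet goes through the Deployment lister and labels.Selector):

```go
package main

import "fmt"

// selectorMatches reports whether every key/value pair in the Deployment's
// selector is present in the ReplicaSet's labels -- the subset semantics
// that matchLabels selectors use.
func selectorMatches(selector, rsLabels map[string]string) bool {
	for k, v := range selector {
		if rsLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	orphanLabels := map[string]string{"app": "nginx", "pod-template-hash": "abc"}
	// This Deployment's selector matches, so it would be enqueued.
	fmt.Println(selectorMatches(map[string]string{"app": "nginx"}, orphanLabels))
	// This one does not match and would be skipped.
	fmt.Println(selectorMatches(map[string]string{"app": "redis"}, orphanLabels))
}
```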

UpdateFunc

The ReplicaSet event handler's UpdateFunc is dc.updateReplicaSet.

  • Check whether the old and new rs have the same ResourceVersion; if so, the rs has not changed, so return immediately
  • Compare the old and new rs's ControllerRefs; if the ControllerRef changed and oldControllerRef is non-nil, call dc.enqueueDeployment to add the old rs's owning Deployment to the DeploymentController's queue
  • If the new rs's ControllerRef is non-nil, call dc.enqueueDeployment to add the new rs's owning Deployment to the queue, then return
  • If the new rs's ControllerRef is nil, the rs is an orphan: check whether the labels or the ControllerRef changed, and if so, call dc.getDeploymentsForReplicaSet to find all Deployments whose Selector matches the new rs's labels, and call dc.enqueueDeployment on each match
k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:255

func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
   curRS := cur.(*apps.ReplicaSet)
   oldRS := old.(*apps.ReplicaSet)
   // Check whether the old and new rs have the same ResourceVersion; if so, the rs has not changed, so return immediately.
   if curRS.ResourceVersion == oldRS.ResourceVersion {
      // Periodic resync will send update events for all known replica sets.
      // Two different versions of the same replica set will always have different RVs.
      return
   }

   curControllerRef := metav1.GetControllerOf(curRS)
   oldControllerRef := metav1.GetControllerOf(oldRS)
   controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)    
   // If the ControllerRef changed and oldControllerRef is non-nil, call dc.enqueueDeployment to add the old rs's owning Deployment to the DeploymentController's queue.
   if controllerRefChanged && oldControllerRef != nil {
      // The ControllerRef was changed. Sync the old controller, if any.
      if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
         dc.enqueueDeployment(d)
      }
   }

   // If the new rs's ControllerRef is non-nil, call dc.enqueueDeployment to add the new rs's owning Deployment to the queue, then return.
   // If it has a ControllerRef, that's all that matters.
   if curControllerRef != nil {
      d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
      if d == nil {
         return
      }
      glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
      dc.enqueueDeployment(d)
      return
   }

   // The new rs's ControllerRef is nil, so the rs is an orphan: if the labels or the ControllerRef changed, call dc.getDeploymentsForReplicaSet to find all Deployments whose Selector matches the new rs's labels, and call dc.enqueueDeployment on each match.
   // Otherwise, it's an orphan. If anything changed, sync matching controllers
   // to see if anyone wants to adopt it now.
   labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
   if labelChanged || controllerRefChanged {
      ds := dc.getDeploymentsForReplicaSet(curRS)
      if len(ds) == 0 {
         return
      }
      glog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
      for _, d := range ds {
         dc.enqueueDeployment(d)
      }
   }
}

DeleteFunc

The ReplicaSet event handler's DeleteFunc is dc.deleteReplicaSet.

  • Check whether the passed-in obj is an rs
  • Check the rs's ControllerRef; if it is nil, the rs is an orphan with no controlling Deployment, so return immediately
  • Resolve the rs's Deployment and call dc.enqueueDeployment to add it to the DeploymentController's queue
k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:303

func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
   rs, ok := obj.(*apps.ReplicaSet)

   // Check whether the passed-in obj is an rs.
   // When a delete is dropped, the relist will notice a pod in the store not
   // in the list, leading to the insertion of a tombstone object which contains
   // the deleted key/value. Note that this value might be stale. If the ReplicaSet
   // changed labels the new deployment will not be woken up till the periodic resync.
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
         return
      }
      rs, ok = tombstone.Obj.(*apps.ReplicaSet)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
         return
      }
   }

   // Check the rs's ControllerRef; if it is nil, the rs is an orphan with no controlling Deployment, so return immediately.
   controllerRef := metav1.GetControllerOf(rs)
   if controllerRef == nil {
      // No controller should care about orphans being deleted.
      return
   }

   // Resolve the rs's Deployment and call dc.enqueueDeployment to add it to the DeploymentController's queue.
   d := dc.resolveControllerRef(rs.Namespace, controllerRef)
   if d == nil {
      return
   }
   glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
   dc.enqueueDeployment(d)
}

Pod EventHandler

The Pod event handler only watches pod deletions, registering dc.deletePod as the DeleteFunc.

  • Check whether the passed-in obj is a pod
  • Call dc.getDeploymentForPod to get the pod's rs and, through it, the owning Deployment d; check whether the Deployment's strategy type is Recreate, and if so, check whether the Deployment now has zero pods; if it does, call dc.enqueueDeployment to add Deployment d to the DeploymentController's queue
k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:337

func (dc *DeploymentController) deletePod(obj interface{}) {
   pod, ok := obj.(*v1.Pod)

   // Check whether the passed-in obj is a pod.
   // When a delete is dropped, the relist will notice a pod in the store not
   // in the list, leading to the insertion of a tombstone object which contains
   // the deleted key/value. Note that this value might be stale. If the Pod
   // changed labels the new deployment will not be woken up till the periodic resync.
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
         return
      }
      pod, ok = tombstone.Obj.(*v1.Pod)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
         return
      }
   }
   glog.V(4).Infof("Pod %s deleted.", pod.Name)

   // Call dc.getDeploymentForPod to get the pod's rs and, through it, the owning Deployment d; if d's strategy type is Recreate and d now has zero pods, call dc.enqueueDeployment to add d to the DeploymentController's queue.
   if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
      // Sync if this Deployment now has no more Pods.
      rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
      if err != nil {
         return
      }
      podMap, err := dc.getPodMapForDeployment(d, rsList)
      if err != nil {
         return
      }
      numPods := 0
      for _, podList := range podMap {
         numPods += len(podList.Items)
      }
      if numPods == 0 {
         dc.enqueueDeployment(d)
      }
   }
}

Running the DeploymentController

startDeploymentController starts DeploymentController.Run in a goroutine, passing in the value of the --concurrent-deployment-syncs flag (default: 5). Let's look at DeploymentController.Run.

  • First call controller.WaitForCacheSync and wait until the Deployment, ReplicaSet, and Pod informers' HasSynced functions all return true, i.e. until all three object caches have finished syncing
  • Then start --concurrent-deployment-syncs goroutines, each running dc.worker in a loop with a one-second interval. The flag therefore determines how many goroutines the DeploymentController uses to process Deployments, which explains its help text: "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load"
k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:148

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   defer dc.queue.ShutDown()

   glog.Infof("Starting deployment controller")
   defer glog.Infof("Shutting down deployment controller")

   if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
      return
   }

   for i := 0; i < workers; i++ {
      go wait.Until(dc.worker, time.Second, stopCh)
   }

   <-stopCh
}

The dc.worker method

As we can see, dc.worker simply calls dc.processNextWorkItem() in a loop until it returns false. The logic of dc.processNextWorkItem() is clear:

  • Take a key off the DeploymentController's rate-limiting queue; if the queue is shutting down, return false
  • Call dc.syncHandler on the key; since dc.syncHandler was set to dc.syncDeployment when the DeploymentController instance was created, the key is actually handled by dc.syncDeployment
  • Then call dc.handleErr
    • If the previous step returned no error, forget the key, clearing its rate-limit history in the DeploymentController's queue
    • If there was an error, check how many times the key has already been requeued; if fewer than 15 times, requeue it with rate limiting, otherwise forget and drop it
  • Finally return true

As we can see, dc.processNextWorkItem() only returns false once the DeploymentController's queue has been shut down, so dc.worker keeps processing until then. Next, let's look at how dc.syncDeployment processes a Deployment.

k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:459

func (dc *DeploymentController) worker() {
   for dc.processNextWorkItem() {
   }
}

func (dc *DeploymentController) processNextWorkItem() bool {
   key, quit := dc.queue.Get()
   if quit {
      return false
   }
   defer dc.queue.Done(key)

   err := dc.syncHandler(key.(string))
   dc.handleErr(err, key)

   return true
}

dc.syncDeployment: examine the Deployment object's state and update the Deployment

The dc.syncDeployment method does the real work of examining the Deployment object's settings and updating the Deployment. Its logic is:

  • Split the incoming key into a namespace and a name
  • Use that namespace and name to fetch the Deployment object
  • Check the Deployment's Selector; if it is empty, return immediately
  • Call dc.getReplicaSetsForDeployment to get all of the Deployment's ReplicaSets
  • Call dc.getPodMapForDeployment to get all of the Deployment's pods
  • Check the Deployment's DeletionTimestamp; if it is non-nil, the Deployment is about to be deleted, so call dc.syncStatusOnly to update the Deployment's status, then return
  • Call dc.checkPausedConditions to check the Deployment's paused condition. Its main logic:
    • Call deploymentutil.GetDeploymentCondition to fetch the Deployment's Progressing condition into cond; if cond is non-nil (i.e. a Progressing condition exists) and its reason is "ProgressDeadlineExceeded", do nothing and return
    • If cond is non-nil and its reason is "DeploymentPaused", set pausedCondExists to true, otherwise false
    • If the Deployment's Paused field is true and pausedCondExists is false, set needsUpdate to true and add a condition to the Deployment with Type "Progressing", Status "Unknown", Reason "DeploymentPaused", and Message "Deployment is paused"
    • Conversely, if the Deployment's Paused field is false and pausedCondExists is true, set needsUpdate to true and add a condition with Type "Progressing", Status "Unknown", Reason "DeploymentResumed", and Message "Deployment is resumed"
    • If either of the previous two steps set needsUpdate to true, update the Deployment's status through kube-apiserver, adding the new condition
  • If the Deployment's Paused field is true, call dc.sync to update the Deployment's status, then return
  • Call the getRollbackTo function to check whether the Deployment carries the annotation "deprecated.deployment.rollback.to"; if so, call dc.rollback to roll the Deployment back, then return
  • Call dc.isScalingEvent to check whether the "deployment.kubernetes.io/desired-replicas" annotation of every active rs (Replicas greater than 0) matches the Deployment's Replicas; set scalingEvent to true if any differ, false otherwise
  • If scalingEvent is true (i.e. some rs's desired-replicas differs from the Deployment's Replicas), call dc.sync to update the Deployment's status, then return
  • Check the Deployment's Strategy.Type: if it is Recreate, call dc.rolloutRecreate to roll out the Deployment; if it is RollingUpdate, call dc.rolloutRolling; return once the rollout completes

So dc.syncDeployment ultimately updates the Deployment object through dc.syncStatusOnly, dc.sync, dc.rollback, dc.rolloutRecreate, and dc.rolloutRolling: dc.syncStatusOnly and dc.sync update the Deployment's status, dc.rollback rolls the Deployment back to an earlier revision, and dc.rolloutRecreate and dc.rolloutRolling update the Deployment according to the respective strategies. Let's look at these methods in turn, starting with dc.syncStatusOnly.

k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go:560

func (dc *DeploymentController) syncDeployment(key string) error {
   startTime := time.Now()
   glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
   defer func() {
      glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
   }()

   // Split the incoming key into its namespace and name.
   namespace, name, err := cache.SplitMetaNamespaceKey(key)
   if err != nil {
      return err
   }

   // Fetch the Deployment object using the namespace and name from the previous step.
   deployment, err := dc.dLister.Deployments(namespace).Get(name)
   if errors.IsNotFound(err) {
      glog.V(2).Infof("Deployment %v has been deleted", key)
      return nil
   }
   if err != nil {
      return err
   }

   // Deep-copy otherwise we are mutating our cache.
   // TODO: Deep-copy only when needed.
   d := deployment.DeepCopy()

   // Check the Deployment's Selector; if it is empty, return immediately.
   everything := metav1.LabelSelector{}
   if reflect.DeepEqual(d.Spec.Selector, &everything) {
      dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
      if d.Status.ObservedGeneration < d.Generation {
         d.Status.ObservedGeneration = d.Generation
         dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
      }
      return nil
   }

   // Call dc.getReplicaSetsForDeployment to get all of the Deployment's ReplicaSets.
   // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
   // through adoption/orphaning.
   rsList, err := dc.getReplicaSetsForDeployment(d)
   if err != nil {
      return err
   }
   
   // Call dc.getPodMapForDeployment to get all of the Deployment's pods.
   // List all Pods owned by this Deployment, grouped by their ReplicaSet.
   // Current uses of the podMap are:
   //
   // * check if a Pod is labeled correctly with the pod-template-hash label.
   // * check that no old Pods are running in the middle of Recreate Deployments.
   podMap, err := dc.getPodMapForDeployment(d, rsList)
   if err != nil {
      return err
   }

   // If the Deployment's DeletionTimestamp is non-nil, the Deployment is about to be deleted: call dc.syncStatusOnly to update its status, then return.
   if d.DeletionTimestamp != nil {
      return dc.syncStatusOnly(d, rsList)
   }

   // Call dc.checkPausedConditions to check the Deployment's paused condition.
   // Update deployment conditions with an Unknown condition when pausing/resuming
   // a deployment. In this way, we can be sure that we won't timeout when a user
   // resumes a Deployment with a set progressDeadlineSeconds.
   if err = dc.checkPausedConditions(d); err != nil {
      return err
   }

   // If the Deployment is paused, call dc.sync to update its status, then return.
   if d.Spec.Paused {
      return dc.sync(d, rsList)
   }

   // Call getRollbackTo to check for the annotation "deprecated.deployment.rollback.to"; if it is present, call dc.rollback to roll the Deployment back, then return.
   // rollback is not re-entrant in case the underlying replica sets are updated with a new
   // revision so we should ensure that we won't proceed to update replica sets until we
   // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
   if getRollbackTo(d) != nil {
      return dc.rollback(d, rsList)
   }

   // Call dc.isScalingEvent to check whether every active rs's (Replicas greater than 0) "deployment.kubernetes.io/desired-replicas" annotation matches the Deployment's Replicas; scalingEvent is true if any differ, false otherwise.
   scalingEvent, err := dc.isScalingEvent(d, rsList)
   if err != nil {
      return err
   }

   // If scalingEvent is true, call dc.sync to update the Deployment's status, then return.
   if scalingEvent {
      return dc.sync(d, rsList)
   }

   // Dispatch on Strategy.Type: Recreate is handled by dc.rolloutRecreate, RollingUpdate by dc.rolloutRolling; return once the rollout completes.
   switch d.Spec.Strategy.Type {
   case apps.RecreateDeploymentStrategyType:
      return dc.rolloutRecreate(d, rsList, podMap)
   case apps.RollingUpdateDeploymentStrategyType:
      return dc.rolloutRolling(d, rsList)
   }
   return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
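The pause/resume condition toggling that dc.checkPausedConditions performs, as described above, reduces to a small decision table. The following sketch uses hypothetical simplified types (the real code builds apps.DeploymentCondition values with Type "Progressing" and Status "Unknown"):

```go
package main

import "fmt"

// pausedCondition is a simplified stand-in for the Progressing condition that
// dc.checkPausedConditions maintains (hypothetical type, not the real API type).
type pausedCondition struct {
	reason, message string
}

// neededCondition returns the condition to add, if any, given whether
// .spec.paused is set and whether a DeploymentPaused condition already exists.
func neededCondition(paused, pausedCondExists bool) *pausedCondition {
	switch {
	case paused && !pausedCondExists:
		return &pausedCondition{"DeploymentPaused", "Deployment is paused"}
	case !paused && pausedCondExists:
		return &pausedCondition{"DeploymentResumed", "Deployment is resumed"}
	}
	return nil // status already reflects reality; no update needed
}

func main() {
	fmt.Println(neededCondition(true, false).reason) // DeploymentPaused
	fmt.Println(neededCondition(false, true).reason) // DeploymentResumed
	fmt.Println(neededCondition(true, true) == nil)  // true: nothing to do
}
```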

The dc.syncStatusOnly and dc.sync methods

dc.syncStatusOnly and dc.sync share similar logic; both perform the following two steps:

  • Call dc.getAllReplicaSetsAndSyncRevision to get the new rs (nil if there is none) into newRS and all old rs's into oldRSs
    • An rs whose Spec.Template equals the Deployment's Spec.Template is the new rs
    • All other rs's are old
  • Call dc.syncDeploymentStatus to update the Deployment's status (mainly Replicas, UpdatedReplicas, ReadyReplicas, AvailableReplicas, UnavailableReplicas, and the Available condition), then return. Its logic:
    • Call the calculateStatus function to compute the Deployment's current status into newStatus. calculateStatus works as follows:
      • Call deploymentutil.GetAvailableReplicaCountForReplicaSets to sum all rs's Status.AvailableReplicas into availableReplicas
      • Call deploymentutil.GetReplicaCountForReplicaSets to sum all rs's Spec.Replicas into totalReplicas
      • Set unavailableReplicas to totalReplicas - availableReplicas, clamping it to 0 if the result is negative
      • Build a DeploymentStatus: ObservedGeneration takes deployment.Generation; Replicas is the sum of all rs's Status.Replicas via deploymentutil.GetActualReplicaCountForReplicaSets; UpdatedReplicas is the new rs's Status.Replicas via the same function; ReadyReplicas is the sum of all rs's Status.ReadyReplicas via deploymentutil.GetReadyReplicaCountForReplicaSets; AvailableReplicas and UnavailableReplicas take the values computed above
      • Copy all of the Deployment's status conditions into the new status
      • Call deploymentutil.MaxUnavailable to compute maxUnavailable: (1) if the Deployment's strategy is not RollingUpdate or Spec.Replicas is 0, maxUnavailable is 0; (2) otherwise, pass the Deployment's Strategy.RollingUpdate.MaxUnavailable and Spec.Replicas to ResolveFenceposts: if Strategy.RollingUpdate.MaxUnavailable is an absolute number, use it as maxUnavailable; if it is a percentage, multiply it by Spec.Replicas and round down
      • Then compare availableReplicas with Deployment.Spec.Replicas - maxUnavailable; if the former is greater than or equal to the latter, set the status's Available condition to True, otherwise to False
    • Compare newStatus with the status stored in the Deployment object; if they are equal, return directly, otherwise continue
    • If they differ, update the Deployment object's status to newStatus through kube-apiserver
k8s.io/kubernetes/pkg/controller/deployment/sync.go:36

func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
   // Call dc.getAllReplicaSetsAndSyncRevision to get the new rs (nil if there is none) into newRS and all old rs's into oldRSs.
   newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
   if err != nil {
      return err
   }

   // Call dc.syncDeploymentStatus to update the Deployment's status (mainly Replicas, UpdatedReplicas, ReadyReplicas, AvailableReplicas, UnavailableReplicas, and the Available condition).
   allRSs := append(oldRSs, newRS)
   return dc.syncDeploymentStatus(allRSs, newRS, d)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
   // Call dc.getAllReplicaSetsAndSyncRevision to get the new rs (nil if there is none) into newRS and all old rs's into oldRSs.
   newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
   if err != nil {
      return err
   }
   if err := dc.scale(d, newRS, oldRSs); err != nil {
      // If we get an error while trying to scale, the deployment will be requeued
      // so we can abort this resync
      return err
   }

   // If the Deployment is paused and getRollbackTo returns nil (i.e. no "deprecated.deployment.rollback.to" annotation), call dc.cleanupDeployment to remove surplus old ReplicaSets.
   // Clean up the deployment when it's paused and no rollback is in flight.
   if d.Spec.Paused && getRollbackTo(d) == nil {
      if err := dc.cleanupDeployment(oldRSs, d); err != nil {
         return err
      }
   }

   // Call dc.syncDeploymentStatus to update the Deployment's status (mainly Replicas, UpdatedReplicas, ReadyReplicas, AvailableReplicas, UnavailableReplicas, and the Available condition).
   allRSs := append(oldRSs, newRS)
   return dc.syncDeploymentStatus(allRSs, newRS, d)
}

k8s.io/kubernetes/pkg/controller/deployment/sync.go:463
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
   // Call calculateStatus to compute the Deployment's current status into newStatus.
   newStatus := calculateStatus(allRSs, newRS, d)

   // Compare newStatus with the status stored in the Deployment object; if they are equal, return directly.
   if reflect.DeepEqual(d.Status, newStatus) {
      return nil
   }

   // Otherwise, update the Deployment object's status to newStatus through kube-apiserver.
   newDeployment := d
   newDeployment.Status = newStatus
   _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
   return err
}

// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
   // Call deploymentutil.GetAvailableReplicaCountForReplicaSets to sum all rs's Status.AvailableReplicas into availableReplicas.
   availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
   // Call deploymentutil.GetReplicaCountForReplicaSets to sum all rs's Spec.Replicas into totalReplicas.
   totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
   // Set unavailableReplicas to totalReplicas - availableReplicas, clamping it to 0 if negative.
   unavailableReplicas := totalReplicas - availableReplicas
   // If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
   // desired, e.g. whenever it scales down. In such a case we should simply default unavailableReplicas to zero.
   if unavailableReplicas < 0 {
      unavailableReplicas = 0
   }

   // Build the DeploymentStatus: ObservedGeneration takes deployment.Generation; Replicas, UpdatedReplicas, and ReadyReplicas are summed from the rs's via the deploymentutil helpers; AvailableReplicas and UnavailableReplicas take the values computed above.
   status := apps.DeploymentStatus{
      // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
      ObservedGeneration:  deployment.Generation,
      Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
      UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
      ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
      AvailableReplicas:   availableReplicas,
      UnavailableReplicas: unavailableReplicas,
      CollisionCount:      deployment.Status.CollisionCount,
   }

   // Copy all of the Deployment's status conditions into the new status.
   // Copy conditions one by one so we won't mutate the original object.
   conditions := deployment.Status.Conditions
   for i := range conditions {
      status.Conditions = append(status.Conditions, conditions[i])
   }

   // Compute maxUnavailable via deploymentutil.MaxUnavailable: (1) if the strategy
   // is not RollingUpdate or Spec.Replicas is 0, maxUnavailable is 0; (2) otherwise
   // ResolveFenceposts resolves Strategy.RollingUpdate.MaxUnavailable against
   // Spec.Replicas: an absolute number is used as-is, a percentage is multiplied by
   // Spec.Replicas and rounded down.
   // Then, if availableReplicas >= Deployment.Spec.Replicas - maxUnavailable, set
   // the status's Available condition to True, otherwise to False.
   if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
      minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
      deploymentutil.SetDeploymentCondition(&status, *minAvailability)
   } else {
      noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
      deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
   }

   return status
}
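The rounding rule for a percentage maxUnavailable, and the Available-condition check it feeds, can be sketched with a hypothetical helper (the real code is deploymentutil.MaxUnavailable / ResolveFenceposts operating on intstr values):

```go
package main

import (
	"fmt"
	"math"
)

// maxUnavailableFromPercent sketches the percentage branch of
// deploymentutil.MaxUnavailable / ResolveFenceposts: a percentage
// MaxUnavailable is multiplied by spec.replicas and rounded DOWN.
func maxUnavailableFromPercent(percent, replicas int) int {
	return int(math.Floor(float64(percent) * float64(replicas) / 100.0))
}

func main() {
	// With 10 replicas and maxUnavailable: 25%, at most floor(2.5) = 2 pods
	// may be unavailable.
	mu := maxUnavailableFromPercent(25, 10)
	fmt.Println(mu)

	// The Available condition is then True when
	// availableReplicas >= spec.replicas - maxUnavailable.
	availableReplicas, specReplicas := 8, 10
	fmt.Println(availableReplicas >= specReplicas-mu)
}
```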

Compared with dc.syncStatusOnly, dc.sync inserts two extra steps between those two: a call to dc.scale, and, when the Deployment is paused with no rollback in flight, a call to dc.cleanupDeployment to remove surplus old ReplicaSets. dc.scale works as follows:

  • Call dc.scale
    • Call deploymentutil.FindActiveOrLatest to look for active replica sets (an rs is active when its Spec.Replicas is greater than 0): (1) if there are no active rs, activeOrLatest is newRS when newRS exists, otherwise the newest oldRS; (2) if there is exactly one active rs, activeOrLatest is that rs; (3) if there is more than one active rs (i.e. the Deployment is in the middle of a rolling update), activeOrLatest is nil. If activeOrLatest is not nil: when its Spec.Replicas already equals the Deployment's Spec.Replicas, return; otherwise call dc.scaleReplicaSetAndRecordEvent to scale activeOrLatest to the Deployment's Spec.Replicas, then return
    • Call deploymentutil.IsSaturated to check whether all three replica counts of newRS (1. the count in the "deployment.kubernetes.io/desired-replicas" annotation; 2. the rs's Spec.Replicas; 3. the rs's Status.AvailableReplicas) equal the Deployment's Spec.Replicas; if so, scale every oldRS down to 0, then return
    • Call deploymentutil.IsRollingUpdate to check the Deployment's Spec.Strategy.Type; if it is "RollingUpdate", run the following logic
      • Call controller.FilterActiveReplicaSets to collect every active rs (Spec.Replicas greater than 0) into allRSs, then call deploymentutil.GetReplicaCountForReplicaSets to sum the Spec.Replicas of every rs in allRSs into allRSsReplicas
      • Call deploymentutil.MaxSurge to compute maxSurge: (1) if the Deployment's strategy is not RollingUpdate, maxSurge is 0; (2) if Spec.Strategy.RollingUpdate.MaxSurge is an integer, use it as maxSurge; (3) if it is a percentage, multiply it by Spec.Replicas and round up. allowedSize = Deployment.Spec.Replicas + maxSurge
      • deploymentReplicasToAdd = allowedSize - allRSsReplicas, computed from the previous two steps. (1) If it is greater than 0, set scalingOperation to "up" and sort allRSs newest first; (2) if it is less than 0, set scalingOperation to "down" and sort allRSs oldest first
      • Initialize deploymentReplicasAdded to 0 and compute nameToSize for every rs in allRSs in order, as follows:
         1. If deploymentReplicasToAdd is not 0:
               a. Compute proportion:
                  (1) Compute rsFraction:
                      i. deploymentReplicas = Deployment.Spec.Replicas + maxSurge (maxSurge computed as above)
                      ii. annotatedReplicas = the rs's "deployment.kubernetes.io/max-replicas" annotation, falling back to the Deployment's Status.Replicas when the annotation is missing
                      iii. newRSsize = rs.Spec.Replicas * deploymentReplicas / annotatedReplicas
                      iv. rsFraction = newRSsize rounded to the nearest integer, minus rs.Spec.Replicas
                  (2) allowed := deploymentReplicasToAdd - deploymentReplicasAdded
                  (3) If deploymentReplicasToAdd is greater than 0, proportion is the smaller of rsFraction and allowed; otherwise proportion is the larger of the two
               b. nameToSize[rs.Name] = rs.Spec.Replicas + proportion
               c. deploymentReplicasAdded += proportion
         2. If deploymentReplicasToAdd is 0, nameToSize[rs.Name] = rs.Spec.Replicas
      • If deploymentReplicasToAdd is not 0, adjust the first rs in allRSs: add deploymentReplicasToAdd - deploymentReplicasAdded to its nameToSize, clamping to 0 when the result is negative. Then call dc.scaleReplicaSet for every rs in allRSs in order, scaling its Spec.Replicas to nameToSize (note: the last two steps look odd — when deploymentReplicasToAdd is 0, nameToSize[rs.Name] equals rs.Spec.Replicas, so dc.scaleReplicaSet performs no scale; one reason not to skip it is that dc.scaleReplicaSet may still need to refresh the replicas annotations even when the size is unchanged. It is also unclear why the first rs's nameToSize adjustment is split into a second loop)
    • So dc.scale ultimately calls dc.scaleReplicaSetAndRecordEvent or dc.scaleReplicaSet to perform the actual scaling. dc.scaleReplicaSetAndRecordEvent itself ends up calling dc.scaleReplicaSet; before doing so it returns early when the rs's Spec.Replicas already equals the requested newScale, and it sets scalingOperation by comparing the two values. Let's look at how dc.scaleReplicaSet handles the scaling:
      • If the rs's Spec.Replicas does not equal newScale, set sizeNeedsUpdate to true
      • Call deploymentutil.ReplicasAnnotationsNeedUpdate and store the result in annotationsNeedUpdate
            (1) If the rs has no Annotations, return true
            (2) If the rs's "deployment.kubernetes.io/desired-replicas" annotation does not equal the Deployment's Spec.Replicas, return true
            (3) If the rs's "deployment.kubernetes.io/max-replicas" annotation does not equal the Deployment's Spec.Replicas + maxSurge, return true
            (4) If none of the above applies, return false
      • If sizeNeedsUpdate or annotationsNeedUpdate is true, set the rs's Spec.Replicas to newScale, call deploymentutil.SetReplicasAnnotations to set "deployment.kubernetes.io/desired-replicas" to the Deployment's Spec.Replicas and "deployment.kubernetes.io/max-replicas" to the Deployment's Spec.Replicas + maxSurge, then update the rs through kube-apiserver
  • If the Deployment's Paused is true and getRollbackTo returns empty (i.e. the Deployment has no "deprecated.deployment.rollback.to" annotation), call dc.cleanupDeployment to prune the oldRSs; dc.cleanupDeployment works as follows:
    • If the Deployment's Spec.RevisionHistoryLimit is nil, return immediately
    • Collect every rs that is not nil and whose DeletionTimestamp is nil into cleanableRSes
    • diff = the number of rs in cleanableRSes minus the Deployment's Spec.RevisionHistoryLimit; if diff is less than or equal to 0, the old rs count does not exceed RevisionHistoryLimit, so return
    • Sort cleanableRSes by CreationTimestamp, oldest first
    • Walk the first diff entries of cleanableRSes (indexes 0 to diff-1); skip an rs whose Status.Replicas is not 0, whose Spec.Replicas is not 0, whose Generation is greater than Status.ObservedGeneration, or whose DeletionTimestamp is not nil; delete the rest through kube-apiserver
k8s.io/kubernetes/pkg/controller/deployment/sync.go:289

func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
   // If there is only one active replica set then we should scale that up to the full count of the
   // deployment. If there is no active replica set, then we should scale up the newest replica set.
   if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
      if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
         return nil
      }
      _, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
      return err
   }

   // If the new replica set is saturated, old replica sets should be fully scaled down.
   // This case handles replica set adoption during a saturated new replica set.
   if deploymentutil.IsSaturated(deployment, newRS) {
      for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
         if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
            return err
         }
      }
      return nil
   }

   // There are old replica sets with pods and the new replica set is not saturated.
   // We need to proportionally scale all replica sets (new and old) in case of a
   // rolling deployment.
   if deploymentutil.IsRollingUpdate(deployment) {
      // Collect all active replica sets into allRSs and sum their Spec.Replicas into allRSsReplicas.
      allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
      allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

      // allowedSize = Spec.Replicas + maxSurge; MaxSurge is 0 unless the strategy is RollingUpdate,
      // and a percentage value is resolved against Spec.Replicas and rounded up.
      allowedSize := int32(0)
      if *(deployment.Spec.Replicas) > 0 {
         allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
      }

      // Number of additional replicas that can be either added or removed from the total
      // replicas count. These replicas should be distributed proportionally to the active
      // replica sets.
      deploymentReplicasToAdd := allowedSize - allRSsReplicas

      // The additional replicas should be distributed proportionally amongst the active
      // replica sets from the larger to the smaller in size replica set. Scaling direction
      // drives what happens in case we are trying to scale replica sets of the same size.
      // In such a case when scaling up, we should scale up newer replica sets first, and
      // when scaling down, we should scale down older replica sets first.
      var scalingOperation string
      switch {
      case deploymentReplicasToAdd > 0:
         sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
         scalingOperation = "up"

      case deploymentReplicasToAdd < 0:
         sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
         scalingOperation = "down"
      }

      // Iterate over all active replica sets and estimate proportions for each of them.
      // The absolute value of deploymentReplicasAdded should never exceed the absolute
      // value of deploymentReplicasToAdd.
      deploymentReplicasAdded := int32(0)
      nameToSize := make(map[string]int32)
      for i := range allRSs {
         rs := allRSs[i]

         // Estimate proportions if we have replicas to add, otherwise simply populate
         // nameToSize with the current sizes for each replica set.
         if deploymentReplicasToAdd != 0 {
            proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

            nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
            deploymentReplicasAdded += proportion
         } else {
            nameToSize[rs.Name] = *(rs.Spec.Replicas)
         }
      }

      // Update all replica sets
      for i := range allRSs {
         rs := allRSs[i]

         // Add/remove any leftovers to the largest replica set.
         if i == 0 && deploymentReplicasToAdd != 0 {
            leftover := deploymentReplicasToAdd - deploymentReplicasAdded
            nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
            if nameToSize[rs.Name] < 0 {
               nameToSize[rs.Name] = 0
            }
         }

         // TODO: Use transactions when we have them.
         if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
            // Return as soon as we fail, the deployment is requeued
            return err
         }
      }
   }
   return nil
}
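The proportional distribution in the RollingUpdate branch of dc.scale is easiest to follow with concrete numbers. The sketch below is a simplified stand-in for the loop above and deploymentutil.GetProportion — it only handles the scale-up direction, and all sizes are hypothetical:

```go
package main

import "fmt"

// scaleProportionally sketches the proportional-scaling arithmetic in dc.scale
// with plain int32s. sizes holds the current Spec.Replicas of the active
// replica sets (newest first, since we are scaling up), annotatedMax stands in
// for their "deployment.kubernetes.io/max-replicas" annotation, and
// allowedSize is the new Spec.Replicas + maxSurge.
func scaleProportionally(sizes []int32, annotatedMax, allowedSize int32) []int32 {
	var current int32
	for _, s := range sizes {
		current += s
	}
	toAdd := allowedSize - current // replicas left to distribute
	added := int32(0)
	out := make([]int32, len(sizes))
	for i, size := range sizes {
		// newRSsize = size * allowedSize / annotatedMax, rounded to nearest.
		newRSsize := (size*allowedSize + annotatedMax/2) / annotatedMax
		proportion := newRSsize - size
		if allowed := toAdd - added; proportion > allowed {
			proportion = allowed // never hand out more than remains
		}
		out[i] = size + proportion
		added += proportion
	}
	// Any leftover goes to the first (largest/newest) replica set.
	out[0] += toAdd - added
	if out[0] < 0 {
		out[0] = 0
	}
	return out
}

func main() {
	// Scaling a rolling deployment from 10 to 20 replicas (maxSurge 3, so
	// allowedSize 23) while two replica sets hold 7 and 3 replicas, both
	// annotated with max-replicas 13 (the old 10 + the old maxSurge 3).
	fmt.Println(scaleProportionally([]int32{7, 3}, 13, 23))
}
```

The two replica sets grow roughly in their 7:3 ratio, and the rounding leftover lands on the first replica set, mirroring the "Add/remove any leftovers to the largest replica set" step in the real code.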

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
   // No need to scale
   if *(rs.Spec.Replicas) == newScale {
      return false, rs, nil
   }
   var scalingOperation string
   if *(rs.Spec.Replicas) < newScale {
      scalingOperation = "up"
   } else {
      scalingOperation = "down"
   }
   scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
   return scaled, newRS, err
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {

   // sizeNeedsUpdate is true when the rs's current Spec.Replicas differs from newScale.
   sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

   // annotationsNeedUpdate is true when the rs has no annotations, or when its
   // desired-replicas / max-replicas annotations no longer match the desired values.
   annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))

   // If either the size or the annotations are stale, set Spec.Replicas to newScale, refresh the
   // "deployment.kubernetes.io/desired-replicas" and "deployment.kubernetes.io/max-replicas"
   // annotations, and update the rs through kube-apiserver.
   scaled := false
   var err error
   if sizeNeedsUpdate || annotationsNeedUpdate {
      rsCopy := rs.DeepCopy()
      *(rsCopy.Spec.Replicas) = newScale
      deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
      rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(rsCopy)
      if err == nil && sizeNeedsUpdate {
         scaled = true
         dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
      }
   }
   return scaled, rs, err
}

k8s.io/kubernetes/pkg/controller/deployment/sync.go:426
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
   // Nothing to clean up when Spec.RevisionHistoryLimit is not set.
   if deployment.Spec.RevisionHistoryLimit == nil {
      return nil
   }

   // Avoid deleting replica set with deletion timestamp set
   aliveFilter := func(rs *apps.ReplicaSet) bool {
      return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
   }
   cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)

   // diff is the number of cleanable replica sets beyond RevisionHistoryLimit;
   // nothing to do when diff <= 0.
   diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
   if diff <= 0 {
      return nil
   }

   // Sort cleanableRSes by CreationTimestamp, oldest first.
   sort.Sort(controller.ReplicaSetsByCreationTimestamp(cleanableRSes))
   glog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name)

   // Walk the oldest diff replica sets (indexes 0 to diff-1); skip any that still have replicas,
   // are not fully observed, or are already being deleted; delete the rest through kube-apiserver.
   for i := int32(0); i < diff; i++ {
      rs := cleanableRSes[i]
      // Avoid delete replica set with non-zero replica counts
      if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
         continue
      }
      glog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
      if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) {
         // Return error instead of aggregating and continuing DELETEs on the theory
         // that we may be overloading the api server.
         return err
      }
   }

   return nil
}
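The selection performed by dc.cleanupDeployment can be sketched with simplified types — the `rs` struct and `cleanupCandidates` function below are illustrative stand-ins, not the real API:

```go
package main

import (
	"fmt"
	"sort"
)

// rs is a simplified stand-in for an old ReplicaSet: its creation order and
// how many replicas it still reports.
type rs struct {
	name     string
	created  int // stand-in for CreationTimestamp
	replicas int32
}

// cleanupCandidates keeps the newest `limit` old replica sets and returns the
// names of older ones that are safe to delete (already scaled down to zero),
// mirroring the loop in dc.cleanupDeployment.
func cleanupCandidates(old []rs, limit int) []string {
	diff := len(old) - limit
	if diff <= 0 {
		return nil // history does not exceed the limit
	}
	sort.Slice(old, func(i, j int) bool { return old[i].created < old[j].created })
	var doomed []string
	for _, r := range old[:diff] {
		if r.replicas != 0 {
			continue // never delete a replica set that still has replicas
		}
		doomed = append(doomed, r.name)
	}
	return doomed
}

func main() {
	old := []rs{{"rs-3", 3, 0}, {"rs-1", 1, 0}, {"rs-2", 2, 2}}
	// With limit 1, the two oldest (rs-1, rs-2) are considered;
	// rs-2 is kept because it still has replicas.
	fmt.Println(cleanupCandidates(old, 1))
}
```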

dc.rollback: rollback handling for a Deployment

dc.rollback contains the code that handles a Deployment rollback; the logic is:

  • Call dc.getAllReplicaSetsAndSyncRevision to get the new rs into newRS (creating one if none exists) and all old rs into allOldRSs
  • Merge newRS and allOldRSs into allRSs
  • Call getRollbackTo to read the "deprecated.deployment.rollback.to" annotation into rollbackTo; if rollbackTo.Revision is 0, call deploymentutil.LastRevision to get the previous revision into rollbackTo.Revision, then branch on the value of rollbackTo.Revision
    • If rollbackTo.Revision is 0
      • Call dc.emitRollbackWarningEvent to emit a warning event with Reason "DeploymentRollbackRevisionNotFound" and Message "Unable to find last revision."
      • Call dc.updateDeploymentAndClearRollbackTo to remove the "deprecated.deployment.rollback.to" annotation from the Deployment
    • If rollbackTo.Revision is not 0, walk every rs in allRSs and check whether its revision (the value of its "deployment.kubernetes.io/revision" annotation) equals rollbackTo.Revision; when a match is found, call dc.rollbackToTemplate to check whether the rs's Spec.Template equals the Deployment's Spec.Template
      • If they differ, call deploymentutil.SetFromReplicaSetTemplate to set the Deployment's Spec.Template to the rs's Spec.Template, then call deploymentutil.SetDeploymentAnnotationsTo to copy the rs's annotations onto the Deployment, excluding "kubectl.kubernetes.io/last-applied-configuration", "deployment.kubernetes.io/revision", "deployment.kubernetes.io/revision-history", "deployment.kubernetes.io/desired-replicas", "deployment.kubernetes.io/max-replicas" and "deprecated.deployment.rollback.to"
      • If they are the same, log an Info message and call dc.emitRollbackWarningEvent to emit a "DeploymentRollbackTemplateUnchanged" warning event
    • If no rs in allRSs matches rollbackTo.Revision
      • Call dc.emitRollbackWarningEvent to emit a warning event with Reason "DeploymentRollbackRevisionNotFound" and Message "Unable to find the revision to rollback to."
      • Call dc.updateDeploymentAndClearRollbackTo to remove the "deprecated.deployment.rollback.to" annotation from the Deployment
k8s.io/kubernetes/pkg/controller/deployment/rollback.go:32

func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
   // Get the new rs (creating one if none exists) into newRS and all old rs into allOldRSs.
   newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
   if err != nil {
      return err
   }

   // Merge newRS and allOldRSs into allRSs.
   allRSs := append(allOldRSs, newRS)

   // getRollbackTo reads the "deprecated.deployment.rollback.to" annotation; when the requested
   // revision is 0, fall back to the last revision. The full logic is described step by step above.
   rollbackTo := getRollbackTo(d)
   // If rollback revision is 0, rollback to the last revision
   if rollbackTo.Revision == 0 {
      if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
         // If we still can't find the last revision, gives up rollback
         dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
         // Gives up rollback
         return dc.updateDeploymentAndClearRollbackTo(d)
      }
   }
   for _, rs := range allRSs {
      v, err := deploymentutil.Revision(rs)
      if err != nil {
         glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
         continue
      }
      if v == rollbackTo.Revision {
         glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
         // rollback by copying podTemplate.Spec from the replica set
         // revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
         // no-op if the spec matches current deployment's podTemplate.Spec
         performedRollback, err := dc.rollbackToTemplate(d, rs)
         if performedRollback && err == nil {
            dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
         }
         return err
      }
   }
   dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
   // Gives up rollback
   return dc.updateDeploymentAndClearRollbackTo(d)
}

dc.rolloutRecreate: handling for the "Recreate" update strategy

dc.rolloutRecreate contains the code that handles a Deployment whose update strategy is "Recreate"; the logic is:

  • Call dc.getAllReplicaSetsAndSyncRevision to get the new rs into newRS (returning nil if none exists) and all old rs into oldRSs
  • Merge newRS and oldRSs into allRSs, and collect the active rs in oldRSs (Spec.Replicas greater than 0) into activeOldRSs
  • Call dc.scaleDownOldReplicaSetsForRecreate to scale the Spec.Replicas of every active rs in activeOldRSs down to 0; if any rs was actually scaled down, scaledDown is true
  • If scaledDown is true, call dc.syncRolloutStatus to update the Deployment's status, then return
  • Call oldPodsRunning to check whether old Pods are still running. It returns true when (1) the sum of Status.Replicas across oldRSs is greater than 0, or (2) any old rs still has a Pod whose phase is neither "Failed" nor "Succeeded". If oldPodsRunning returns true, call dc.syncRolloutStatus to update the Deployment's status, then return
  • If newRS is nil, i.e. the new rs has not been created yet, call dc.getAllReplicaSetsAndSyncRevision again to get the new rs into newRS (creating it this time) and all old rs into oldRSs, and rebuild allRSs from newRS and oldRSs (note: the main purpose here is to create newRS)
  • Call dc.scaleUpNewReplicaSetForRecreate to scale newRS's Spec.Replicas up to the Deployment's Spec.Replicas
  • Call util.DeploymentComplete to check whether the Deployment has finished updating: it is complete when the Status's UpdatedReplicas, Replicas and AvailableReplicas all equal the Deployment's Spec.Replicas and Status.ObservedGeneration is greater than or equal to the Deployment's Generation. If so, call dc.cleanupDeployment to prune the oldRSs
  • Call dc.syncRolloutStatus to update the Deployment's status

As you can see, with the "Recreate" strategy the Deployment first scales every old rs's Spec.Replicas to 0 and waits until all old Pods are gone before creating the new rs, and the new rs then needs some time to start its Pods. The application is therefore unavailable for a fairly long window during an update, so use this strategy with care.

func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
   // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
   newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
   if err != nil {
      return err
   }

   // Merge the replica sets and collect the active old ones.
   allRSs := append(oldRSs, newRS)
   activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

   // scale down old replica sets.
   scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
   if err != nil {
      return err
   }

   if scaledDown {
      // Update DeploymentStatus.
      return dc.syncRolloutStatus(allRSs, newRS, d)
   }

   // Do not process a deployment when it has old pods running.
   if oldPodsRunning(newRS, oldRSs, podMap) {
      return dc.syncRolloutStatus(allRSs, newRS, d)
   }

   // If we need to create a new RS, create it now.
   if newRS == nil {
      newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
      if err != nil {
         return err
      }
      allRSs = append(oldRSs, newRS)
   }

   // scale up new replica set.
   if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
      return err
   }

   // If the deployment is complete, prune old replica sets beyond RevisionHistoryLimit.
   if util.DeploymentComplete(d, &d.Status) {
      if err := dc.cleanupDeployment(oldRSs, d); err != nil {
         return err
      }
   }

   // Sync deployment status.
   return dc.syncRolloutStatus(allRSs, newRS, d)
}
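The oldPodsRunning check described above can be sketched as follows, with pod phases as plain strings instead of v1.PodPhase — a simplified stand-in, not the real function signature:

```go
package main

import "fmt"

// oldPodsRunning is a simplified version of the check used by rolloutRecreate:
// old pods still count as "running" unless their phase is Failed or Succeeded.
func oldPodsRunning(oldStatusReplicas int32, oldPodPhases []string) bool {
	// Any replicas still reported by the old replica sets mean old pods exist.
	if oldStatusReplicas > 0 {
		return true
	}
	for _, phase := range oldPodPhases {
		if phase != "Failed" && phase != "Succeeded" {
			return true // e.g. Running, Pending, Unknown
		}
	}
	return false
}

func main() {
	fmt.Println(oldPodsRunning(0, []string{"Succeeded", "Failed"}))
	fmt.Println(oldPodsRunning(0, []string{"Running"}))
}
```

Only once this returns false does rolloutRecreate go on to create and scale up the new replica set, which is exactly why the strategy has a downtime window.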

dc.rolloutRolling: handling for the "RollingUpdate" update strategy

dc.rolloutRolling contains the code that handles a Deployment whose update strategy is "RollingUpdate"; the logic is:

  • Call dc.getAllReplicaSetsAndSyncRevision to get the new rs into newRS (creating one if none exists) and all old rs into oldRSs, then merge newRS and oldRSs into allRSs
  • Call dc.reconcileNewReplicaSet to scale up newRS, returning the boolean scaledUp; dc.reconcileNewReplicaSet works as follows:
    • If newRS's Spec.Replicas equals the Deployment's Spec.Replicas, do nothing and return scaledUp as false
    • If newRS's Spec.Replicas is greater than the Deployment's Spec.Replicas, call dc.scaleReplicaSetAndRecordEvent to scale newRS down to the Deployment's Spec.Replicas and return scaledUp as true
    • Call deploymentutil.NewRSNewReplicas to compute the expected replica count of newRS into newReplicasCount, as follows:
      • Compute maxSurge: (1) if the Deployment's strategy is not RollingUpdate, maxSurge is 0; (2) if Spec.Strategy.RollingUpdate.MaxSurge is an integer, use it as maxSurge; (3) if it is a percentage, multiply it by Spec.Replicas and round up
      • Sum the Spec.Replicas of all rs into currentPodCount
      • maxTotalPods = the Deployment's Spec.Replicas + maxSurge
      • If currentPodCount is greater than or equal to maxTotalPods, return newRS's Spec.Replicas as newReplicasCount, i.e. newRS will not be scaled
      • scaleUpCount is the smaller of maxTotalPods - currentPodCount and Deployment.Spec.Replicas - newRS.Spec.Replicas (so that after scaling, newRS's Spec.Replicas does not exceed the Deployment's)
      • Return newRS.Spec.Replicas + scaleUpCount as newReplicasCount
    • Call dc.scaleReplicaSetAndRecordEvent to scale newRS; if newReplicasCount differs from newRS.Spec.Replicas, return scaledUp as true
  • If scaledUp is true, call dc.syncRolloutStatus to update the Deployment's status, then return
  • Call dc.reconcileOldReplicaSets to scale down the active rs in oldRSs, returning the boolean scaledDown; dc.reconcileOldReplicaSets works as follows:
    • Sum the Spec.Replicas of all rs in oldRSs into oldPodsCount; if oldPodsCount is 0, return scaledDown as false immediately
    • Sum the Spec.Replicas of all rs in allRSs into allPodsCount
    • Compute maxUnavailable: (1) if the Deployment's strategy is not RollingUpdate or Spec.Replicas is 0, maxUnavailable is 0; (2) otherwise ResolveFenceposts resolves Strategy.RollingUpdate.MaxUnavailable against Spec.Replicas: an integer value is used as-is, a percentage is multiplied by Spec.Replicas and rounded down
    • minAvailable = the Deployment's Spec.Replicas - maxUnavailable
    • newRSUnavailablePodCount, the number of pods in newRS that are not yet available, = newRS's Spec.Replicas - newRS's Status.AvailableReplicas
    • maxScaleDown = allPodsCount - minAvailable - newRSUnavailablePodCount
    • Call dc.cleanupUnhealthyReplicas to scale down unhealthy oldRSs (those whose Spec.Replicas does not equal Status.AvailableReplicas), scaling down at most maxScaleDown replicas; dc.cleanupUnhealthyReplicas works as follows:
      • Sort oldRSs by CreationTimestamp
      • Walk every rs in oldRSs:
        • If totalScaledDown is greater than or equal to maxScaleDown, stop the walk
        • If the rs's Spec.Replicas is 0, skip it
        • If the rs's Spec.Replicas equals Status.AvailableReplicas, skip it (it is healthy)
        • scaleDownCount is the smaller of maxScaleDown - totalScaledDown and the rs's Spec.Replicas - Status.AvailableReplicas
        • newReplicasCount = the rs's Spec.Replicas - scaleDownCount; if newReplicasCount is greater than the rs's Spec.Replicas, report an error and return
        • Call dc.scaleReplicaSetAndRecordEvent to set the rs's Spec.Replicas to newReplicasCount
        • Add scaleDownCount to totalScaledDown
      • Return the updated oldRSs along with totalScaledDown as cleanupCount
    • Merge the updated oldRSs and newRS into allRSs, then call dc.scaleDownOldReplicaSetsForRollingUpdate, which works as follows:
      • Compute maxUnavailable as in step 3
      • Compute minAvailable as in step 4
      • Sum the Status.AvailableReplicas of every rs in allRSs into availablePodCount
      • If availablePodCount is less than or equal to minAvailable, return 0 (no scale-down is possible)
      • Sort oldRSs by CreationTimestamp, and compute totalScaleDownCount = availablePodCount - minAvailable
      • Walk every rs in oldRSs:
        • If totalScaledDown is greater than or equal to totalScaleDownCount, stop the walk
        • If the rs's Spec.Replicas is 0, skip it
        • scaleDownCount is the smaller of the rs's Spec.Replicas and totalScaleDownCount - totalScaledDown
        • newReplicasCount = the rs's Spec.Replicas - scaleDownCount; if newReplicasCount is greater than the rs's Spec.Replicas, report an error and return
        • Call dc.scaleReplicaSetAndRecordEvent to set the rs's Spec.Replicas to newReplicasCount
        • Add scaleDownCount to totalScaledDown
      • Return totalScaledDown as scaledDownCount
    • totalScaledDown = cleanupCount + scaledDownCount from the previous two steps; scaledDown is true when totalScaledDown > 0, otherwise false
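The scale-up bound computed by deploymentutil.NewRSNewReplicas in the steps above can be sketched with plain int32s — a simplified stand-in for the RollingUpdate branch of the real function, which works on the Deployment and ReplicaSet API objects:

```go
package main

import "fmt"

// newRSNewReplicas mirrors the RollingUpdate scale-up bound described above:
// total pods may not exceed deploymentReplicas + maxSurge, and the new RS may
// not grow past the deployment's desired size.
func newRSNewReplicas(deploymentReplicas, maxSurge, currentPodCount, newRSReplicas int32) int32 {
	maxTotalPods := deploymentReplicas + maxSurge
	if currentPodCount >= maxTotalPods {
		return newRSReplicas // no room to scale up
	}
	scaleUpCount := maxTotalPods - currentPodCount
	if remaining := deploymentReplicas - newRSReplicas; scaleUpCount > remaining {
		scaleUpCount = remaining // cap at the deployment's desired size
	}
	return newRSReplicas + scaleUpCount
}

func main() {
	// 10 desired replicas, maxSurge 2, 10 pods currently exist, new RS has 0:
	// the total may reach 12, so the new RS can start 2 pods.
	fmt.Println(newRSNewReplicas(10, 2, 10, 0))
}
```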
k8s.io/kubernetes/pkg/controller/deployment/rolling.go:31

func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {

   // Get the new rs (creating one if none exists), all old rs, and merge them into allRSs.
   newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
   if err != nil {
      return err
   }
   allRSs := append(oldRSs, newRS)

   // Scale up, if we can.
   scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
   if err != nil {
      return err
   }

   if scaledUp {
      // Update DeploymentStatus
      return dc.syncRolloutStatus(allRSs, newRS, d)
   }

   //Call dc.reconcileOldReplicaSets to scale down all active ReplicaSets in oldRSs and return the boolean scaledDown; dc.reconcileOldReplicaSets works as follows:
     1. Sum the Spec.Replicas of all rs in oldRSs into oldPodsCount; if oldPodsCount is 0, return scaledDown as false immediately
     2. Sum the Spec.Replicas of all rs in allRSs into allPodsCount
     3. Compute maxUnavailable: (1) if the Deployment's Strategy is not RollingUpdate, or Spec.Replicas is 0, maxUnavailable is 0; (2) otherwise pass the Deployment's Strategy.RollingUpdate.MaxUnavailable and Spec.Replicas to ResolveFenceposts: if
        MaxUnavailable is an integer, use it as maxUnavailable; if it is a percentage, multiply it by Spec.Replicas and round the result down to get maxUnavailable
     4. minAvailable = the Deployment's Spec.Replicas - maxUnavailable
     5. newRSUnavailablePodCount (newRS pods that are not yet available) = newRS's Spec.Replicas - newRS's Status.AvailableReplicas
     6. maxScaleDown = allPodsCount - minAvailable - newRSUnavailablePodCount
     7. Call dc.cleanupUnhealthyReplicas to scale down the unhealthy rs in oldRSs (i.e. rs whose Spec.Replicas does not equal Status.AvailableReplicas), by at most maxScaleDown replicas; dc.cleanupUnhealthyReplicas works as follows:
        (1). Sort oldRSs by CreationTimestamp
        (2). Iterate over all rs in oldRSs:
             a. If totalScaledDown is greater than or equal to maxScaleDown, stop iterating
             b. If the rs's Spec.Replicas is 0, skip it
             c. If the rs's Spec.Replicas equals its Status.AvailableReplicas, it is healthy; skip it
             d. scaleDownCount = the smaller of maxScaleDown - totalScaledDown and the rs's Spec.Replicas - Status.AvailableReplicas
             e. newReplicasCount = the rs's Spec.Replicas - scaleDownCount; if newReplicasCount is greater than the rs's Spec.Replicas, report an error and return
             f. Call dc.scaleReplicaSetAndRecordEvent to update the rs's Spec.Replicas to newReplicasCount
             g. Add scaleDownCount to totalScaledDown
        (3). Return the updated oldRSs, and totalScaledDown as cleanupCount
     8. Merge the oldRSs updated in the previous step and newRS back into allRSs, then call dc.scaleDownOldReplicaSetsForRollingUpdate over allRSs; its logic is:
        (1). Compute maxUnavailable as in step 3
        (2). Compute minAvailable as in step 4
        (3). Sum the Status.AvailableReplicas of all rs in allRSs into availablePodCount
        (4). If availablePodCount is less than or equal to minAvailable, set scaledDownCount to 0 and return
        (5). Sort oldRSs by CreationTimestamp; totalScaleDownCount = availablePodCount - minAvailable
        (6). Iterate over all rs in oldRSs:
             a. If totalScaledDown is greater than or equal to totalScaleDownCount, stop iterating
             b. If the rs's Spec.Replicas is 0, skip it
             c. scaleDownCount = the smaller of the rs's Spec.Replicas and totalScaleDownCount - totalScaledDown
             d. newReplicasCount = the rs's Spec.Replicas - scaleDownCount; if newReplicasCount is greater than the rs's Spec.Replicas, report an error and return
             e. Call dc.scaleReplicaSetAndRecordEvent to update the rs's Spec.Replicas to newReplicasCount
             f. Add scaleDownCount to totalScaledDown
        (7). Return totalScaledDown as scaledDownCount
     9. totalScaledDown = cleanupCount + scaledDownCount from the previous two steps; scaledDown is true if totalScaledDown > 0, otherwise false
   // Scale down, if we can.
   scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
   if err != nil {
      return err
   }

   //If scaledDown is true, call dc.syncRolloutStatus to update the Deployment's status, then return
   if scaledDown {
      // Update DeploymentStatus
      return dc.syncRolloutStatus(allRSs, newRS, d)
   }
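Steps 3–6 of dc.reconcileOldReplicaSets boil down to a small piece of arithmetic. The sketch below (a simplified standalone version; the scaleDownBudget helper name is hypothetical, and maxUnavailable is assumed to be given as a percentage) shows how the scale-down budget is derived:

```go
package main

import (
	"fmt"
	"math"
)

// scaleDownBudget is a simplified sketch of the bookkeeping at the start of
// dc.reconcileOldReplicaSets: how many old pods may be removed without
// dropping below the Deployment's availability floor. maxUnavailablePercent
// stands in for Spec.Strategy.RollingUpdate.MaxUnavailable as a percentage.
func scaleDownBudget(deploymentReplicas, allPodsCount, newRSReplicas, newRSAvailable int, maxUnavailablePercent float64) int {
	// Step 3: maxUnavailable is a percentage of Spec.Replicas, rounded down.
	maxUnavailable := int(math.Floor(maxUnavailablePercent / 100 * float64(deploymentReplicas)))

	// Step 4: minAvailable = Spec.Replicas - maxUnavailable.
	minAvailable := deploymentReplicas - maxUnavailable

	// Step 5: pods of newRS that are not yet available.
	newRSUnavailablePodCount := newRSReplicas - newRSAvailable

	// Step 6: maxScaleDown = allPodsCount - minAvailable - newRSUnavailablePodCount.
	return allPodsCount - minAvailable - newRSUnavailablePodCount
}

func main() {
	// 10 desired replicas, maxUnavailable=25% (floor(2.5)=2), 13 pods in total,
	// new RS has 3 replicas of which 1 is available:
	// minAvailable=8, so maxScaleDown = 13 - 8 - 2 = 3.
	fmt.Println(scaleDownBudget(10, 13, 3, 1, 25))
}
```

Note that the not-yet-available newRS pods are subtracted from the budget: they are counted as unavailable in advance, so scaling old RSs down can never push availability below minAvailable even if every new pod fails to start.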

   //Call util.DeploymentComplete to check whether the Deployment has finished updating: if the UpdatedReplicas, Replicas, and AvailableReplicas in the Deployment's status all equal its Spec.Replicas, and the status's ObservedGeneration is greater than or equal to the Deployment's Generation, the update is complete; if so, call dc.cleanupDeployment to clean up all oldRSs
   if deploymentutil.DeploymentComplete(d, &d.Status) {
      if err := dc.cleanupDeployment(oldRSs, d); err != nil {
         return err
      }
   }

   //Call dc.syncRolloutStatus to update the Deployment's status
   // Sync deployment status
   return dc.syncRolloutStatus(allRSs, newRS, d)
}

dc.syncRolloutStatus updates the Deployment's rollout status

dc.rolloutRecreate and dc.rolloutRolling both call dc.syncRolloutStatus several times to update the Deployment. Its logic is as follows:

  • Call calculateStatus to compute the Deployment's current status into newStatus; for the details see the dc.syncStatusOnly & dc.sync sections
  • If the Deployment's Spec.ProgressDeadlineSeconds is nil or equals 2147483647, remove the "Progressing" condition from the Deployment's status
  • Store the "Progressing" condition of the Deployment's status in currentCond. If newStatus.Replicas equals newStatus.UpdatedReplicas, currentCond is not nil, and currentCond's Reason is "NewReplicaSetAvailable", set isCompleteDeployment to true; otherwise set it to false
  • If the Deployment's Spec.ProgressDeadlineSeconds is neither nil nor 2147483647, and isCompleteDeployment is false, then:
    • Call util.DeploymentComplete to check whether the Deployment has finished updating: if newStatus's UpdatedReplicas, Replicas, and AvailableReplicas all equal the Deployment's Spec.Replicas, and newStatus's ObservedGeneration is greater than or equal to the Deployment's Generation, the update is complete; if so, add a condition with Type "Progressing", Status "True", Reason "NewReplicaSetAvailable" to newStatus
    • If the previous check found the update incomplete, call util.DeploymentProgressing to check whether it is still making progress: if any of newStatus's UpdatedReplicas, Replicas, or AvailableReplicas is greater than the corresponding field in the Deployment's status, or newStatus's Replicas - UpdatedReplicas is less than the Deployment status's Replicas - UpdatedReplicas, the Deployment is progressing. If it is: when currentCond's Status is also "True", update the Reason of newStatus's "Progressing" condition to "ReplicaSetUpdated"; when currentCond is nil or its Status is not "True", add or update a condition with Type "Progressing", Status "True", Reason "ReplicaSetUpdated" in newStatus
    • If neither util.DeploymentComplete nor util.DeploymentProgressing returned true, call util.DeploymentTimedOut to check for a timeout; if timed out, add a condition with Type "Progressing", Status "False", Reason "ProgressDeadlineExceeded" to newStatus. The timeout check works as follows:
      • If the Deployment's Spec.ProgressDeadlineSeconds is nil or equals 2147483647, not timed out
      • If newStatus has no condition with Type "Progressing", not timed out
      • If the Reason of newStatus's "Progressing" condition is "NewReplicaSetAvailable", not timed out
      • If the Reason of newStatus's "Progressing" condition is "ProgressDeadlineExceeded", timed out
      • Otherwise, check whether the LastUpdateTime of newStatus's "Progressing" condition is more than Spec.ProgressDeadlineSeconds seconds in the past; if so, timed out, otherwise not
  • Call dc.getReplicaFailures to collect the conditions with Type "ReplicaFailure" from all rs in allRSs; if any rs has this condition, copy it from the newest such rs into newStatus; if no rs has it, remove it from newStatus
  • If the Deployment's status equals newStatus, call dc.requeueStuckDeployment to decide whether to re-enqueue the Deployment into the DeploymentController's queue; otherwise update the Deployment's status through kube-apiserver
k8s.io/kubernetes/pkg/controller/deployment/progress.go:35

func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
   //Call calculateStatus to compute the Deployment's current status into newStatus
   newStatus := calculateStatus(allRSs, newRS, d)

   //If the Deployment's Spec.ProgressDeadlineSeconds is nil or equals 2147483647, remove the "Progressing" condition from the Deployment's status
   // If there is no progressDeadlineSeconds set, remove any Progressing condition.
   if !util.HasProgressDeadline(d) {
      util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
   }

   //Store the "Progressing" condition of the Deployment's status in currentCond. If newStatus.Replicas equals newStatus.UpdatedReplicas, currentCond is not nil, and currentCond's Reason is "NewReplicaSetAvailable", set isCompleteDeployment to true; otherwise set it to false
   // If there is only one replica set that is active then that means we are not running
   // a new rollout and this is a resync where we don't need to estimate any progress.
   // In such a case, we should simply not estimate any progress for this deployment.
   currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
   isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason

   //If the Deployment's Spec.ProgressDeadlineSeconds is neither nil nor 2147483647, and isCompleteDeployment is false, then:
     1. Call util.DeploymentComplete to check whether the Deployment has finished updating: if newStatus's UpdatedReplicas, Replicas, and AvailableReplicas all equal the Deployment's Spec.Replicas, and newStatus's ObservedGeneration is greater than or
        equal to the Deployment's Generation, the update is complete; if so, add a condition with Type "Progressing", Status "True", Reason "NewReplicaSetAvailable" to newStatus
     2. If the previous check found the update incomplete, call util.DeploymentProgressing to check whether it is still making progress: if any of newStatus's UpdatedReplicas, Replicas, or AvailableReplicas is greater than the corresponding field in the
        Deployment's status, or newStatus's Replicas - UpdatedReplicas is less than the Deployment status's Replicas - UpdatedReplicas, the Deployment is progressing; if it is, when currentCond's Status is also "True", update the Reason of newStatus's
        "Progressing" condition to "ReplicaSetUpdated"; when currentCond is nil or its Status is not "True", add or update a condition with Type "Progressing", Status "True", Reason "ReplicaSetUpdated" in newStatus
     3. If neither util.DeploymentComplete nor util.DeploymentProgressing returned true, call util.DeploymentTimedOut to check for a timeout; if timed out, add a condition with Type "Progressing", Status "False", Reason "ProgressDeadlineExceeded" to newStatus;
        the timeout check works as follows:
        a. If the Deployment's Spec.ProgressDeadlineSeconds is nil or equals 2147483647, not timed out
        b. If newStatus has no condition with Type "Progressing", not timed out
        c. If the Reason of newStatus's "Progressing" condition is "NewReplicaSetAvailable", not timed out
        d. If the Reason of newStatus's "Progressing" condition is "ProgressDeadlineExceeded", timed out
        e. Otherwise, check whether the LastUpdateTime of newStatus's "Progressing" condition is more than Spec.ProgressDeadlineSeconds seconds in the past; if so, timed out, otherwise not
   // Check for progress only if there is a progress deadline set and the latest rollout
   // hasn't completed yet.
   if util.HasProgressDeadline(d) && !isCompleteDeployment {
      switch {
      case util.DeploymentComplete(d, &newStatus):
         // Update the deployment conditions with a message for the new replica set that
         // was successfully deployed. If the condition already exists, we ignore this update.
         msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
         if newRS != nil {
            msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
         }
         condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
         util.SetDeploymentCondition(&newStatus, *condition)

      case util.DeploymentProgressing(d, &newStatus):
         // If there is any progress made, continue by not checking if the deployment failed. This
         // behavior emulates the rolling updater progressDeadline check.
         msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
         if newRS != nil {
            msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
         }
         condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
         // Update the current Progressing condition or add a new one if it doesn't exist.
         // If a Progressing condition with status=true already exists, we should update
         // everything but lastTransitionTime. SetDeploymentCondition already does that but
         // it also is not updating conditions when the reason of the new condition is the
         // same as the old. The Progressing condition is a special case because we want to
         // update with the same reason and change just lastUpdateTime iff we notice any
         // progress. That's why we handle it here.
         if currentCond != nil {
            if currentCond.Status == v1.ConditionTrue {
               condition.LastTransitionTime = currentCond.LastTransitionTime
            }
            util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
         }
         util.SetDeploymentCondition(&newStatus, *condition)

      case util.DeploymentTimedOut(d, &newStatus):
         // Update the deployment with a timeout condition. If the condition already exists,
         // we ignore this update.
         msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
         if newRS != nil {
            msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
         }
         condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
         util.SetDeploymentCondition(&newStatus, *condition)
      }
   }

   //Call dc.getReplicaFailures to collect the conditions with Type "ReplicaFailure" from all rs in allRSs; if any rs has this condition, copy it from the newest such rs into newStatus; if no rs has it, remove it from newStatus
   // Move failure conditions of all replica sets in deployment conditions. For now,
   // only one failure condition is returned from getReplicaFailures.
   if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
      // There will be only one ReplicaFailure condition on the replica set.
      util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
   } else {
      util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
   }

   //If the Deployment's status equals newStatus, call dc.requeueStuckDeployment to decide whether to re-enqueue the Deployment into the DeploymentController's queue; otherwise update the Deployment's status through kube-apiserver
   // Do not update if there is nothing new to add.
   if reflect.DeepEqual(d.Status, newStatus) {
      // Requeue the deployment if required.
      dc.requeueStuckDeployment(d, newStatus)
      return nil
   }

   newDeployment := d
   newDeployment.Status = newStatus
   _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
   return err
}

Summary

The Deployment Controller maintains a queue of Deployments that need syncing. It watches Deployment, ReplicaSet, and Pod events and enqueues the Deployment associated with each event (note: for Pods only DeleteFunc is watched, and the Deployment is only enqueued when its update strategy is "Recreate" and all of its Pods have been deleted). It starts --concurrent-deployment-syncs goroutines that loop over the queue, creating, updating, or deleting ReplicaSets according to each Deployment's spec. A few points are worth noting:

  • Pausing a Deployment stops rolling updates and rollbacks, but plain scaling of the Deployment's replicas still runs, the Deployment's status is still synced, and, if no rollback is pending, ReplicaSets exceeding RevisionHistoryLimit are still checked and cleaned up. If a Deployment is paused in the middle of a rolling update, three cases apply: (1) if the new RS has not yet reached the Deployment's desired replicas, the total replicas across all RSs stays at Deployment.Spec.Replicas + maxSurge and the RSs stop scaling up/down; (2) if the new RS already has the Deployment's desired replicas, the old RSs are still scaled down to 0; (3) if the new RS exceeds the Deployment's desired replicas, it is still scaled down to the desired count.
  • With the "Recreate" strategy, the Deployment first scales all old ReplicaSets' Spec.Replicas to 0 and waits for all Pods to be deleted before creating the new ReplicaSet; the new RS then needs additional time to start its Pods, so the application is unavailable for a noticeable period during every update. Use this strategy with caution.
  • With the "RollingUpdate" strategy, a maxSurge given as a percentage is multiplied by Deployment.Spec.Replicas and rounded up (e.g. 2.4 becomes 3), while a maxUnavailable percentage is multiplied by Deployment.Spec.Replicas and rounded down (e.g. 2.8 becomes 2).
  • During a rolling update, scale-down targets the old ReplicaSets first (age is judged by the RS's CreationTimestamp).