文档章节

Kubernetes Node Controller源码分析之Taint Controller

WaltonWang
 WaltonWang
发布于 2017/08/01 00:54
字数 2248
阅读 95
收藏 1
点赞 1
评论 0

Author: xidianwangtao@gmail.com

NewNoExecuteTaintManager

Kubernetes Node Controller源码分析之创建篇中提到:

  • PodInformer添加Event Handler时,通过调用taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)往tc.podUpdateQueue添加updateItem。
  • NodeInformer添加Event Handler时,通过调用taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node)往tc.nodeUpdateQueue添加updateItem。
  • 当创建NodeController时,如果runTaintManager为true(通过kube-controller-manager的--enable-taint-manager中指定,默认为true),则会通过NewNoExecuteTaintManager来实例化一个Taint Manager。
pkg/controller/node/nodecontroller.go:195

func NewNodeController(..) (*NodeController, error) {
	...
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			...
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(nil, pod)
			}
		},
		...
	}
	...
	} else {
		nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{
			AddFunc: func(originalObj interface{}) {
				...
				if nc.taintManager != nil {
					nc.taintManager.NodeUpdated(nil, node)
				}
			},
			...
		}
	}
	...
	if nc.runTaintManager {
		nc.taintManager = NewNoExecuteTaintManager(kubeClient)
	}

    ...

	return nc, nil
}

因此,创建NodeController时已经配置了监听pod和node的事件,并会将相关数据发送到tc.podUpdateQueue和tc.nodeUpdateQueue,然后由Taint Manager从中取出数据进行处理。在此之前,我们先来看看NewNoExecuteTaintManager是如何实例化一个Taint Manager的。

pkg/controller/node/taint_controller.go:152

func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
	...

	tm := &NoExecuteTaintManager{
		client:            c,
		recorder:          recorder,
		
		// taintedNodes记录每个Node对应的Taint信息。
		taintedNodes:      make(map[string][]v1.Taint),
		
		// nodeUpdateQueue中取出的updateItem会发送到nodeUpdateChannel,Tait Manager从该Channel中取出对应的node update info。
		nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
		
		// podUpdateQueue中取出的updateItem会发送到podUpdateChannel,Tait Manager从该Channel中取出对应的pod update info。
		podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),
        
        // Node Controller监听到的node update info会发送到nodeUpdateQueue。
		nodeUpdateQueue: workqueue.New(),
		
		// Node Controller监听到的pod update info会发送到podUpdateQueue。
		podUpdateQueue:  workqueue.New(),
	}
	
	// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler.
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

	return tm
}

相关的代码分析见里面的代码注释。需要强调的是,我们在这里给tm.taintEvictionQueue注册了函数deletePodHandler,用来通过Taint Eviction时删除pod时调用。Taint Manager Run的时候会通过tc.taintEvictionQueue.AddWork()时创建Worker来执行deletePodHandler

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		
		// 按照失败重试5次,每次间隔10s的重试机制,调用apiserver的api删除对应的Pod。
		for i := 0; i < retries; i++ {
			err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

Run

Kubernetes Node Controller源码分析之执行篇中提到,在Node Controller Run的时候,如果runTaintManager为true,则会调用nc.taintManager.Run启动Taint Manager loop。

pkg/controller/node/nodecontroller.go:550

func (nc *NodeController) Run() {
	go func() {
		...

		if nc.runTaintManager {
			go nc.taintManager.Run(wait.NeverStop)
		}

		...
	}()
}

接下来,我们来看Taint Manager的Run方法。Node Controller启动的Taint Manager实例其实就是NoExecuteTaintManager,其对应的Run方法代码如下。

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	
	// Functions that are responsible for taking work items out of the workqueues and putting them into channels.
	// 从tc.nodeUpdateQueue中获取updateItem,并发送到tc.nodeUpdateChannel。
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)

    // 从tc.podUpdateQueue中获取updateItem,并发送到tc.podUpdateChannel。
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			break
			
		// 从tc.nodeUpdateChannel获取nodeUpdate数据,然后invoke tc.handleNodeUpdate进行处理。
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
			
		// 从tc.podUpdateChannel获取podUpdate数据,在invoke tc.handlePodUpdate进行处理之前,先确保tc.nodeUpdateQueue中的数据已经被处理完。
		case podUpdate := <-tc.podUpdateChannel:
		
		// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}

可见, Run方法中分别从对应的queue中取出数据,然后调用tc.handleNodeUpdatetc.handlePodUpdate进行处理。

// pkg/controller/node/taint_controller.go:365

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
	// Delete
	// 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。
	if nodeUpdate.newNode == nil {
		node := nodeUpdate.oldNode
		glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		delete(tc.taintedNodes, node.Name)
		return
	}
	
	// Create or Update
	// 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。
	glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	node := nodeUpdate.newNode
	taints := nodeUpdate.newTaints
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()
	
	// 然后,获取该Node上所有pods list。
	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		glog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	
	
	// Short circuit, to make this controller a bit faster.
	// 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。
	if len(taints) == 0 {
		glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

    // 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}

handleNodeUpdate的逻辑为:

  • 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。
  • 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。
    • 获取该Node上所有pods list。
    • 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。
    • 否则,遍历pods list中的每个pod,分别调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
// pkg/controller/node/taint_controller.go:334

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
	// Delete
	// 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。
	if podUpdate.newPod == nil {
		pod := podUpdate.oldPod
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
		tc.cancelWorkWithEvent(podNamespacedName)
		return
	}
	
	// Create or Update
	// 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。
	pod := podUpdate.newPod
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	
	// 然后,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
	tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())
}

handlePodUpdate的逻辑为:

  • 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。
  • 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。
    • 如果node上的Taints info信息为空,表明Taints info被删除了或者Node被删除了,那么就不需要处理该node上的pod eviction了,流程结束。
    • 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。

因此,不管是handlePodUpdate还是handleNodeUpdate,最终都是通过processPodOnNode来处理Pod Eviction的。

pkg/controller/node/taint_controller.go:295

func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {

    // 如果该node的taints info为空,则取消Taint Eviction Pods。
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	
	// 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。
	allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
	
	// 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
	if !allTolerated {
		glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	
	// 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,则表示0,如果设置的值为负数,则用0替代。
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	
	// 从tc.taintEvictionQueue中获取Worker-scheduledEviction
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	
	// 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		} else {
			tc.cancelWorkWithEvent(podNamespacedName)
		}
	}
	
	// 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

processPodOnNode的逻辑为:

  • 如果该node的taints info为空,则取消Taint Eviction Pods。
  • 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。
  • 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
  • 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,则表示0。
    • 如果minTolerationTime小于0,则永远容忍,流程结束。
    • 从tc.taintEvictionQueue中获取Worker-scheduledEviction。
      • 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。
      • 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。

© 著作权归作者所有

共有 人打赏支持
WaltonWang
粉丝 164
博文 92
码字总数 187622
作品 0
深圳
程序员
深入分析Kubernetes Critical Pod(四)

摘要:本文分析了DeamonSetController及PriorityClass Validate时,对CriticalPod的所做的特殊处理。 Daemonset Controller对CriticalPod的特殊处理 深入分析Kubernetes Critical Pod系列:深...

WaltonWang
07/12
0
0
解析Kubernetes 1.8中的基于Pod优先级的抢占式调度

Author: xidianwangtao@gmail.com Kubernetes 1.8中对scheduler的更新 【Alpha】支持定义PriorityClass,并指定给Pod来定义Pod Priority; 【Alpha】支持基于Pod Priority的抢占式调度; 【A...

WaltonWang
2017/11/02
0
2
kubeadm安装1.9版本

kubernetes 1.9.0 kubeadm方式安装 1、安装rpm包 2、修改内核参数 修改 /etc/sysctl.conf,添加以下内容 修改后,及时生效 3、修改kubelet配置文件 kubelet和docker 的cgroup driver 有2种方...

战狐
01/04
0
0
Kubernetes 1.7.2 版本发布,容器集群管理系统

Kubernetes 1.7.2 版本发布了,该版本从1.7.1版本以来有15处修改,其中修复一个Pod BUG#48786, 下载k8s 1.7.2版本,查看 1.7 版本相关介绍。 以下是 1.7.2版本更新内容: Use port 20256 f...

O0oo0O
2017/07/25
1K
5
Kubernetes 1.10.5 发布,使用 TPU v1 API

Kubernetes 1.10.5 发布了,Kubernetes 是一个开源的,用于管理云平台中多个主机上的容器化的应用,Kubernetes 的目标是让部署容器化的应用简单并且高效(powerful),Kubernetes 提供了应用...

h4cd
06/23
0
0
使用kubeadm部署Kubernetes1.8.5

>在完成科学上网的前提下,我们准备使用kubeadm通过http代理部署Kubernetes。 环境准备(在所有节点上执行) hostname IP 作用 k8s-master 172.16.100.50 master/etcd k8s-node1 172.16.100....

Vnimos
2017/12/22
0
0
kubernetes DaemonSet资源对象

What is a DaemonSet? DaemonSet能够让所有(或者一些特定)的Node节点运行同一个pod。当节点加入到kubernetes集群中,pod会被(DaemonSet)调度到该节点上运行,当节点从kubernetes集群中被...

yzy121403725
04/13
0
0
K8S 1.9.0二进制包部署(二)

3、k8s master #############kube-apiserver cp kube-apiserver /usr/bin chmod 755 /usr/bin/kube-apiserver mkdir -p /app/kubernetes/conf mkdir -p /app/kubernetes/log vi /usr/lib/sy......

sai_2008
02/05
0
0
全视角了解基于容器的编排工具Kubernetes

Kubernetes在希腊语中是“船长”或者“水手”的意思,Kubernetes诞生于谷歌,2014年开源给了CNCF。它由Go语言开发,目标是建造一个运行大量容器生产环境的强大平台。Kubernetes库可以在GitHu...

m2l0zgssvc7r69efdtj
05/24
0
0
在阿里云容器服务上通过Helm部署Ingress Controller

在 Kubernetes Ingress 高可靠部署最佳实践 中介绍了在Kubernetes集群中如何部署一套高可靠的Ingress接入层,文中通过直接修改YAML的方式来完成,今天主要分享下如何通过Helm的方式在阿里云容...

chenqz
04/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Elasitcsearch High Level Rest Client学习笔记(三)批量api

Bulk Request BulkRequest可以在一起从请求执行批量添加、更新和删除,至少需要添加一个操作 BulkRequest request = new BulkRequest(); //创建BulkRequestrequest.add(new IndexRequest("...

木子SMZ
2分钟前
0
0
mybatis-dynamic sql

OGNL expressions if 判断是否存在值 <select id="findActiveBlogLike" resultType="Blog"> SELECT * FROM BLOG WHERE state = ‘ACTIVE’ <if test="title != null"> AND title like #{tit......

writeademo
9分钟前
0
0
社交系统ThinkSNS+ V1.8.3更新播报

     研发发布版本号:1.8.3   本次版本于2018年7月16日发布   本次发布类型:新增功能、细节调整与优化   社交系统ThinkSNSPlus更新体验:请于官网下载/安装最新版或联系QQ35159...

ThinkSNS账号
13分钟前
0
0
教育思考:选择编程是一场父母和孩子的和解[图]

教育思考:选择编程是一场父母和孩子的和解[图]: 之前有个很热的段子是这样讲的:深夜十点的时候,某小区一女子大声喊叫“什么关系?啊?!到底什么关系?你说!”最后发现原来是一位妈妈陪...

原创小博客
14分钟前
0
0
X64汇编之指令格式解析

最近由于项目组内要做特征码搜索的东西,便于去Hook一些未导出函数,你懂得...于是就闲着学习了一下x86/x64的汇编指令格式。x86的汇编指令格式请参照http://bbs.pediy.com/showthread.php?t...

simpower
16分钟前
0
0
rust 语法概要(只适合不熟悉时快速查阅使用,不适合理解其精髓。未完待续)

注意:本内容只适合快查,不适合理解精髓。精髓请研读 https://kaisery.github.io/trpl-zh-cn/foreword.html 基本数据类型 i8,i16,i32,i64,i128 u8,u16,u32,u64,u128 f32,f64 char bool:true...

捍卫机密
19分钟前
0
0
JS中严格模式和非严格模式

1,使用 严格模式的使用很简单,只有在代码首部加入字符串 "use strict"。必须在首部即首部指其前面没有任何有效js代码除注释,否则无效 2.注意事项 (1)不使用var声明变量严格模式中将不通...

AndyZhouX
20分钟前
0
0
Nginx配置error_page 404 500等自定义的错误页面

Nginx 做web server时, 开发中发现有时候的网站代码有错误,我们需要跳转到一个指定内容的错误页面: 1. 在nginx.conf配置文件上加上一句: fastcgi_intercept_errors on; 2. 服务中加上: er...

MichaelShu
22分钟前
0
0
微服务架构下的监控系统设计(一)——指标数据的采集展示

前言 微服务是一种架构风格,一个大型复杂软件应用通常由多个微服务组成。系统中的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任务并很好地完成该任务。...

UCloudTech
27分钟前
0
0
极客时间《趣谈网络协议》之开篇词学习笔记

出于个人兴趣,本人在极客时间购买了网易研究院云计算技术部的首席架构师刘超老师关于计算机网络的专栏之《趣谈网络协议》,由于知识版权原因,不能直接分享刘超老师的原文,所以,我会在每次...

aibinxiao
29分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部