
Kubernetes and CNI Plugin Integration: A Source Code Walkthrough

WaltonWang
Published 2017/05/23 23:04

For more in-depth articles on kubernetes, see my blog homepage on CSDN or OSChina.

libcni

The cni project ships a Go library, libcni, which defines the CNI plugin interface that any application integrating CNI plugins needs to call. The corresponding Interface is defined as follows:

libcni/api.go:51

type CNI interface {
	AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
	DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

	AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
	DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}
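
As a quick illustration, here is a minimal, hypothetical sketch of driving this interface directly through libcni's CNIConfig implementation. The config file path, plugin directory, and RuntimeConf values are placeholders for illustration, not anything kubelet hardcodes:

package main

import (
	"fmt"
	"log"

	"github.com/containernetworking/cni/libcni"
)

func main() {
	// Load a network config from disk (path is illustrative).
	netConf, err := libcni.ConfFromFile("/etc/cni/net.d/10-mynet.conf")
	if err != nil {
		log.Fatal(err)
	}

	// CNIConfig implements the CNI interface; Path lists the directories
	// searched for the plugin binary named by the config's "type" field.
	cniConfig := &libcni.CNIConfig{Path: []string{"/opt/cni/bin"}}

	// RuntimeConf identifies the container whose namespace to configure.
	rt := &libcni.RuntimeConf{
		ContainerID: "example-container-id",
		NetNS:       "/proc/12345/ns/net",
		IfName:      "eth0",
	}

	result, err := cniConfig.AddNetwork(netConf, rt)
	if err != nil {
		log.Fatalf("AddNetwork failed: %v", err)
	}
	fmt.Printf("pod network configured: %v\n", result)
}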

When the CNI Plugin Is Invoked in the kubelet-Managed PLEG

The kubelet's Run method eventually calls the syncLoopIteration function, which syncs pods according to events arriving on various channels.

pkg/kubelet/kubelet.go:1794
    
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  houseKeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated.  In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//                     containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	kl.syncLoopMonitor.Store(kl.clock.Now())
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
    
		switch u.Op {
		case kubetypes.ADD:
			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			glog.Errorf("Kubelet does not support snapshot update")
		}
    
		// Mark the source ready after receiving at least one update from the
		// source. Once all the sources are marked ready, various cleanup
		// routines will start reclaiming resources. It is important that this
		// takes place only after kubelet calls the update handler to process
		// the update to ensure the internal pod cache is up-to-date.
		kl.sourcesReady.AddSource(u.Source)
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}
    
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		kl.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.
    
			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			glog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				glog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	kl.syncLoopMonitor.Store(kl.clock.Now())
	return true
}
    

Notes:

  • HandlePodSyncs, HandlePodUpdates, and HandlePodAdditions all eventually invoke dispatchWork to dispatch pods to podWorkers for asynchronous pod sync (see the sketch after this list).
  • HandlePodRemoves calls the following interfaces to delete the pod from the cache, kill the pod's processes, and stop the pod's probe workers; the pod is finally cleaned up via cleanUpContainersInPod once its PLEG event is captured. pkg/kubelet/kubelet.go:1994 kl.podManager.DeletePod(pod); kl.deletePod(pod); kl.probeManager.RemovePod(pod);
  • In HandlePodReconcile, if the pod has Failed because of an Eviction, kl.containerDeletor.deleteContainersInPod is called to remove the pod's containers.
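
To make the dispatch path concrete, here is a minimal sketch (not kubelet's actual code) of the per-pod worker pattern behind dispatchWork and managePodLoop: each pod UID gets its own goroutine and update channel, so syncs for one pod are serialized while different pods sync concurrently. The type and function names are simplified stand-ins for the real podWorkers:

package main

import (
	"fmt"
	"sync"
	"time"
)

type syncPodFnType func(podUID string) error

type podWorkers struct {
	mu         sync.Mutex
	podUpdates map[string]chan string // pod UID -> pending-update channel
	syncPodFn  syncPodFnType
}

func newPodWorkers(fn syncPodFnType) *podWorkers {
	return &podWorkers{podUpdates: map[string]chan string{}, syncPodFn: fn}
}

// UpdatePod routes an update to the pod's dedicated worker goroutine,
// starting one on first use. The 1-slot buffer coalesces bursts of
// updates into a single pending sync.
func (p *podWorkers) UpdatePod(podUID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch, ok := p.podUpdates[podUID]
	if !ok {
		ch = make(chan string, 1)
		p.podUpdates[podUID] = ch
		go p.managePodLoop(ch) // one worker goroutine per pod
	}
	select {
	case ch <- podUID:
	default: // a sync is already queued; drop the duplicate
	}
}

// managePodLoop serially runs syncPodFn for each queued update.
func (p *podWorkers) managePodLoop(updates <-chan string) {
	for uid := range updates {
		if err := p.syncPodFn(uid); err != nil {
			fmt.Printf("sync of pod %s failed: %v\n", uid, err)
		}
	}
}

func main() {
	w := newPodWorkers(func(uid string) error {
		fmt.Println("syncing pod", uid)
		return nil
	})
	w.UpdatePod("pod-a")
	w.UpdatePod("pod-b")
	time.Sleep(100 * time.Millisecond) // demo only: let the workers run
}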

HandlePodSyncs, HandlePodUpdates, HandlePodAdditions

  • Kubelet.dispatchWork eventually invokes podWorkers.managePodLoop; podWorkers calls the syncPodFn registered on it in NewMainKubelet, namely syncPodFn = (kl *Kubelet) syncPod(o syncPodOptions).
  • Kubelet.syncPod branches on the runtime type; considering only the docker runtime, it invokes DockerManager.SyncPod.
  • DockerManager.SyncPod calls dm.network.SetUpPod and then branches on the network plugin type; considering only the cni plugin, it invokes cniNetworkPlugin.SetUpPod to set up the pod's network.
  • cniNetworkPlugin.SetUpPod invokes cniNetwork.addToNetwork, which ultimately calls CNIConfig.AddNetwork, the AddNetwork Interface in libcni.
  • CNIConfig.AddNetwork has the system execute the cni plugin binary through the wrapped execPlugin (sketched below), completing the network setup inside the pod.
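
Underneath AddNetwork, the CNI "wire protocol" is just process execution: the plugin binary is run with its parameters passed in environment variables and the network config JSON on stdin, and it prints a JSON Result on stdout. The following is a simplified, hypothetical sketch of what execPlugin amounts to; the binary path, container ID, and netns path are placeholders:

package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"os/exec"
)

func main() {
	// The plugin binary is resolved from CNI_PATH plus the config's
	// "type" field; "bridge" here is just an example.
	netConfJSON := []byte(`{"cniVersion":"0.3.0","name":"mynet","type":"bridge"}`)

	cmd := exec.Command("/opt/cni/bin/bridge")
	cmd.Env = append(os.Environ(),
		"CNI_COMMAND=ADD", // DEL for teardown
		"CNI_CONTAINERID=example-container-id",
		"CNI_NETNS=/proc/12345/ns/net",
		"CNI_IFNAME=eth0",
		"CNI_PATH=/opt/cni/bin",
	)
	cmd.Stdin = bytes.NewReader(netConfJSON)

	out, err := cmd.Output() // the plugin prints a JSON Result on stdout
	if err != nil {
		log.Fatalf("plugin exec failed: %v", err)
	}
	fmt.Printf("plugin result: %s\n", out)
}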

HandlePodRemoves, HandlePodReconcile

  • Both clean up containers by invoking podContainerDeletor.deleteContainersInPod.
  • For docker, deleteContainersInPod calls DockerManager.deleteContainer.
  • During deleteContainer, the container's network environment is cleaned up by invoking containerGC.netContainerCleanup.
  • PluginManager.TearDownPod then calls cniNetworkPlugin.TearDownPod, which in turn executes cniNetwork.deleteFromNetwork.
  • cniNetwork.deleteFromNetwork calls CNIConfig.DelNetwork, the DelNetwork Interface in libcni.
  • CNIConfig.DelNetwork has the system execute the cni plugin binary through the wrapped execPlugin, completing the network cleanup inside the pod (a minimal teardown call is sketched below).
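
Teardown mirrors setup. Reusing the illustrative netConf, cniConfig, and rt names from the libcni sketch earlier, the delete call would look like this:

// DelNetwork runs the same plugin with CNI_COMMAND=DEL so it can
// release the pod's IP and remove the interface it created during ADD.
if err := cniConfig.DelNetwork(netConf, rt); err != nil {
	log.Printf("failed to tear down pod network: %v", err)
}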

Code Flow of CNI Plugin Calls in kubelet

[Figure: flow diagram of the kubelet code path into the CNI plugin]

For more in-depth articles on kubernetes, see my blog homepage on CSDN or OSChina.
