文档章节

NVIDIA/k8s-device-plugin源码分析

WaltonWang
 WaltonWang
发布于 2018/04/17 23:36
字数 1422
阅读 891
收藏 1

Author: xidianwangtao@gmail.com

k8s-device-plugin内部实现原理图

Kubernetes如何通过Device Plugins来使用NVIDIA GPU中,对NVIDIA/k8s-device-plugin的工作原理进行了深入分析,为了方便我们在这再次贴出其内部实现原理图:

输入图片说明

PreStartContainer和GetDevicePluginOptions两个接口,在NVIDIA/k8s-device-plugin中可以忽略,可以认为是空实现。我们主要关注ListAndWatch和Allocate的实现。

启动

一切从main函数开始!核心的代码如下:

func main() {
	log.Println("Loading NVML")
	if err := nvml.Init(); err != nil {
		select {}
	}
    ...
	log.Println("Fetching devices.")
	if len(getDevices()) == 0 {
		select {}
	}

	log.Println("Starting FS watcher.")
	watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
	if err != nil {
		os.Exit(1)
	}
    ...
	log.Println("Starting OS watcher.")
	sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	restart := true
	var devicePlugin *NvidiaDevicePlugin

L:
	for {
		if restart {
			if devicePlugin != nil {
				devicePlugin.Stop()
			}

			devicePlugin = NewNvidiaDevicePlugin()
			if err := devicePlugin.Serve(); err != nil {
				...
			} else {
				restart = false
			}
		}

		select {
		case event := <-watcher.Events:
			if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				restart = true
			}

		case err := <-watcher.Errors:

		case s := <-sigs:
			switch s {
			case syscall.SIGHUP:
				restart = true
			default:
				devicePlugin.Stop()
				break L
			}
		}
	}
}

相关说明不需多说,请参考下面的流程逻辑图:

输入图片说明

Serve

k8s-device-plugin启动流程中,devicePlugin.Serve负责启动gRPC Server Start对外提供服务,然后把自己注册到kubelet。

// Serve starts the gRPC server and register the device plugin to Kubelet
func (m *NvidiaDevicePlugin) Serve() error {
	err := m.Start()
	if err != nil {
		log.Printf("Could not start device plugin: %s", err)
		return err
	}
	log.Println("Starting to serve on", m.socket)

	err = m.Register(pluginapi.KubeletSocket, resourceName)
	if err != nil {
		log.Printf("Could not register device plugin: %s", err)
		m.Stop()
		return err
	}
	log.Println("Registered device plugin with Kubelet")

	return nil
}

Start

Start的代码如下:

// Start starts the gRPC server of the device plugin
func (m *NvidiaDevicePlugin) Start() error {
	err := m.cleanup()
	if err != nil {
		return err
	}

	sock, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterDevicePluginServer(m.server, m)

	go m.server.Serve(sock)

	// Wait for server to start by launching a blocking connexion
	conn, err := dial(m.socket, 5*time.Second)
	if err != nil {
		return err
	}
	conn.Close()

	go m.healthcheck()

	return nil
}

更加深入的代码调用关系,这里不多介绍,直接贴出Start的实现逻辑图:

输入图片说明

Start流程中负责创建nvidia.sock文件。

需要特别说明healthcheck部分:

  • healthcheck启动协程对管理的devices进行健康状态监控,一旦发现有device unhealthy,则发送到NvidiaDevicePlugin的health channel。device plugin的ListAndWatch会从health channel中获取这些unhealthy devices,并通知到kubelet进行更新。
  • 只监控nvmlEventTypeXidCriticalError事件,一旦监控到某个device的这个Event,就认为该device unhealthy。关于nvmlEventTypeXidCriticalError的说明,请参考NVIDIA的nvml api文档
  • 可以通过设置NVIDIA device plugin Pod内的环境变量DP_DISABLE_HEALTHCHECKS为”all”来取消healthcheck。不设置或者设置为其他值都会启动healthcheck,默认部署时不设置。

Register

Start之后,接着进入Register流程,其代码如下:

// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(m.socket),
		ResourceName: resourceName,
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}

Register的实现流程图如下:

输入图片说明

  • 注册的Resource Name是nvidia.com/gpu
  • 注册的Version是v1beta1

Stop

Stop的代码如下:

// Stop stops the gRPC server
func (m *NvidiaDevicePlugin) Stop() error {
	if m.server == nil {
		return nil
	}

	m.server.Stop()
	m.server = nil
	close(m.stop)

	return m.cleanup()
}

Stop的实现流程图如下:

输入图片说明

  • Stop流程中负责停止gRPC Server,并删除nvidia.sock。

ListAndWatch

ListAndWatch接口主要负责监控health channel,发现有gpu变成unhealthy后,将完成的gpu list信息(ID和health状态)发送给kubelet进行更新。

// ListAndWatch lists devices and update that list according to the health status
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})

	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
		}
	}
}

ListAndWatch的实现流程图如下:

输入图片说明

Allocate

Allocate负责接口kubelet为Container请求分配gpu的请求,请求的结构体如下:

// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
type AllocateRequest struct {
	ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests" json:"container_requests,omitempty"`
}

type ContainerAllocateRequest struct {
	DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}

device plugin Allocate的Response结构体定义如下:

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
type AllocateResponse struct {
	ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses" json:"container_responses,omitempty"`
}

type ContainerAllocateResponse struct {
	// List of environment variable to be set in the container to access one of more devices.
	Envs map[string]string `protobuf:"bytes,1,rep,name=envs" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	// Mounts for the container.
	Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts" json:"mounts,omitempty"`
	// Devices for the container.
	Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices" json:"devices,omitempty"`
	// Container annotations to pass to the container runtime
	Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}

Allocate的代码实现如下:

// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	devs := m.devs
	responses := pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		response := pluginapi.ContainerAllocateResponse{
			Envs: map[string]string{
				"NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
			},
		}

		for _, id := range req.DevicesIDs {
			if !deviceExists(devs, id) {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}

		responses.ContainerResponses = append(responses.ContainerResponses, &response)
	}

	return &responses, nil
}

下面是其实现逻辑图:

输入图片说明

  • Allocate中会遍历ContainerRequests,将DeviceIDs封装到ContainerAllocateResponse的Envs:NVIDIA_VISIBLE_DEVICES中,格式为:”${ID_1},${ID_2},...
  • 除此之外,并没有封装Mounts, Devices, Annotations。

总结

NVIDIA/k8s-device-plugin的代码中,依赖于nvidia-docker代码库,存在很多golang调用C库的地方,还需要大家自行到[nvml api文档](https://docs.nvidia.com/deploy/nvml-api)中查看相关C函数声明。上一篇博客介绍了Kubernetes如何通过Device Plugins来使用NVIDIA GPU,这篇博客介绍NVIDIA/k8s-device-plugin的代码实现流程,下一篇博客我觉得还有必要对kubelet device plugin manger进行代码分析,如此才能完整的理解整个交互细节。

© 著作权归作者所有

共有 人打赏支持
WaltonWang
粉丝 210
博文 104
码字总数 220998
作品 0
深圳
程序员
私信 提问
nvidia-docker2 在 Kubernetes 上实践

女主宣言 nvida-docker2 可以帮助我们将旧的加速计算应用程序容器化,将特定的 GPU 资源分配给容器,并可以轻松地跨不同的环境共享应用程序、协同工作和测试应用程序。今天带来的分享是有关 ...

ZVAyIVqt0UFji
2018/11/29
0
0
Kubernetes中调度GPU资源

Kubernetes中调度GPU资源 Kubernetes 包含一个体验性的功能,支持 AMD和NVIDIA GPUs 跨节点调度。对 NVIDIA GPUs 支持从 v1.6开始,然后经过几次不兼容的叠代修改,对AMD GPUs 的支持从 v1.9...

openthings
01/04
0
0
Kubernetes集群升级NVidia GPU驱动版本

最近Kubernetes、Docker和NVidia GPU驱动都进行了较大的升级,因此考虑对Kubernetes集群升级NVidia GPU驱动版本。我这里使用Ubuntu 18.04LTS + NVidia GPU Driver 410.78 + Kubernetes 1.13....

openthings
01/04
0
0
不会装cuda配环境的小学生怎么躺撸caffe

本文首发于个人博客 不会装cuda配环境的小学生怎么躺撸caffe 收录于简书专题深度学习·计算机视觉与机器学习 DL如今已经快成为全民玄学了,感觉离民科入侵不远了。唯一的门槛可能是环境不好配...

在河之简
2017/06/26
0
0
k8s的扩展资源设计和device-plugin

extended-resources extended-resources在k8s1.9中是一个stable的特性。可以用一句话来概括这个特性: 通过向apiserver发送一个patch node 的请求,为这个node增加一个自定义的资源类型,用于...

店家小二
2018/12/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

带标题的图片轮询展示

<div> <table width="671" cellpadding="0" cellspacing="0"> <tr height="5"> <td style="background-image:url(include/image/news_left_top_circle.jpg)" width="7"> </td> <td style="ba......

JackChenzp
33分钟前
0
0
Hanlp中N最短路径分词详细介绍

N-最短路径 是中科院分词工具NLPIR进行分词用到的一个重要算法,张华平、刘群老师在论文《基于N-最短路径方法的中文词语粗分模型》中做了比较详细的介绍。该算法算法基本思想很简单,就是给定...

左手的倒影
41分钟前
1
0
es 在数据量很大的情况下(数十亿级别)如何提高查询效率啊?

面试题 es 在数据量很大的情况下(数十亿级别)如何提高查询效率啊? 面试官心理分析 这个问题是肯定要问的,说白了,就是看你有没有实际干过 es,因为啥?其实 es 性能并没有你想象中那么好...

架构师springboot
42分钟前
8
0
php面试题常见面试题

又是跳槽季,跳槽就有面试,面试难免会问一些理论问题,前面面试了几家,做了一些面试题,记住了一部分,整理了一下: 1.cookie 和session区别 session存在服务器,cookie存在浏览器;sessi...

后盾风云
48分钟前
3
0
华为Mate X显示适配指导来了!带你完美适配折叠屏

华为Mate X一经发布便引发了世界级的关注,除了史无前例的交互体验外,作为一款可以变形的手机,它还拥有多种不同的形态:展开时是8英寸全面屏,折叠后又变身为6.6英寸和6.38英寸两块屏幕。 ...

安卓绿色联盟
59分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部