(下)容器能不能将 volume 挂载直接挂到根目录?—— Snapshotter 是如何工作的?

原创
2023/03/22 22:05
阅读数 713

这件事起源于有小伙伴在某群里问,在 K8s 中,能不能把 volume 挂载直接挂到根目录?我的第一反应是不能。容器会使用 union filesystem 将容器的内容挂到根目录下,这点在正常情况下是无法更改的。但是就止于此吗?发现给不出合理解释的时候,突然感觉自己对于容器的认知只停留在了很表面的阶段。

首先通过我们前面的分析,OCI 运行时实际上是允许我们随便定义根目录挂载的。而且我们在本系列第一篇博客(上)第二章节中的实操中,也确实使用了本地的一个随意目录作为 rootfs。

而在系列第二篇博客(中)中,我们分析了通过分析 kubelet 到 containerd 到 runc 的调用链路,了解到 containerd 中的 rootfs 挂载由 snapshotter 来进行维护。本期,我们将分析一下 containerd 默认的 snapshotter 是怎么管理容器根文件系统,以及介绍一下如何自定义一个简单的 snapshotter。

六、Snapshotter 的文档

先看看官方是如何定义此组件的:

Snapshotter 定义了一系列方法用于实现一个 snapshot 的快照器,用于 allocating、snapshotting 以及 mounting fs 变更集。该模型基于构建一系列带有 parent-child 关系的变更集来工作。一个 snapshot 代表一个 fs 状态。每一个 snapshot 都有一个 parent,空字符串表示没有 parent。snapshot 和其 parent 之间可以生成一个经典的 layer。

笔者注:这里的 layer 说的应该是 docker 的那套 layers 快照方案。Docker 容器建立在基于一套叫做 layers 的快照方案上, Layers 提供了 fork 文件系统、进行变更、再将 changesets 保存回 new layer 的能力。

Snapshotter 本也脱胎于原来的这套 Graphdriver 方案,它提出了一个更加灵活的模型来管理 layers。模型将专注于提出基于 snapshotting 的功能性,且不涉及与 images 结构强耦合的 API,最小化的 API 将在不牺牲功能的前提下进行行为简化。致力于使各个 driver 实现面更小,且行为更一致。

调用 Prepare 可用于创建一个 Active Snapshot,在 mounting 后,进行的修改将应用于 snapshot。而提交则会创建一个 Commited SnapshotCommited Snapshot 可以作为 parent,而 Active Snapshot 则不行。

通过它的生命周期,会更利于理解 snapshot。Active Snapshot 总是由 Prepare 或者 View 创建,而 Commited Snapshot 则通过 Commit 创建。但这里要注意的是,Active SnapshotCommitted Snapshot 并不能互相转化。所有的 Snapshot 都可以被删除。

为了保持一致, 这里定义了以下的术语用于贯穿整个 snapshotter 接口的实现:

  • ctx
  • key 指一个 Active Snapshot
  • name 指一个 Committed Snapshot
  • parent 指其 parent

大部分方法都使用了上述这几个 “术语” 的组合。通常情况下,如果没有特殊说明,nameparent 用于获取 Commited Snapshot,而 key 用于获取一个 Active Snapshot

All variables used to access snapshots use the same key space. For example, an active snapshot may not share the same key with a committed snapshot.(不知道咋翻译....)

这里通过几个 example 来覆盖 snapshotter 的功能:

6.1 Importing a layer

要引入一个 layer,我们可以通过 snapshotter 提供一个 mounts 集进行使用,目标目录将会捕获 changeset。

(作者注:这里演示的是如何将一个镜像 layer 转换成 snapshot)

先获取一个 layer tar 压缩包的目录,并创建一个临时目录:

layerPath, tmpDir := getLayerPath(), mkTmpDir() // just a path to layer tar file.

再通过调用 Prepare 来准备一个新的快照事物,指定某个 key,以及一个空的 parent。为了避免在 layer 解压缩期间被回收,这里加了一个 label:

noGcOpt := snapshots.WithLabels(map[string]string{
	"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
mounts, err := snapshotter.Prepare(ctx, key, "", noGcOpt)
if err != nil { ... }

调用 Prepare 后,会得到一个被这个 key 指向 Active Snapshot 并返回其 mounts 集。然后将它 mount 到临时目录上:

if err := mount.All(mounts, tmpDir); err != nil { ... }

mount 动作一旦完成之后,这个临时目录就可捕获变更了。在实践中,这个模式很像 fs transaction。下一步是把刚才准备好的 layer 解压到 mount 上去的临时目录,这里有一个指定的方法叫做 unpackLayer 可以将 layer 内容应用到指定目录,并计算 layer 内容的 DiffID:

layer, err := os.Open(layerPath)
if err != nil { ... }
digest, err := unpackLayer(tmpLocation, layer) // unpack into layer location
if err != nil { ... }

动作都完成之后,就有了一个带有解压缩内容的 layer,实现的时候应该重新校验一下 digest 和 DiffID 是否匹配。完成后,就可以 unmount 了:

unmount(mounts) // optional, for now

现在我们已经解压并校验了 layer,可以将 Active Snapshot 提交了。在下面这个例子中,使用 layer digest 作为 name 来创建一个 Committed Snapshot ,而在正常实践中,一般会用 ChainID。(作者注:因为 layer digest 是可能重复的,ChainID 是一组 layer digest 组合起来的字符串,可以定位到一个唯一的 layer)在操作完毕后,Active Snapshot 将会被移除:

if err := snapshotter.Commit(ctx, digest.String(), key, noGcOpt); err != nil { ... }

自此,我们就拥有了一个可以通过 digest 获取到的 layer。

6.2 Importing the Next Layer

让一个 layer 依赖于之前创建的那个 layer 的过程和上面一致,在调用 Prepare 的时候传入刚才那个 digest 作为 parent,以及一个唯一的 key:

mounts, err := snapshotter.Prepare(ctx, key, parentDigest, noGcOpt)

当我们 mount,apply 和 commit 后,这个新的 snapshot 将基于之前那个 layer 的内容。

6.3 Running a Container

在真正运行容器的时候,一般用 Image 创建的 Snapshot 作为 parent,在 mount 之后,这个目录可用于容器的 fs:

mounts, err := snapshotter.Prepare(ctx, containerKey, imageRootFSChainID)

返回的 mounts 可传递给容器运行时,如果想要创建一个新的 Image,可以调用 Commit:

if err := snapshotter.Commit(ctx, newImageSnapshot, containerKey); err != nil { ... }

不过,在大多数情况,一般都会调用 snapshotter.Remove() 来丢弃变更。

七、Overlay Snapshotter 源码简单分析

Overlay Snapshotter 和其他的 plugin 一样,Snapshotter 在 init() 中进行注册,上期我们有简单提到过,plugin 就是个类似 Spring Bean 管理、注册的组件:

	plugin.Register(&plugin.Registration{
		Type:   plugin.SnapshotPlugin,
		ID:     "overlayfs",
		Config: &Config{},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())

			config, ok := ic.Config.(*Config)
			if !ok {
				return nil, errors.New("invalid overlay configuration")
			}

			root := ic.Root
			if config.RootPath != "" {
				root = config.RootPath
			}

			var oOpts []overlay.Opt
			if config.UpperdirLabel {
				oOpts = append(oOpts, overlay.WithUpperdirLabel)
			}

			ic.Meta.Exports["root"] = root
			return overlay.NewSnapshotter(root, append(oOpts, overlay.AsynchronousRemove)...)
		},
	})

========================

// NewSnapshotter returns a Snapshotter which uses overlayfs. The overlayfs
// diffs are stored under the provided root. A metadata file is stored under
// the root.
func NewSnapshotter(root string, opts ...Opt) (snapshots.Snapshotter, error) {
	var config SnapshotterConfig
	for _, opt := range opts {
		if err := opt(&config); err != nil {
			return nil, err
		}
	}

	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	supportsDType, err := fs.SupportsDType(root)
	if err != nil {
		return nil, err
	}
	if !supportsDType {
		return nil, fmt.Errorf("%s does not support d_type. If the backing filesystem is xfs, please reformat with ftype=1 to enable d_type support", root)
	}
	ms, err := storage.NewMetaStore(filepath.Join(root, "metadata.db"))
	if err != nil {
		return nil, err
	}

	if err := os.Mkdir(filepath.Join(root, "snapshots"), 0700); err != nil && !os.IsExist(err) {
		return nil, err
	}
	// figure out whether "userxattr" option is recognized by the kernel && needed
	userxattr, err := overlayutils.NeedsUserXAttr(root)
	if err != nil {
		logrus.WithError(err).Warnf("cannot detect whether \"userxattr\" option needs to be used, assuming to be %v", userxattr)
	}

	return &snapshotter{
		root:          root,
		ms:            ms,
		asyncRemove:   config.asyncRemove,
		upperdirLabel: config.upperdirLabel,
		indexOff:      supportsIndex(),
		userxattr:     userxattr,
	}, nil
}

这里主要是初始化对象,注册了一个 root 目录用来存放 diffs,并在 root 下创建了一个 metadata.db,用于存放元数据。在文章后续内容中,我们会直接把它称为 root,不过要注意,这个 root 不等同于 /

7.1 创建容器过程中的 Prepare

再回想一下上一片文章里,kubelet 调用创建 containerd 来创建容器时的代码,我们提到有一个初始化容器对象的责任链,这里面就有 Snapshotter 的身影:

// CreateContainer creates a new container in the given PodSandbox.
func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) {
  
  ...

	// Grab any platform specific snapshotter opts.
	sOpts := snapshotterOpts(c.config.ContainerdConfig.Snapshotter, config)

	// Set snapshotter before any other options.
	opts := []containerd.NewContainerOpts{
		containerd.WithSnapshotter(c.runtimeSnapshotter(ctx, ociRuntime)),
		// Prepare container rootfs. This is always writeable even if
		// the container wants a readonly rootfs since we want to give
		// the runtime (runc) a chance to modify (e.g. to create mount
		// points corresponding to spec.Mounts) before making the
		// rootfs readonly (requested by spec.Root.Readonly).
		customopts.WithNewSnapshot(id, containerdImage, sOpts...),
	}

  ...

	var cntr containerd.Container
	if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
		return nil, fmt.Errorf("failed to create containerd container: %w", err)
	}
	return &runtime.CreateContainerResponse{ContainerId: id}, nil
}

我们看看它到底做了什么,第一部分 containerd.WithSnapshotter(c.runtimeSnapshotter(ctx, ociRuntime)) 很简单,这里不细说了,就是记了个名字,也就是上一章提到的那个 PrepareView 要用到的 key。第二步就是真正的 Prepare

// WithNewSnapshot wraps `containerd.WithNewSnapshot` so that if creating the
// snapshot fails we make sure the image is actually unpacked and retry.
func WithNewSnapshot(id string, i containerd.Image, opts ...snapshots.Opt) containerd.NewContainerOpts {
	f := containerd.WithNewSnapshot(id, i, opts...)
	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
		if err := f(ctx, client, c); err != nil {
			if !errdefs.IsNotFound(err) {
				return err
			}

			if err := i.Unpack(ctx, c.Snapshotter); err != nil {
				return fmt.Errorf("error unpacking image: %w", err)
			}
			return f(ctx, client, c)
		}
		return nil
	}
}

一开始,先是做了下 unpack,在后面的章节,我们再说这个 unpack。我们知道 Snapshotter 的职责并不包括镜像的解压缩,这里会尝试 f(),如果发现这个 key 下的 snapshot 还不存在,则先 unpack,再 f(),我们看看 f() 做了什么:

// WithNewSnapshot allocates a new snapshot to be used by the container as the
// root filesystem in read-write mode
func WithNewSnapshot(id string, i Image, opts ...snapshots.Opt) NewContainerOpts {
	return func(ctx context.Context, client *Client, c *containers.Container) error {
		diffIDs, err := i.RootFS(ctx)
		if err != nil {
			return err
		}

		parent := identity.ChainID(diffIDs).String()
		c.Snapshotter, err = client.resolveSnapshotterName(ctx, c.Snapshotter)
		if err != nil {
			return err
		}
		s, err := client.getSnapshotter(ctx, c.Snapshotter)
		if err != nil {
			return err
		}

		parent, err = resolveSnapshotOptions(ctx, client, c.Snapshotter, s, parent, opts...)
		if err != nil {
			return err
		}
		if _, err := s.Prepare(ctx, id, parent, opts...); err != nil {
			return err
		}
		c.SnapshotKey = id
		c.Image = i.Name()
		return nil
	}
}

i.RootFS(ctx) 是从 manifest 文件中拿到这个镜像的 diffsIDS,然后找到最上层的那个的 ChainID(由于它是按顺序存储的,所以只需要拿最后一个即可),下一步是根据找到 snapshotter,默认情况下就是 overlayfs。

再根据 ChainID 找到 parent 的 key,也就是 resolveSnapshotOptions,最后调用 Prepare,注意这里的 ID 和容器的 ID 是同一个值,都是随机生成的一坨字符串。

我们再来详细看看 resolveSnapshotOptionsPrepare 具体做了什么,resolveSnapshotOptions 是根据传入的 parent(ChainID) 找到它真正的 parent。

代码里面提到了一个 remap 机制,大体意思就是默认情况下,ChanID 就是 parent key,但是这里提供了一套 remap 机制,允许你使用另一个 key,另一套 key 会转移 fs 的 ownership(uid 和 gid),代码的大概做法就是 chown 一个临时目录,把当前 parent prepare + mount 过去,然后 commit 拿到一个新的 parent。代码就不贴了,有兴趣可以去看看

主要看看 prepare:

func (o *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
	return o.createSnapshot(ctx, snapshots.KindActive, key, parent, opts)
}

func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) {
	var (
		s        storage.Snapshot
		td, path string
	)

	if err := o.ms.WithTransaction(ctx, true, func(ctx context.Context) (err error) {
    // 在 root/snapshots/ 创建临时目录,创建 [目录/fs],[目录/work] 目录
		// 通过 key 去数据库创建并记录这个 Snapshot 对象,并拿到真正的目录,记录在数据库里的随机目录
    // 将这个目录挪到真正目录 root/snapshots/随机目录 下
		return nil
	}); err != nil {
		return nil, err
	}

	return o.mounts(s), nil
}

也就是真正干活的是这个 mounts,mounts 里分两种 case,最简单的 case 就是 parents 为空:

if len(s.ParentIDs) == 0 {
		// if we only have one layer/no parents then just return a bind mount as overlay
		// will not work
		roFlag := "rw"
		if s.Kind == snapshots.KindView {
			roFlag = "ro"
		}

		return []mount.Mount{
			{
				Source: o.upperPath(s.ID),
				Type:   "bind",
				Options: []string{
					roFlag,
					"rbind",
				},
			},
		}
	}

类型是 bind,目录是 root/snapshots/随机目录/fs,参数是 ro/rw 只读或者读写,以及 rbind 表示递归生效(没有 rbind 只绑定当前这个目录,而不绑定子目录)。

而正常情况下是这样的:

var options []string

	// set index=off when mount overlayfs
	if o.indexOff {
		options = append(options, "index=off")
	}

	if o.userxattr {
		options = append(options, "userxattr")
	}

	if s.Kind == snapshots.KindActive {
		options = append(options,
			fmt.Sprintf("workdir=%s", o.workPath(s.ID)),
			fmt.Sprintf("upperdir=%s", o.upperPath(s.ID)),
		)
	} else if len(s.ParentIDs) == 1 {
		return []mount.Mount{
			{
				Source: o.upperPath(s.ParentIDs[0]),
				Type:   "bind",
				Options: []string{
					"ro",
					"rbind",
				},
			},
		}
	}

	parentPaths := make([]string, len(s.ParentIDs))
	for i := range s.ParentIDs {
		parentPaths[i] = o.upperPath(s.ParentIDs[i])
	}

	options = append(options, fmt.Sprintf("lowerdir=%s", strings.Join(parentPaths, ":")))
	return []mount.Mount{
		{
			Type:    "overlay",
			Source:  "overlay",
			Options: options,
		},
	}

这里会对生成一个 overlay 类型的 Mount,opts 是 workdir=root/snapshots/随机目录/work, upperdir=root/snapshots/随机目录/fslowerdir=parenta:parentb:parentc.....,这个 parents 是从数据库里的组织结构里拿的,是一个树形结构的 kv 存储。和上篇文章的 container 存储类似,每个节点都可以找到自己的 parent,一个类似递归的 for 循环就能找到所有 parent。

7.2 创建容器过程中镜像的 Prepare(Unpack)

那么只剩下一个问题,就是 Parent 这个镜像,及其 DiffIDs,是什么时候被解析、存储到数据库中的呢?答案就在这里,最开始的那个 Opts,在 Unpack 镜像的时候:

// WithNewSnapshot wraps `containerd.WithNewSnapshot` so that if creating the
// snapshot fails we make sure the image is actually unpacked and retry.
func WithNewSnapshot(id string, i containerd.Image, opts ...snapshots.Opt) containerd.NewContainerOpts {
	f := containerd.WithNewSnapshot(id, i, opts...)
	return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
		if err := f(ctx, client, c); err != nil {
			if !errdefs.IsNotFound(err) {
				return err
			}

			if err := i.Unpack(ctx, c.Snapshotter); err != nil {
				return fmt.Errorf("error unpacking image: %w", err)
			}
			return f(ctx, client, c)
		}
		return nil
	}
}

这里会从镜像中的清单文件里面拿到 layers,并调用 rootfs.ApplyLayerWithOpts 进行初始化:

func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error {
	...
  
	manifest, err := i.getManifest(ctx, i.platform)
	layers, err := i.getLayers(ctx, i.platform, manifest)

	var (
		a  = i.client.DiffService()
		cs = i.client.ContentStore()

		chain    []digest.Digest
		unpacked bool
	)
	snapshotterName, err = i.client.resolveSnapshotterName(ctx, snapshotterName)
	sn, err := i.client.getSnapshotter(ctx, snapshotterName)

	for _, layer := range layers {
		unpacked, err = rootfs.ApplyLayerWithOpts(ctx, layer, chain, sn, a, config.SnapshotOpts, config.ApplyOpts)
		if err != nil {
			return err
		}

		if unpacked {
			// Set the uncompressed label after the uncompressed
			// digest has been verified through apply.
			cinfo := content.Info{
				Digest: layer.Blob.Digest,
				Labels: map[string]string{
					labels.LabelUncompressed: layer.Diff.Digest.String(),
				},
			}
			if _, err := cs.Update(ctx, cinfo, "labels."+labels.LabelUncompressed); err != nil {
				return err
			}
		}

		chain = append(chain, layer.Diff.Digest)
	}

	...
  
	return err
}

// ApplyLayerWithOpts applies a single layer on top of the given provided layer chain,
// using the provided snapshotter, applier, and apply opts. If the layer was unpacked true
// is returned, if the layer already exists false is returned.
func ApplyLayerWithOpts(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts []snapshots.Opt, applyOpts []diff.ApplyOpt) (bool, error) {
	var (
		chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String()
		applied bool
	)
	if _, err := sn.Stat(ctx, chainID); err != nil {
		if !errdefs.IsNotFound(err) {
			return false, fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
		}

		if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts, applyOpts); err != nil {
			if !errdefs.IsAlreadyExists(err) {
				return false, err
			}
		} else {
			applied = true
		}
	}
	return applied, nil

}

具体干活的 applyLayers 就能看到熟悉的 Prepare 了,这里从底层到上层一层层去进行 Prepare,整个过程和我们第六章讲到的那个 demo 类似,这里的 chainID 有必要小小解释一下,其实就是防止正好有 Digest 一样,原理是这样的 :

// As an example, given the chain of ids `[A, B, C]`, the result `[A,
// ChainID(A|B), ChainID(A|B|C)]` will be written back to the slice.
func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts []snapshots.Opt, applyOpts []diff.ApplyOpt) error {
	var (
		parent  = identity.ChainID(chain[:len(chain)-1])
		chainID = identity.ChainID(chain)
		layer   = layers[len(layers)-1]
		diff    ocispec.Descriptor
		key     string
		mounts  []mount.Mount
		err     error
	)

	for {
		key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)

		// Prepare snapshot with from parent, label as root
		mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
		if err != nil {
			if errdefs.IsNotFound(err) && len(layers) > 1 {
				if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a, opts, applyOpts); err != nil {
					if !errdefs.IsAlreadyExists(err) {
						return err
					}
				}
				// Do no try applying layers again
				layers = nil
				continue
			} else if errdefs.IsAlreadyExists(err) {
				// Try a different key
				continue
			}

			// Already exists should have the caller retry
			return fmt.Errorf("failed to prepare extraction snapshot %q: %w", key, err)
		}
		break
	}
  ...

	return nil
}

7.3 启动容器过程中镜像的 Mount

关于这部分内容,其实上篇文章已经讲的比较清楚了,也就是 3.5 章节 + 4.2 章节的内容:

这里会创建一个 Task 对象,拿取 Snapshotter 中的 mounts,作为请求参数去调用 shim 来真正创建容器,在创建容器之前,会 mount rootfs,这里就不细说了:

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
  
  ...
  
	for _, m := range r.Rootfs {
		pmounts = append(pmounts, process.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}
  
  rootfs := ""
	if len(pmounts) > 0 {
		rootfs = filepath.Join(r.Bundle, "rootfs")
		if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
			return nil, err
		}
	}
  
  var mounts []mount.Mount
	for _, pm := range pmounts {
		mounts = append(mounts, mount.Mount{
			Type:    pm.Type,
			Source:  pm.Source,
			Target:  pm.Target,
			Options: pm.Options,
		})
	}
	if err := mount.All(mounts, rootfs); err != nil {
		return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
	}
}

7.4 自定义 Snapshotter (Plugin)

我们知道 Containerd 对自身使用的各个组件使用一套 Plugin 机制统一管理。每个类型的 Plugin 都抽象出了一套接口,并有其一个或多个实现,我们称之为 built-in 插件。除此之外,Containerd 也允许我们自定义自己的 Plugin,Plugin 通过 containerd 的配置文件来申明:

// Config provides containerd configuration data for the server
type Config struct {
  ...
	// ProxyPlugins configures plugins which are communicated to over GRPC
	ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
  ...
}

--------
配置如下,声明一个名叫 customsnapshot,grpc 地址为 /var/run/mysnapshotter.sock 的插件:
--------

version = 2

[proxy_plugins]
  [proxy_plugins.customsnapshot]
    type = "snapshot"
    address = "/var/run/mysnapshotter.sock"

在 Containerd 启动时,会读取配置,初始化所有 Plugin:

for name, pp := range config.ProxyPlugins {
	var (
		t plugin.Type
		f func(*grpc.ClientConn) interface{}
		address = pp.Address
	)
	switch pp.Type {
	case string(plugin.SnapshotPlugin), "snapshot":
		t = plugin.SnapshotPlugin
		ssname := name
		f = func(conn *grpc.ClientConn) interface{} {
			return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname)
		}
  case xxx...: 
  }
}

// --------------------
    
func NewSnapshotsClient(cc grpc.ClientConnInterface) SnapshotsClient {
	return &snapshotsClient{cc}
}

func (c *snapshotsClient) Prepare(ctx context.Context, in *PrepareSnapshotRequest, opts ...grpc.CallOption) (*PrepareSnapshotResponse, error) {
	out := new(PrepareSnapshotResponse)
	err := c.cc.Invoke(ctx, "/containerd.services.snapshots.v1.Snapshots/Prepare", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

实现一套 Snapshotter 的接口即可:

package main

import (
	"fmt"
	"net"
	"os"

	"google.golang.org/grpc"

	snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
	"github.com/containerd/containerd/contrib/snapshotservice"
	"github.com/containerd/containerd/snapshots/native"
)

func main() {
	// Provide a unix address to listen to, this will be the `address`
	// in the `proxy_plugin` configuration.
	// The root will be used to store the snapshots.
	if len(os.Args) < 3 {
		fmt.Printf("invalid args: usage: %s <unix addr> <root>\n", os.Args[0])
		os.Exit(1)
	}

	// Create a gRPC server
	rpc := grpc.NewServer()

	// Configure your custom snapshotter, this example uses the native
	// snapshotter and a root directory. Your custom snapshotter will be
	// much more useful than using a snapshotter which is already included.
	// https://godoc.org/github.com/containerd/containerd/snapshots#Snapshotter
	sn, err := native.NewSnapshotter(os.Args[2])
	if err != nil {
		fmt.Printf("error: %v\n", err)
		os.Exit(1)
	}

	// Convert the snapshotter to a gRPC service,
	// example in github.com/containerd/containerd/contrib/snapshotservice
	service := snapshotservice.FromSnapshotter(sn)

	// Register the service with the gRPC server
	snapshotsapi.RegisterSnapshotsServer(rpc, service)

	// Listen and serve
	l, err := net.Listen("unix", os.Args[1])
	if err != nil {
		fmt.Printf("error: %v\n", err)
		os.Exit(1)
	}
	if err := rpc.Serve(l); err != nil {
		fmt.Printf("error: %v\n", err)
		os.Exit(1)
	}
}

文章如有错误,感谢指正。

参考资料

containerd源码:https://github.com/containerd/containerd

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部