Kubernetes 源码依赖库中的 wait 库功能介绍

原创
2019/06/11 17:40
阅读数 1.8K

k8s 依赖库中的 wait 库功能

该库提供了很多基于周期性执行的方法,以及约束周期性执行的方法。

周期性执行一个函数

在某些情况下,我们需要周期性地执行一些动作,比如发送心跳请求给master,那么可以使用 wait 库中的 Forever 功能。 这里给一个简单的例子,每隔一秒钟输出当前的时间。

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	wait.Forever(func() {
		fmt.Println(time.Now().String())
	}, time.Second)
}

带StopSignal的周期性执行函数

上面的 Wait 函数其实是 Util 的变体,Util 本身还带有一个 stopSignal 选项。比如我们要删除一个CDN资源,然后删除之后周期性地检查文件是否还可以访问。可以用下面的逻辑。我们这里用counter来代替检查资源状态的判断逻辑。

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

var stopSignal = make(chan struct{})

func main() {
	var counter = 1
	wait.Until(func() {
		if counter > 10 {
			close(stopSignal)
		}
		fmt.Println(time.Now().String())
		counter++
	}, time.Second, stopSignal)

}

sync.WaitGroup 的封装及扩展

最简单的是对WaitGroup的简单封装

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	g := wait.Group{}
	for i := 0; i < 100; i++ {
		j := i
		g.Start(func() {
			fmt.Println(j)
		})
	}
	g.Wait()
}

我们再假设一个场景,老大说大家去抓网页,差不多抓满1000个网页就结束。这个时候大家并发去抓,想要同步是比较困难的,另外什么时候通知大家结束也比较麻烦。这里,我们可以用下面的这样的框架代码。

package main

import (
	"fmt"
	"time"

	"sync/atomic"

	"k8s.io/apimachinery/pkg/util/wait"
)

var stopSignal = make(chan struct{})

func main() {
	g := wait.Group{}
	var counter int32
	for i := 0; i < 100; i++ {
		j := i
		g.StartWithChannel(stopSignal, func(stopCh <-chan struct{}) {
			for {
				//quit if
				if atomic.LoadInt32(&counter) > 1000 {
					return
				}
				//otherwise
				select {
				case <-stopSignal:
					return
				default:
					fmt.Println(j, time.Now().String())
					atomic.AddInt32(&counter, 1)
					<-time.After(time.Second)
				}
			}
		})
	}
	g.Wait()
}

刚刚的场景还可以使用StartWithContext方法来实现。

package main

import (
	"context"
	"fmt"
	"time"

	"sync/atomic"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	g := wait.Group{}
	var counter int32
	ctx, cancelFunc := context.WithCancel(context.Background())
	for i := 0; i < 100; i++ {
		j := i
		g.StartWithContext(ctx, func(ctx context.Context) {
			for {
				//quit if
				if atomic.LoadInt32(&counter) > 1000 {
					cancelFunc() //fire cancel signal
				}
				//otherwise
				select {
				case <-ctx.Done(): //cancel signal received
					return
				default:
					fmt.Println(j, time.Now().String())
					atomic.AddInt32(&counter, 1)
					<-time.After(time.Second)
				}
			}
		})
	}
	g.Wait()
}
展开阅读全文
加载中
点击加入讨论🔥(2) 发布并加入讨论🔥
打赏
2 评论
0 收藏
0
分享
返回顶部
顶部