前言
上一篇文章 我用休眠做并发控制,搞垮了下游服务 发出去后得到不少网友的回应,有人问自己平时用的方案行不行,有人建议借鉴TCP的拥塞控制策略,动态地调整发起的并发数,还有人问为啥我要管下游抗不抗得住。
今天我就来总结几种调用下游服务时做并发控制的方案。
因为我们这篇文章是科普向的文章,主要目的是总结一下应该怎么在享受并发带来效率提升的同时做好并发控制让整个系统的上下游都能更稳定一些,不对限流、控制到底该哪个服务加,出了事故谁负责做讨论。
并发控制方案
前面我们提到用休眠做并发控制的最大弊端是,没有考虑下游服务的感受,每次开固定数量的goroutine
去执行任务后,调用者休眠 1s 再来,而不是等待下游服务的反馈再开启下一批任务执行。
func badConcurrency() {
batchSize := 500
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
break
}
for _, item := range data {
go func(i int) {
doSomething(i)
}(item)
}
time.Sleep(time.Second * 1)
}
}
此外上游还有请求分配不均的问题,休眠的时候完全没有请求,休眠结束后不管下游有没有执行完成马上又发起一批新的请求。
所以我们应该从等待下游反馈和请求分配尽量均匀两个角度去做并发控制,当然实际项目中应该是两方面结合才行。
本文的可执行示例代码请访问下面的链接查看:
https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go
使用限流器
我们在向下游发起并发请求时可以通过限流器做一下限流,如果达到限制就阻塞直到能再次发起请求。一听到阻塞直到blabla 有的同学是不是马上内心小激动想用 channel
去实现一个限流器啦,「此处应用咳嗽声」其实完全没必要Golang 官方限流器 time/rate
包的 Wait 方法就能给我们提供了这个功能。
func useRateLimit() {
limiter := rate.NewLimiter(rate.Every(1*time.Second), 500)
batchSize := 500
for {
data, _ :=queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
for _, item := range data {
// 阻塞直到令牌桶有充足的Token
err := limiter.Wait(context.Background())
if err != nil {
fmt.Println("Error: ", err)
return
}
go func(i int) {
doSomething(i)
}(item)
}
}
}
// 模拟调用下游服务
func doSomething(i int) {
time.Sleep(2 * time.Second)
fmt.Println("End:", i)
}
// 模拟查询N条数据
func queryDataWithSizeN(size int) (dataList []int, err error) {
rand.Seed(time.Now().Unix())
dataList = rand.Perm(size)
return
}
time/rate
包提供的限流器采用的是令牌桶算法,使用Wait
方法是当桶中没有足够的令牌时调用者会阻塞直到能取到令牌,当然也可以通过Wait
方法接受的Context
参数设置等待超时时间。限流器往桶中放令牌的速率是恒定的这样比单纯使用time.Sleep
请求更均匀些。
关于time/rate 限流器的使用方法的详解,请查看我之前的文章:Golang官方限流器的用法详解
用了限流器了之后,只是让我们的并发请求分布地更均匀了,最好我们能在受到下游反馈完成后再开始下次并发。
使用WaitGroup
我们可以等上批并发请求都执行完后再开始下一批任务,估计大部分同学听到这马上就会想到应该加WaitGroup
WaitGroup适合用于并发-等待的场景:一个
goroutine
在检查点(Check Point)等待一组执行任务的 workergoroutine
全部完成,如果在执行任务的这些workergoroutine
还没全部完成,等待的goroutine
就会阻塞在检查点,直到所有wokergoroutine
都完成后才能继续执行。
func useWaitGroup() {
batchSize := 500
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
var wg sync.WaitGroup
for _, item := range data {
wg.Add(1)
go func(i int) {
doSomething(i)
wg.Done()
}(item)
}
wg.Wait()
fmt.Println("Next bunch of data")
}
}
这里调用程序会等待这一批任务都执行完后,再开始查下一批数据进行下一批请求,等待时间取决于这一批请求中最晚返回的那个响应用了多少时间。
使用Semaphore
如果你不想等一批全部完成后再开始下一批,也可以采用一个完成后下一个补上的策略,这种比使用WaitGroup
做并发控制,如果下游资源够,整个任务的处理时间会更快一些。这种策略需要使用信号量(Semaphore)做并发控制,Go 语言里通过扩展库golang.org/x/sync/semaphore
提供了信号量并发原语。
关于信号量的使用方法和实现原理,可以读读我以前的文章:并发编程-信号量的使用方法和其实现原理
上面的程序改为使用信号量semaphore.Weighted
做并发控制的示例如下:
func useSemaphore() {
var concurrentNum int64 = 10
var weight int64 = 1
var batchSize int = 50
s := semaphore.NewWeighted(concurrentNum)
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
for _, item := range data {
s.Acquire(context.Background(), weight)
go func(i int) {
doSomething(i)
s.Release(weight)
}(item)
}
}
}
使用生产者消费者模式
也有不少读者回复说得加线程池才行,因为每个人公司里可能都有在用的线程池实现,直接用就行,我在这里就不再献丑给大家实现线程池了。在我看来我们其实是需要实现一个生产者和消费者模式,让线程池帮助我们限制只有固定数量的消费者线程去做下游服务的调用,而生产者则是将数据存储里取出来。
channel
正好能够作为两者之间的媒介。
func useChannel() {
batchSize := 50
dataChan := make(chan int)
var wg sync.WaitGroup
wg.Add(batchSize + 1)
// 生产者
go func() {
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
break
}
for _, item := range data {
dataChan <- item
}
}
close(dataChan)
wg.Done()
}()
// 消费者
go func() {
for i := 0; i < 50; i++ {
go func() {
for {
select {
case v, ok := <- dataChan:
if !ok {
wg.Done()
return
}
doSomething(v)
}
}
}()
}
}()
wg.Wait()
}
这个代码实现里,如果用ErrorGroup
代替WaitGroup
的话还能更简化一些,这个就留给读者自己探索吧。
关于
ErrorGroup
的用法总结,推荐阅读文章:觉得WaitGroup不好用?试试ErrorGroup吧!
总结
通过文章里总结的一些方法,我们也能看出来并发编程的场景下,除了关注发起的并发线程数外,更重要的是还需要关注被异步调用的下层服务的反馈,不是一味的加并发数就能解决问题的。理解我们为什么在并发编程中要关注下层服务的反馈是很重要的,否则我们列举的那些方案其实都可以在goroutine
里再开goroutine
,不关心是否执行完成直接返回,无限套娃下去。
欢迎大家关注大佬公众号,学到很多硬核知识
本文分享自微信公众号 - HHFCodeRv(hhfcodearts)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。