文档章节

深入golang之---goroutine并发控制与通信

潘少online
 潘少online
发布于 2018/06/23 10:28
字数 4420
阅读 17
收藏 0
Go

开发go程序的时候,时常需要使用goroutine并发处理任务,有时候这些goroutine是相互独立的,而有的时候,多个goroutine之间常常是需要同步与通信的。另一种情况,主goroutine需要控制它所属的子goroutine,总结起来,实现多个goroutine间的同步与通信大致有:

  • 全局共享变量
  • channel通信(CSP模型)
  • Context包

本文章通过goroutine同步与通信的一个典型场景-通知子goroutine退出运行,来深入讲解下golang的控制并发。

通知多个子goroutine退出运行

goroutine作为go语言的并发利器,不仅性能强劲而且使用方便:只需要一个关键字go即可将普通函数并发执行,且goroutine占用内存极小(一个goroutine只占2KB的内存),所以开发go程序的时候很多开发者常常会使用这个并发工具,独立的并发任务比较简单,只需要用go关键字修饰函数就可以启用一个goroutine直接运行;但是,实际的并发场景常常是需要进行协程间的同步与通信,以及精确控制子goroutine开始和结束,其中一个典型场景就是主进程通知名下所有子goroutine优雅退出运行。

由于goroutine的退出机制设计是,goroutine退出只能由本身控制,不允许从外部强制结束该goroutine。只有两种情况例外,那就是main函数结束或者程序崩溃结束运行;所以,要实现主进程控制子goroutine的开始和结束,必须借助其它工具来实现。

控制并发的方法

实现控制并发的方式,大致可分成以下三类:

  • 全局共享变量
  • channel通信
  • Context包

全局共享变量

这是最简单的实现控制并发的方式,实现步骤是:

  1. 声明一个全局变量;
  2. 所有子goroutine共享这个变量,并不断轮询这个变量检查是否有更新;
  3. 在主进程中变更该全局变量;
  4. 子goroutine检测到全局变量更新,执行相应的逻辑。

示例如下:

package main

import (
	"fmt"
	"time"
)

func main() {
	running := true
	f := func() {
		for running {
			fmt.Println("sub proc running...")
			time.Sleep(1 * time.Second)
		}
		fmt.Println("sub proc exit")
	}
	go f()
	go f()
	go f()
	time.Sleep(2 * time.Second)
	running = false
	time.Sleep(3 * time.Second)
	fmt.Println("main proc exit")
}

全局变量的优势是简单方便,不需要过多繁杂的操作,通过一个变量就可以控制所有子goroutine的开始和结束;缺点是功能有限,由于架构所致,该全局变量只能是多读一写,否则会出现数据同步问题,当然也可以通过给全局变量加锁来解决这个问题,但那就增加了复杂度,另外这种方式不适合用于子goroutine间的通信,因为全局变量可以传递的信息很小;还有就是主进程无法等待所有子goroutine退出,因为这种方式只能是单向通知,所以这种方法只适用于非常简单的逻辑且并发量不太大的场景,一旦逻辑稍微复杂一点,这种方法就有点捉襟见肘。

channel通信

另一种更为通用且灵活的实现控制并发的方式是使用channel进行通信。
首先,我们先来了解下什么是golang中的channel:Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。
要想理解 channel 要先知道 CSP 模型:

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,由 Tony Hoare 于 1977 年提出。简单来说,CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体。Go 语言实现了 CSP 部分理论,goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。 也就是说,CSP 描述这样一种并发模型:多个Process 使用一个 Channel 进行通信, 这个 Channel 连结的 Process 通常是匿名的,消息传递通常是同步的(有别于 Actor Model)。

先来看示例代码:

package main
import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)
func consumer(stop <-chan bool) {
	for {
		select {
		case <-stop:
			fmt.Println("exit sub goroutine")
			return
		default:
			fmt.Println("running...")
			time.Sleep(500 * time.Millisecond)
		}
	}
}
func main() {
		stop := make(chan bool)
        var wg sync.WaitGroup
        // Spawn example consumers
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(stop <-chan bool) {
                defer wg.Done()
                consumer(stop)
            }(stop)
        }
        waitForSignal()
        close(stop)
        fmt.Println("stopping all jobs!")
        wg.Wait()
}
func waitForSignal() {
    sigs := make(chan os.Signal)
    signal.Notify(sigs, os.Interrupt)
    signal.Notify(sigs, syscall.SIGTERM)
    <-sigs
}

这里可以实现优雅等待所有子goroutine完全结束之后主进程才结束退出,借助了标准库sync里的Waitgroup,这是一种控制并发的方式,可以实现对多goroutine的等待,官方文档是这样描述的:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

简单来讲,它的源码里实现了一个类似计数器的结构,记录每一个在它那里注册过的协程,然后每一个协程完成任务之后需要到它那里注销,然后在主进程那里可以等待直至所有协程完成任务退出。 使用步骤:

  1. 创建一个Waitgroup的实例wg;
  2. 在每个goroutine启动的时候,调用wg.Add(1)注册;
  3. 在每个goroutine完成任务后退出之前,调用wg.Done()注销。
  4. 在等待所有goroutine的地方调用wg.Wait()阻塞进程,知道所有goroutine都完成任务调用wg.Done()注销之后,Wait()方法会返回。

该示例程序是一种golang的select+channel的典型用法,我们来稍微深入一点分析一下这种典型用法:

channel

首先了解下channel,可以理解为管道,它的主要功能点是:

  1. 队列存储数据
  2. 阻塞和唤醒goroutine

channel 实现集中在文件 runtime/chan.go 中,channel底层数据结构是这样的:

type hchan struct {
    qcount   uint           // 队列中数据个数
    dataqsiz uint           // channel 大小
    buf      unsafe.Pointer // 存放数据的环形数组
    elemsize uint16         // channel 中数据类型的大小
    closed   uint32         // 表示 channel 是否关闭
    elemtype *_type // 元素数据类型
    sendx    uint   // send 的数组索引
    recvx    uint   // recv 的数组索引
    recvq    waitq  // 由 recv 行为(也就是 <-ch)阻塞在 channel 上的 goroutine 队列
    sendq    waitq  // 由 send 行为 (也就是 ch<-) 阻塞在 channel 上的 goroutine 队列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

从源码可以看出它其实就是一个队列加一个锁(轻量),代码本身不复杂,但涉及到上下文很多细节,故而不易通读,有兴趣的同学可以去看一下,我的建议是,从上面总结的两个功能点出发,一个是 ring buffer,用于存数据; 一个是存放操作(读写)该channel的goroutine 的队列。

  • buf是一个通用指针,用于存储数据,看源码时重点关注对这个变量的读写
  • recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。列表的实现是 sudog,其实就是一个对 g 的结构的封装,看源码时重点关注,是怎样通过这两个变量阻塞和唤醒goroutine的

由于涉及源码较多,这里就不再深入。

select

然后是select机制,golang 的 select 机制可以理解为是在语言层面实现了和 select, poll, epoll 相似的功能:监听多个描述符的读/写等事件,一旦某个描述符就绪(一般是读或者写事件发生了),就能够将发生的事件通知给关心的应用程序去处理该事件。 golang 的 select 机制是,监听多个channel,每一个 case 是一个事件,可以是读事件也可以是写事件,随机选择一个执行,可以设置default,它的作用是:当监听的多个事件都阻塞住会执行default的逻辑。

select的源码在runtime/select.go ,看的时候建议是重点关注 pollorder 和 lockorder

  • pollorder保存的是scase的序号,乱序是为了之后执行时的随机性。
  • lockorder保存了所有case中channel的地址,这里按照地址大小堆排了一下lockorder对应的这片连续内存。对chan排序是为了去重,保证之后对所有channel上锁时不会重复上锁。

因为我对这部分源码研究得也不是很深,故而点到为止即可,有兴趣的可以去看看源码啦!

具体到demo代码:consumer为协程的具体代码,里面是只有一个不断轮询channel变量stop的循环,所以主进程是通过stop来通知子协程何时该结束运行的,在main方法中,close掉stop之后,读取已关闭的channel会立刻返回该channel数据类型的零值,因此子goroutine里的<-stop操作会马上返回,然后退出运行。

事实上,通过channel控制子goroutine的方法可以总结为:循环监听一个channel,一般来说是for循环里放一个select监听channel以达到通知子goroutine的效果。再借助Waitgroup,主进程可以等待所有协程优雅退出后再结束自己的运行,这就通过channel实现了优雅控制goroutine并发的开始和结束。

channel通信控制基于CSP模型,相比于传统的线程与锁并发模型,避免了大量的加锁解锁的性能消耗,而又比Actor模型更加灵活,使用Actor模型时,负责通讯的媒介与执行单元是紧耦合的–每个Actor都有一个信箱。而使用CSP模型,channel是第一对象,可以被独立地创建,写入和读出数据,更容易进行扩展。

杀器Context

Context通常被译作上下文,它是一个比较抽象的概念。在讨论链式调用技术时也经常会提到上下文。一般理解为程序单元的一个运行状态、现场、快照,而翻译中上下又很好地诠释了其本质,上下则是存在上下层的传递,上会把内容传递给下。在Go语言中,程序单元也就指的是Goroutine。

每个Goroutine在执行之前,都要先知道程序当前的执行状态,通常将这些执行状态封装在一个Context变量中,传递给要执行的Goroutine中。上下文则几乎已经成为传递与请求同生存周期变量的标准方法。在网络编程下,当接收到一个网络请求Request,在处理这个Request的goroutine中,可能需要在当前gorutine继续开启多个新的Goroutine来获取数据与逻辑处理(例如访问数据库、RPC服务等),即一个请求Request,会需要多个Goroutine中处理。而这些Goroutine可能需要共享Request的一些信息;同时当Request被取消或者超时的时候,所有从这个Request创建的所有Goroutine也应该被结束。

context在go1.7之后被引入到标准库中,1.7之前的go版本使用context需要安装golang.org/x/net/context包,关于golang context的更详细说明,可参考官方文档:context

Context初试

Context的创建和调用关系是层层递进的,也就是我们通常所说的链式调用,类似数据结构里的树,从根节点开始,每一次调用就衍生一个叶子节点。首先,生成根节点,使用context.Background方法生成,而后可以进行链式调用使用context包里的各类方法,context包里的所有方法:

  • func Background() Context
  • func TODO() Context
  • func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  • func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
  • func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  • func WithValue(parent Context, key, val interface{}) Context

这里仅以WithCancel和WithValue方法为例来实现控制并发和通信: 话不多说,上码:

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"
)

type favContextKey string

func main() {
	wg := &sync.WaitGroup{}
	values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}
	ctx, cancel := context.WithCancel(context.Background())

	for _, url := range values {
		wg.Add(1)
		subCtx := context.WithValue(ctx, favContextKey("url"), url)
		go reqURL(subCtx, wg)
	}

	go func() {
		time.Sleep(time.Second * 3)
		cancel()
	}()

	wg.Wait()
	fmt.Println("exit main goroutine")
}

func reqURL(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	url, _ := ctx.Value(favContextKey("url")).(string)
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("stop getting url:%s\n", url)
			return
		default:
			r, err := http.Get(url)
			if r.StatusCode == http.StatusOK && err == nil {
				body, _ := ioutil.ReadAll(r.Body)
				subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body)))
				wg.Add(1)
				go showResp(subCtx, wg)
			}
			r.Body.Close()
			//启动子goroutine是为了不阻塞当前goroutine,这里在实际场景中可以去执行其他逻辑,这里为了方便直接sleep一秒
			// doSometing()
			time.Sleep(time.Second * 1)
		}
	}
}

func showResp(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop showing resp")
			return
		default:
			//子goroutine里一般会处理一些IO任务,如读写数据库或者rpc调用,这里为了方便直接把数据打印
			fmt.Println("printing ", ctx.Value(favContextKey("resp")))
			time.Sleep(time.Second * 1)
		}
	}
}

前面我们说过Context就是设计用来解决那种多个goroutine处理一个Request且这多个goroutine需要共享Request的一些信息的场景,以上是一个简单模拟上述过程的demo。

首先调用context.Background()生成根节点,然后调用withCancel方法,传入根节点,得到新的子Context以及根节点的cancel方法(通知所有子节点结束运行),这里要注意:该方法也返回了一个Context,这是一个新的子节点,与初始传入的根节点不是同一个实例了,但是每一个子节点里会保存从最初的根节点到本节点的链路信息 ,才能实现链式。

程序的reqURL方法接收一个url,然后通过http请求该url获得response,然后在当前goroutine里再启动一个子groutine把response打印出来,然后从ReqURL开始Context树往下衍生叶子节点(每一个链式调用新产生的ctx),中间每个ctx都可以通过WithValue方式传值(实现通信),而每一个子goroutine都能通过Value方法从父goroutine取值,实现协程间的通信,每个子ctx可以调用Done方法检测是否有父节点调用cancel方法通知子节点退出运行,根节点的cancel调用会沿着链路通知到每一个子节点,因此实现了强并发控制,流程如图: Context调用链路 该demo结合前面说的WaitGroup实现了优雅并发控制和通信,关于WaitGroup的原理和使用前文已做解析,这里便不再赘述,当然,实际的应用场景不会这么简单,处理Request的goroutine启动多个子goroutine大多是处理IO密集的任务如读写数据库或rpc调用,然后在主goroutine中继续执行其他逻辑,这里为了方便讲解做了最简单的处理。

Context作为golang中并发控制和通信的大杀器,被广泛应用,一些使用go开发http服务的同学如果阅读过这些很多 web framework的源码就知道,Context在web framework随处可见,因为http请求处理就是一个典型的链式过程以及并发场景,所以很多web framework都会借助Context实现链式调用的逻辑。有兴趣可以读一下context包的源码,会发现Context的实现其实是结合了Mutex锁和channel而实现的,其实并发、同步的很多高级组件万变不离其宗,都是通过最底层的数据结构组装起来的,只要知晓了最基础的概念,上游的架构也可以一目了然。

context使用规范

最后,Context虽然是神器,但开发者使用也要遵循基本法,以下是一些Context使用的规范:

  • Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it. The Context should be the first parameter, typically named ctx;不要把Context存在一个结构体当中,显式地传入函数。Context变量需要作为第一个参数使用,一般命名为ctx;

  • Do not pass a nil Context, even if a function permits it. Pass context.TODO if you are unsure about which Context to use;即使方法允许,也不要传入一个nil的Context,如果你不确定你要用什么Context的时候传一个context.TODO;

  • Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions;使用context的Value相关方法只应该用于在程序和接口中传递的和请求相关的元数据,不要用它来传递一些可选的参数;

  • The same Context may be passed to functions running in different goroutines; Contexts are safe for simultaneous use by multiple goroutines;同样的Context可以用来传递到不同的goroutine中,Context在多个goroutine中是安全的;

参考链接

© 著作权归作者所有

潘少online
粉丝 12
博文 58
码字总数 107019
作品 2
深圳
程序员
私信 提问
使用golang的channel的坑

很多时候我们经过使用有缓冲channel作为通信控制的功能,以至有一些误解和坑出现。 误解一:有缓存channel是顺序的 执行下面代码。 package mainimport ( "time" "math/rand")func main(){ c...

梦朝思夕
2017/09/18
0
0
关于golang的协程并行的控制与示例程序

首先明确一个观点并行 并发区别: 并行是指程序的运行状态,要有两个线程正在执行才能算是Parallelism;并发指程序的逻辑结构,Concurrency则只要有两个以上线程还在执行过程中即可。简单地说...

r00txx
2016/08/19
0
2
简单的并发控制 —— WaitGroup

声明:本文仅限于简书发布,其他第三方网站均为盗版,原文地址: 简单的并发控制 —— WaitGroup 在 golang 中,我了解的并发同步主要有两种方式,一种是 channel,另外一种就是我想说的 Wa...

yetship
2017/11/06
0
0
goroutine, channel 和 CSP

引子 老听 clojure 社区的人提起 core.async ,说它如何好用,如何简化了并发编程的模型,不由得勾起了我的好奇心,想了解一番其思想的源头:CSP 模型及受其启发的 goroutine 和 channel 。 ...

wangxuwei
2018/01/29
0
0
使goroutine同步的方法总结

前言: 在前面并发性能对比的文章中,我们可以看到Golang处理大并发的能力十分强劲,而且开发也特别方便,只需要用go关键字即可开启一个新的协程。 但当多个goroutine同时进行处理的时候,就会...

oneHand
2018/01/29
3
0

没有更多内容

加载失败,请刷新页面

加载更多

对集合的理解

开端 同事小G提了一点,Set都是无序的,但是我之前有看到过treeSet是有序的,就和他讨论了起来,还百度了一下,有序。然而他只是淡淡的说自己敲代码验证一下。 TreeSet 循环int类型 1~20,毫...

无极之岚
31分钟前
0
0
Kernel字符设备驱动框架

Linux设备分为三大类:字符设备,块设备和网络设备,这三种设备基于不同的设备框架。相较于块设备和网络设备,字符设备在kernel中是最简单的,也是唯一没有基于设备基础框架(device结构)的...

yepanl
35分钟前
0
0
Ajax

定义 Ajax是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术,用于创建动态网页 Ajax=Asynchronous Javascript And XML(异步的JavaScript和XML) 原理 XMLHttpRequest对象(异...

星闪海洋
昨天
2
0
Jenkins 中文本地化的重大进展

本文首发于:Jenkins 中文社区 我从2017年开始,参与 Jenkins 社区贡献。作为一名新成员,翻译可能是帮助社区项目最简单的方法。 本地化的优化通常是较小的改动,你无需了解项目完整的上下文...

Jenkins中文社区
昨天
3
0
Spring中如何使用设计模式

关于设计模式,如果使用得当,将会使我们的代码更加简洁,并且更具扩展性。本文主要讲解Spring中如何使用策略模式,工厂方法模式以及Builder模式。 1. 策略模式 关于策略模式的使用方式,在S...

爱宝贝丶
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部