文档章节

goroutine的退出

秋风醉了
 秋风醉了
发布于 2016/07/24 17:59
字数 1271
阅读 42
收藏 0

goroutine的退出

有时候我们需要通知goroutine停止它正在干的事情,比如一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的连接。

Go语言并没有提供在一个goroutine中终止另一个goroutine的方法,由于这样会导致goroutine之间的共享变量落在未定义的状态上。

如果我们想要退出两个或者任意多个goroutine怎么办呢?

假设有一个abort channel,所有的goroutine订阅这个channel,可以向这个channel发送发送和goroutine数目一样多的事件来退出它们。如果这些goroutine中已经有一些自己退出了,那么会导致我们的channel里的事件数比goroutine还多,这样导致我们的发送直接被阻塞。另一方面,如果这些goroutine又生成了其它的goroutine,我们的channel里的数目又太少了,所以有些goroutine可能会无法接收到退出消息。一般情况下我们是很难知道在某一个时刻具体有多少个goroutine在运行着的。另外,当一个goroutine从abort channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就没法看到这条信息。为了能够达到我们退出goroutine的目的,我们需要更靠谱的策略,来通过一个channel把消息广播出去,这样goroutine们能够看到这条事件消息,并且在事件完成之后,可以知道这件事已经发生过了。

回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值,操作channel之后的代码可以立即被执行,并且会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播。

简单的来说广播机制的原理就是通过关闭channel,这样对该channel的读取操作都不会阻塞,而是会得到一个零值。通过关闭channel来广播消息事件。

简单的用代码表示如下,

package main

import (
	"fmt"
	"os"
	"time"
)

var done = make(chan struct{})

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()

	for {
		if cancelled() {
			fmt.Println("cancell")
			return
		} else {
			fmt.Println("press enter to cancell")
		}

		time.Sleep(1000 * time.Millisecond)
	}
}

运行结果,

➜  close git:(master) ✗ go run close.go
press enter to cancell
press enter to cancell
press enter to cancell
press enter to cancell

cancell

那么我们可以利用这个广播机制来关闭所有的goroutine。

首先来看一段代码,

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
)

var done = make(chan struct{})

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			subdir := filepath.Join(dir, entry.Name())
			walkDir(subdir, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()
	var roots = []string{"/Users/xinxingegeya"}

	// Traverse the file tree.
	fileSizes := make(chan int64)
	go func() {
		for _, root := range roots {
			walkDir(root, fileSizes)
		}
		close(fileSizes)
	}()

	// Print the results.

	tick := time.Tick(1 * time.Second)
	var nfiles, nbytes int64

loop:
	for {
		select {
		case size, ok := <-fileSizes:
			if !ok {
				break loop // fileSizes was closed
			}
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes) // final totals
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

这段代码,会遍历指定目录,计算出该目录下文件的数目和所用空间的大小。这段代码还不是并发的执行,我们下面改成并发的执行,并且通过上面所说的广播机制来中断所有运行中的goroutine退出计算任务。

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"time"
)

var done = make(chan struct{})

// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			n.Add(1)
			subdir := filepath.Join(dir, entry.Name())
			go walkDir(subdir, n, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
	select {
	case sema <- struct{}{}: // acquire token
	case <-done:
		return nil // cancelled
	}
	defer func() { <-sema }() // release token
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()
	var roots = []string{"/Users/xinxingegeya"}

	// Traverse the file tree.
	fileSizes := make(chan int64)

	var n sync.WaitGroup
	for _, root := range roots {
		n.Add(1)
		go walkDir(root, &n, fileSizes)
	}
	go func() {
		n.Wait()
		close(fileSizes)
	}()

	// Print the results.
	tick := time.Tick(1 * time.Second)
	var nfiles, nbytes int64

loop:
	for {
		select {
		case <-done:
			// Drain fileSizes to allow existing goroutines to finish.
			for range fileSizes {
				// Do nothing.
			}
			return
		case size, ok := <-fileSizes:
			if !ok {
				break loop // fileSizes was closed
			}
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes) // final totals
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

运行结果,

➜  interrupt git:(master) ✗ go run interrupt.go
du1: open /Users/xinxingegeya/Library/Saved Application State/com.bitrock.appinstaller.savedState: permission denied
11553 files  32.6 GB
27593 files  44.1 GB
27929 files  44.2 GB
56182 files  46.3 GB
70592 files  48.2 GB
85680 files  49.9 GB
97835 files  49.9 GB
110396 files  49.9 GB
119635 files  49.9 GB

当按下enter键后,程序会退出。

===========END===========

© 著作权归作者所有

秋风醉了
粉丝 252
博文 532
码字总数 405694
作品 0
朝阳
程序员
私信 提问
goroutine退出方式的总结

goroutine的退出机制 大家都知道goroutine是Go语言并发的利器,通过goroutine我们可以很容易的编写高并发的程序。但是goroutine设计的退出机制是由goroutine自己退出,不能在外部强制结束一个...

mickelfeng
2018/10/25
0
0
Goroutine退出时如何安全调用清理函数?

今天遇到一个问题(应该算是坑吧): 在Goroutine中退出时调用RPC的Close函数, 但是server总是提示网络非正常退出. 最终发现是Goroutine退出时调用Close可能导致阻塞, 阻塞导致Goroutine切换到m...

chai2010
2013/04/26
620
0
Go基础编程:并发编程—goroutine

1 goroutine是什么 goroutine是Go并行设计的核心。goroutine说到底其实就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的...

tennysonsky
2018/01/15
0
0
如何优雅的控制goroutine的数量

1,为什么要控制goroutine的数量? goroutine固然好,但是数量太多了,往往会带来很多麻烦,比如耗尽系统资源导致程序崩溃,或者CPU使用率过高导致系统忙不过来。比如: 2,用什么方法控制gor...

borey
2016/05/27
7.1K
12
Go圣经-学习笔记之并发的字典遍历

上一篇 Go圣经-学习笔记之select多路复用 面试题:输入目录或者文件,求目录或者文件的磁盘占用大小,类似du命令? 编程实现 改进一 实时输出 上面这个DEMO,如果目录或者文件比较大的话,用...

cdh0805010
2017/10/29
265
0

没有更多内容

加载失败,请刷新页面

加载更多

一起来学Java8(四)——复合Lambda

在一起来学Java8(二)——Lambda表达式中我们学习了Lambda表达式的基本用法,现在来了解下复合Lambda。 Lambda表达式的的书写离不开函数式接口,复合Lambda的意思是在使用Lambda表达式实现函...

猿敲月下码
34分钟前
9
0
debian10使用putty配置交换机console口

前言:Linux的推广普及,需要配合解决实际应用方能有成效! 最近强迫自己用linux进行实际工作,过程很痛苦,还好通过网络一一解决,感谢各位无私网友博客的帮助! 系统:debian10 桌面:xfc...

W_Lu
今天
10
0
aelf Enterprise 0.8.0 beta有奖公测,“Bug奖金计划”重磅开启

2019年9月30日,aelf Enterprise 0.8.0 beta版正式发布。aelf Enterprise 0.8.0 beta是一个完备的区块链系统, 包含完备的区块链系统、开发套件、开发文档、以及配套的基础应用和基础服务。 ...

AELF开发者社区
今天
10
0
oracle 初始化数据库脚本

create user lpf identified by 123456; create tablespace lpf_ts_cms datafile '/opt/app/oracle/product/11.2.0/lpf.dbf' size 200M; alter user lpf default tablespace lpf_ts_cms; sel......

internetafei
今天
8
0
深入了解Redis底层数据结构

说明 说到Redis的数据结构,我们大概会很快想到Redis的5种常见数据结构:字符串(String)、列表(List)、散列(Hash)、集合(Set)、有序集合(Sorted Set),以及他们的特点和运用场景。不过它们是...

TurboSanil
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部