Kotlin 协程 coroutines

原创
2018/01/25 15:29
阅读数 4.4K

Kotlin分享(一)

Kotlin分享(二)

Kotlin分享(三)

Kotlin分享(四)

Kotlin分享(五)

Kotlin 协程 coroutines

前言

    首先,如果要玩协程,协程项目的地址肯定是要知道的https://github.com/Kotlin/kotlinx.coroutines

    kotlin并没有将协程加入标准库中,而是放在了额外的组件库中,如果想要使用协程,那么上面地址当然是必须要的。

正文

第一个协程

fun coroutinesTest(){
    launch { // launch new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        Log.d("sss","World!") // print after delay
    }
    Log.d("sss","Hello!")  // main thread continues while coroutine is delayed
}

    如上代码,我们使用launch创建了我们第一个协程,该协程的内容是等待一秒,然后输出World。

    我们可以使用Thread来修改这段代码

fun threadTest(){
    thread{
        Thread.sleep(1000L)
        Log.d("sss","World!")
    }
    Log.d("sss","Hello!")  // main thread continues while coroutine is delayed
}

    和上面的代码效果是完全相同的。所以说,协程和线程有相同的地方。

阻塞

    说到线程,有一个关键概念是阻塞,比如IO调用,会阻塞当前线程,这就是为什么我们需要将IO和网络操作移出主线程的原因,Thread.sleep是一个典型的阻塞方法。而在携程中使用的是delay来实现等待,delay不会阻塞线程,它是一种叫做 suspending function 的方法,它只能在携程中被调用,用于暂停协程。

    PS : 协程的实现底层确实也有通过线程来做的(并不是全部),所以我们需要将线程和协程的概念区分开来,协程 != 线程

    如果我们想要阻塞当前线程,以前我们会使用Thread.sleep,不过在有了协程之后,我们还可以使用

fun main(args: Array<String>) { 
    .....
    runBlocking {     // but this expression blocks the main thread
        delay(2000L)  // ... while we delay for 2 seconds to keep JVM alive
    } 
}

    runBlocking方法的操作是创建一个新的协程运行,并且阻塞当前线程,直到协程中的工作结束。

    所以上面代码中,main方法所在的线程会被block,直到新协程delay(2000L)完成以后。关于runBlocking还有一种写法

fun coroutinesTest3() = runBlocking<...>{
    ....
    delay(1000L)
    ...
    return@runBlocking ...
}

    从kotlin的语法上来说很好理解,coroutinesTest3只做了一件事情,就是启动一个blocking的协程。

    PS:这里有一段代码

fun coroutinesTest2(){
    launch { // launch new coroutine in background and continue
        Log.d("sss",Thread.currentThread().name)
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        Log.d("sss","World!") // print after delay
    }
    Log.d("sss","Hello!")  // main thread continues while coroutine is delayed
    runBlocking {
        Log.d("sss",Thread.currentThread().name)
        delay(10000L)
        Log.d("sss","I love qqq!")
    }
    Log.d("sss",Thread.currentThread().name)
    Log.d("sss","I love money!")
}

    有两个协程launch启动的和runBlocking启动的,分别打印当前线程,和函数的线程,发现launch启动了一个新的线程,但是runBlocking的线程和当前函数的线程一样,所以这里也说明了协程 != 线程。

Wait等待

    当调用launch之后,会返回一个Job类型的对象来表示这项工作,我们可以使用job的join方法等待这个工作完成。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch { // launch new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    job.join() // wait until child coroutine completes
    println("Hello,")   
}

    上面方法会先打赢 World 再打印Hello 这就是等待的作用。

suspend方法

    现代开发嘛,总归是有很多方法的,我们很难将所有的协程操作都直接写在方法体中,大多数时候还是会分散成各个方法。比如上面代码:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch { // launch new coroutine and keep a reference to its Job
        printWorld()
    }
    job.join() // wait until child coroutine completes
    println("Hello,")   
}

fun printWorld(){
    delay(1000L)
    println("World!")
}

    但是如果我们这么写了,那么编译器会提示错误,,delay属于suspend方法,所以无法在普通方法中调用,所以printWorld函数编译不会通过。解决方法很简单,我们只需要在printWorld方法前添加suspend前缀即可!

    一旦我们给方法添加suspend前缀,那么这个方法就变成了协程专用方法,我们无法在非协程中使用,否则就会报错。

 

协程是轻量级异步

    看如下代码

fun coroutinesTest5()= runBlocking<Unit>{
    val jobs = List(10000) { // launch a lot of coroutines and list their jobs
        launch {
            Log.d("sss",Thread.activeCount().toString())
            delay(1000L)
            Log.d("sss",".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}

    很凶残,我们开了1万个协程,每个协程等待一秒,输出一个点好,然后主协程中等待所有协程完成后继续运行。

    使用协程,虽然我们起了超多协程,但是如果打印的activeCount大概只有10个,不会再增多。而如果这里将协程替换为线程,那么……绝大多数情况会出现out of memory的错误。

    所以从这里也看出,协程并不仅仅是线程。

cancel

    我们可以在job上调用cancel() 或者cancelAndJoin停止一个协程。

    协程的cancel和线程的cancel非常类似,它并不会强制停止协程,而是在协程运算结束的时候停止,比如

fun cancelTest()= runBlocking<Unit>{
    val startTime = System.currentTimeMillis()
    val job = launch {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

    上面代码,协程重复循环,并且根据时间打印,此时,协程一直在进行工作占用cpu,当调用cancel之后不会立即停止,而是会等到运算结束。

    所以说想要cancel一个协程,需要协程自己配合,有两种方式:

    方法1 :定期调用一个 kotlinx.coroutines 中定义的suspend方法(yield是一个不错的选择),它们都会检测当前协程的状态,如果是cancel状态就会停止运行,并且抛出CancellationException。

    方法2:自己检测当前协程状态,比如将上面 while(i<5) 改成 while(isActive),使用isActive来判断当前协程状态。

    正如上面所说的,系统定义的suspend方法如果遇到cancel会抛出CancellationException,我们可以通过try-catch来捕捉这个异常,并且做一些后续处理(线程操作也是这么做的。)

    但是这里有个问题,如果我们需要在catch或者finally中调用suspend方法怎么办?毫无疑问,直接调用还是会报出CancellationException错误。解决方案是使用run(NonCancellable)

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            ...
        } finally {
            run(NonCancellable) {
                ... //可以调用suspends方法
            }
        }
    }
    ...
}

    但是一般情况下这是不应该出现的,因为关闭文件或者其他结束操作都不应该调用到系统的suspends方法。

Timeout

    什么情况下我们会cancel一个协程,一般是运行时间过长。对于一个协程,我们可以设置它的超时时间,所以相比于手动cancel,timeout或许拥有更好的效果。另外timeout和cancel一样,同样需要协程配合,在运算过程中无法停止。

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

    这段代码,在输出3次之后就会停止,并且报出 TimeoutCancellationException。

    TimeoutCancellationException是CancellationException,但是和CancellationException并不一样,有一个奇怪的地方不得不说一下。

fun timeoutTest() = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            try {
                repeat(1000) { i ->
                    println("I'm sleeping $i ...")
                    delay(500L)
                }
            } catch (e: TimeoutCancellationException) {
                println("I'm TimeoutCancellationException IN")
            }
        }
    }catch(e: TimeoutCancellationException){
        println("I'm TimeoutCancellationException OUT")
    }
    println("main: I'm continue do something")
}

    上面代码,想象输出值应该是什么样的?

    首先打印 main: I'm continue do something

    然后打印3次 I'm sleeping

    再然后 输出 I'm TimeoutCancellationException IN

    真的是这样吗?实际的运行结果可能出乎意料

I/System.out: I'm sleeping 0 ...
I/System.out: I'm sleeping 1 ...
I/System.out: I'm sleeping 2 ...
I/System.out: I'm TimeoutCancellationException IN
I/System.out: I'm TimeoutCancellationException OUT
I/System.out: main: I'm continue do something

    和我们想象的完全不一样,首先协程内外都会收到一个TimeoutCancellationException这就很有意思了。

    还有一点,withTimeout方法竟然阻塞了线程,直到超时结束后才继续执行下面的代码!

 

异步执行

    假设我们有两个方法,分别做两件事情,这里我们用假的方法来代替一下


suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

    假设,现在我们需要计算两个方法的返回值的和。普通的实现是我们需要先等doSomethingUsefulOne执行完之后再执行doSomethingUsefulTwo。有时候,我们的步骤2执行需要依赖于步骤一的结果,那么这也是我们唯一的实现方案。像这样的,我们只需要在协程中顺序调用就行了

fun measureTimeMillisTest(){
    launch {
        val time = measureTimeMillis {
            val one = doSomethingUsefulOne()
            val two = doSomethingUsefulTwo()
            println("The answer is ${one + two}")
        }
        println("Completed in $time ms")
    }
}

    其中measureTimeMillis只是用来计算块中的代码执行时间的。

    但是在很多时候我们实际的希望是能够让doSomethingUsefulOne和doSomethingUsefulTwo同时开始执行,全部完成后返回结果。

    对于java线程,我们或许可以使用Future这种东西来实现,那么在协程中呢?首先想到的也是launch两个协程,然后等待完成,但是这里有一个问题,launch方法返回一个Job类型,并不会返回值,这个时候async就出场了。

fun asyncTest() = runBlocking<Unit> {
    var num1 = async { doSomethingUsefulOne() }
    var num2 = async { doSomethingUsefulTwo() }
    Log.d("sss","the answer is ${num1.await()+num2.await()}")
}

    async就像launch会开启一个轻量级的协程,但是不同点在于会返回一个Deferred<T>类型的对象,它继承与Job。通过await方法等待完成之后,会返回async方法中定义的代码块的返回值。(这里T的类型如果不清楚,建议参看kotlin的lambda部分。)

Lazy Async

    async还有一个lazy用法   

async(start = CoroutineStart.LAZY) { ... }

    async中的代码块并不会立即执行,只有调用 await或者start之后才会运行。

 

CoroutineContext

    所有的协程都运行在由CoroutineContext类型表示的上下文环境中,在CoroutineContext中包含了几个比较重要的部分,包括前面我们已经看到Job对象,还有一个就是下面即将介绍的Dispatchers。

    在创建一个协程的时候,launch也好,async也好,都有一个参数CoroutineContext。CoroutineContext类型是一个接口,Dispatcher都会继承这个接口,所以在使用launch的时候我们会直接传入一个Dispatcher。

Dispatchers

    dispatcher用来决定协程在哪个线程或者线程池上运行。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

运行结果 

'Unconfined': I'm working in thread main
      'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
          'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main

    我们之前所用的 CoroutineDispatcher 都是DefaultDispatcher (缺省值), 在当前的Kotlin版本中DefalutDispatcher就等于CommonPool

    另外如上的newSingleThreadContext有一个非常需要注意的地方,该方法会开启一个只有一个线程的线程池,所以从是一个非常昂贵的方法,在应用中,我们需要手动调用close方法释放资源,或者把他定义在程序生命周期中复用。   

val counterContext:ThreadPoolDispatcher = newSingleThreadContext("CounterContext")
counterContext.close()

 

Unconfined 

    Unconfied Dispatcher 会在定义协程的线程直接调用。但是在遇到第一个暂停点之后,恢复的线程是不确定的。所以对于unconfied Dispatcher其实是无法保证全都在当前线程中调用的。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

输出

      'Unconfined': I'm working in thread main
      'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor

    发现在delay前后并不是运行在同一个线程中。所以在需要消耗cpu或者修改共享数据的情况下并不适合使用。

 

coroutineDispatcher

    corutineDispatcher 和其他几个有点不一样,他并不是一个类,而是CoroutineScope的成员变量。CoroutineScope用来表示当前的协程。所以,如果当前是在非协程中,那么就无法使用corutineDispatcher来当做Dispatcher。

    再来说说corutineDispatcher的作用,他表示父协程的Dispatcher,也就是,新建一个和当前父协程使用同一个Dispatcher的协程。比如

fun main(args: Array<String>) = runBlocking<Unit> {
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

    这个协程就会运行在runBlocking的Dispatcher,于是这个协程就会运行在当前线程中。

    另外,一旦使用了coroutineContext就出现了父子关系,一旦父协程的Job被停止,那么子协程的Job也会被停止

fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs, one with its separate context
        val job1 = launch {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(coroutineContext) {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

    输出是

job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

    CoroutineContext还重写了 + 方法,操作符右侧的Coroutine Context会替换操作符左侧的Coroutine Context,比如

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(coroutineContext) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!! 
        val job = launch(coroutineContext + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

 我在launch中使用CommonPool替代了父协程中的 coroutineContext。虽然替换了,但是父子关系没有解除,所以如果我停止父协程,子协程也会被cancel。

withContext

fun main(args: Array<String>) {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

    首先是newSingleThreadContext,这个方法我们之前遇到过,实际上就是创建了一个ThreadPoolDispatcher(同时继承与CoroutineContext)。

    我们之前说过,使用newSingleThreadContext会创建一个线程池,需要调用close方法。use的作用就是这样,他是一个辅助方法,和协程并不相关,所有继承与closeable的对象都能调用该方法。表示使用完之后自动close掉。是个实用的辅助方法。

    所以,前面两个newSingleThreadContext相当于创建了两个ThreadPoolDispatcher对象,分别是ctx1,ctx2。runBlocking创建一个协程,并且使用ctx1。

    关键知识点在于,在协程中,可以通过调用withContext来进行CoroutineContext的切换(同时也会切换Dispatcher)。注意,他会阻塞协程,等到block中的内容运行完成之后返回,可以有返回值。

 

父子关系

    前面已经说到了父子协程的简历,一旦建立了父子协程。那么父协程会一直存在,直到所有子协程结束为止

fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch(coroutineContext)  {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
}


//request: I'm done and I don't explicitly join my children that are still active
//Coroutine 0 is done
//Coroutine 1 is done
//Coroutine 2 is done
//Now processing of the request is complete

 

生命周期管理

    对于协程,我们经常需要管理他们的生命周期,比如在android开发中,在一个界面(activity)上开启了几个协程,用来加载数据,或者一些其他处理,在退出这个activity时,我们需要确保cancel协程,防止一些协程持续占用内存,甚至内存泄漏的情况。

    一种方法是我们记录下所有协程,然后在界面退出时手动调用所有协程的cancel。但是这种方法是不是显得有些复杂,容易遗漏?仔细想想,前面的父子关系似乎能解决这个问题,如果所有的协程都是一个总的协程的子协程,那么在哪个协程的Job上调用cancel就能直接cancel所有的协程。

    但是这样也有弊端,保证所有协程都是一个协程的子协程,本身这个写法在实现上就有困难,所有对于这种情况,kotlin还提供了一种其他的简历父子关系的方式

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
            delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling the job!")
    job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}

    在job对象上的cancel和await都能够作用在所有的子协程上。

 

Channels

    之前说的都是一个一个协程独立工作,但是实际使用中协程经常会相互协作,比如生产者消费者。这其中很重要的一个东西就是协程之间的数据通信。Channel就是协程数据通信的一个重要手段。类似于只有一个元素的BlockingQueue.

fun channelTest() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        Log.d("sss","start send value")
        for (x in 1..5) {
            Log.d("sss","send value ${x*x}")
            channel.send(x * x)
        }
        Log.d("sss","end send value")
    }
    // here we print five received integers:
    repeat(5) {  Log.d("sss",channel.receive().toString()) }
    Log.d("sss","Done!")
}

//  D/sss: start send value
//  D/sss: send value 1
//  D/sss: send value 4
//  D/sss: 1
//  D/sss: send value 9
//  D/sss: 4
//  D/sss: 9
//  D/sss: send value 16
//  D/sss: send value 25
//  D/sss: 16
//  D/sss: 25
//  D/sss: Done!
//  D/sss: end send value

    不论是send还是receive都会阻塞协程。

 

    channel可以通过close来进行关闭,相当于发送了一个结束标识符。另外,channel也有一个iteration,可以通过for循环获取值,直到遇到结束符。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

    channel的默认容量是1,所以没有缓存,但是在定义时可以设置缓存数量。

生产者和消费者

    理论上我们使用channel也能够实现生产者和消费者的逻辑,不过kotlin还提供了一套更加简单的方法

fun produceSquares() = produce<Int>() {
    for (x in 1..5) {
        Log.d("sss","send value ${x*x}")
        send(x * x)
    }
}

fun produceTest() = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { Log.d("sss",it.toString()) }
    Log.d("sss","Done!")
}

    使用produce和consumeEach来完成生产消费的过程。produce方法会开启一个新的协程,并且运行block中的内容,使用send填入数据。produce方法会返回一个ReceiveChannel对象,继承与Job,所以也能够调用cancel等方法。在ReceiveChannel对象上,通过调用consumeEach获取生产内容,没有内容时会进入暂停协程。

    我们也可以在produce上设置缓存,比如produce<Int>(5)这样,但是这里经过测试发现,一旦我退出了produce的生产协程,那么数据通道会就被关闭,consumeEach无法再获取数据,即使还有数据没有被消费掉。(这就很奇怪了,难道是bug?不是,后面会看到这样设计的一个作用。)

 

管道

    上面的生产者模式作用是 a 生产 b消费。但是这太基础了,很多时候会出现 a 生产原料1,b消费原料1,生产出2,c消费2,这个时候数据就像是在管道中通行一样。kotlin中可以直接使用ReceiveChannel来实现,相比java简单不少。

fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

fun pipelineTest(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

    produceNumbers负责生产最基础的数据,square接收最基础的数据,进行一次包装继续发送,最后在pipelineTest方法中接收最终数据,由此形成了一个管道。

    下面还有一个例子,实现了寻找质数的粗糙实现

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
fun primeNumberTest(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(coroutineContext, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(coroutineContext, cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

    我们不关心算法的实现是否高明,主要是对管道的应用,这里实现了如下的基本模型

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

    numbersFrom生产从2开始的所有数据,然后使用filter过滤能被2,4,5,7等整除的数字,我们的元数据经过了多层管道。 

    因为所有的协程使用同一个context,所以可以使用cancelChildren同时结束所有协程,方便管理。

多消费协程

    有时候生产者生产数据,但是会出现多个消费者,消费者之间互相竞争数据。

fun produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("Processor #$id received $it")
    }    
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

    上面代码我们创建了5个processor都去监听produce的channel,没有收到数据的会进入暂停状态,直到收到数据为止。

    有一个值得注意在于这里通过producer.cancel结束了生产协程,生产协程结束,channel会被自动关闭,这个时候,所有监听这个channel的协程都会自动停止。

多生产者

    存在多消费者,那么肯定也会存在多生产者的情况,在多生产者中,无法使用produce方法自动开辟协程和channel放入数据了,所以需要手动创建一个channel,个个协程往其中填入数据。

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo", 200L) }
    launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

    

同步问题

    在多线程同步中有个经典问题就是,多线程同时修改一个值,会出现脏数据问题。

    比如一百个线程同时开始对a进行+1操作,进行一百次。理论上最终值应该是10000,但是大多数情况下并不会等于10000,这就是脏数据。而开发者常常存在一个误区,使用Volatile来解决这个问题。但是实际上这是没有效果,同样无法得到想要的10000的结果。

    volatile并不能保证整个赋值过程都具有原子性,详细的原理并不在本篇的讨论范围之中。

    线程存在这个问题,不幸的是,协程同样存在这样的问题(毕竟协程也会用到线程)。

线程安全对象

    为了解决上面所说的同步问题,很多时候我们会使用线程安全的对象(比如原子操作对象,synchronized对象等)。

    比如这里我们的count如果不使用int,而是用AtomicInteger类型,就能解决同步问题。AtomicInteger保证了其中的增加方法的原子性。

    实际上这也是解决上述问题速度最快,效率最高的方法。

线程约束

    很多时候,我们需要的操作可能比较复杂,并没有提供原子操作的解决方案,这个时候我们可以使用线程约束,这在很多UI操作的时候会用到。

    其实协程的所谓线程约束并不复杂,只是将操作都放入同一个线程进行,这样就不会出现同步问题了……还真是简单直接的解决方式……

    至于实现线程约束的方案实际上我们在之前有过学习,我们知道Context会决定当前协程所在的线程,另外我们知道使用withContext可以用来切换协程运行时的Context。

//开启1000个协程,重复action1000次
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        withContext(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}

//或者
//fun main(args: Array<String>) = runBlocking<Unit> {
//    massiveRun(counterContext) { // run each coroutine in CommonPool
//            counter++
//    }
//    println("Counter = $counter")
//}

 

    对于原来的线程来说,所谓的线程约束听上去就像旁门左道一样……放在一个线程里我还多线程个毛线?所以多线程的处理方式多是同步锁,synchronized也好,Lock也好。

    对于协程来说,当然也有锁这种操作,Mutex,这是一种协程锁,会暂停协程,但是不会使线程休眠

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.withLock {
            counter++        
        }
    }
    println("Counter = $counter")
}

    实际上mutex是有lock和unlock方法的,比如lock(mutex),操作结束后unlock(mutex)。不过kotlin将这个操作封装在了mutex.withLock中。

public inline suspend fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

 

Actor

    kotlin中提供了一种特殊的东西,如果你对android有了解,那么或许你知道handler和message。表示消息处理者和消息。

    我们通过某些操作向handler这个消息处理者发送message消息。仔细想想的话,这种操作也能一定程度上解决同步问题,当然他还有更广大的用途。

    kotlin使用一种actor实现了这个操作,actor实际上也是一个协程,并且带有一个channel,我们可以向这个channel发送消息,而actor自己的协程负责接收消息,并作出相应处理。

sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
fun counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun actorTest(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}

    比如counterActor方法定义了一个actor,返回值会是一个SendChannel.然后其他协程向这个sendchannel发送消息。actor负责的协程就处理消息。

 

选择表达式 Select

    select表达式能够同时等待多条协程,并且选择第一个运行的协程。等待方式也有多种,下面会介绍。

    select是一个协程方法,他会暂停当前协程,等到select返回!!

 等待channel

    这是select比较常用也是比较简单的方式,

fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

fun selectChannelTest(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(coroutineContext)
    val buzz = buzz(coroutineContext)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines    
}

//输出
//fizz -> 'Fizz'
//buzz -> 'Buzz!'
//fizz -> 'Fizz'
//fizz -> 'Fizz'
//buzz -> 'Buzz!'
//fizz -> 'Fizz'
//buzz -> 'Buzz!'

 

等待close

    如果使用onReceive方法,当select等到一个channel时,如果这个channel被关闭了,那么select就会报出一个错误。我们可以使用onReceiveOrNull来捕获这个关闭消息。

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
        select<String> {
            a.onReceiveOrNull { value ->
                if (value == null)
                    "Channel 'a' is closed"
                else
                    "a -> '$value'"
            }
            b.onReceiveOrNull { value ->
                if (value == null)
                    "Channel 'b' is closed"
                else
                    "b -> '$value'"
            }
        }

fun selectNullTest(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(coroutineContext) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(coroutineContext) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
}
//输出
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

    这个输出多少有些奇妙,但是是能够解释的。首先,select如果有多个receive,当多个协程同时可用时,会选择第一个,所以这里就会先选择a。我们知道send会暂停协程,直到channel重新可写,所以这之后b有机会被select。

    关于close的输出有个误会,你以为是coroutineContext.cancelChildren() 这就话结束了协程导致close的吗? 当然不是!而是协程a在repeat(4)发送了4个消息之后就结束了。协程结束之后channel自动关闭,所以会出现close。最后一句cancelChildren实际上作用是保证所有协程都已经关闭,释放资源的……

Select Send

    select除了能够使用onReceive来等待channel收到数据以外,还可以使用send等到能够向channel发送数据。

fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}

fun sendTest() = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(coroutineContext) { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(coroutineContext, side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
}
//输出
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming

    select抉择性得向自己的channel和一个叫做side的channel发送数据,谁先准备好谁胜出,同时准备好排在第一个的胜出。于是就产生了下面的输出情况。

 

Selecting deferred values

    select还能用用来选择deferred的值(上面说过,async会返回一个带返回值的协程,就是deferred类型。)

fun asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

    比如先定义一个返回Defferred<String> 的方法。它会开启一个协程,并且保证会返回一个String值。

    asycnStringsList方法开启12个这种协程,并且将Defferred<String>保存到list中。

    下面我们就利用select去选择这12个协程中最先有返回值的一个。我们使用onAWait()方法

fun onWaitTest() = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    Log.d("sss","$countActive coroutines are still active")
}

    便利list,使用onAwait注册返回值接收。然后一旦出现第一个返回,select就会返回。然后输出result值,最后输出还有多少协程依旧在运行。

    这里有个问题,官方文档上返回值为

Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active

    但是我们必须知道这是不可靠的,因为从select返回到获取存活数量这一步并非是原子性的,也就是可能在此之间又有其他协程运行完毕了。

 

Select channel和deferred混合等待

    我们可以同时等到channel的接收和deferred的,并且选择先发生的协程返回,比如看下面方法

fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}

    该方法会开启一个生产协程,并且需要输入一个channel,这个channel会有Deferred<String>类型的产品产出。

    先等待获取channel中的第一个产品。然后通过select选择,是channel中第二个产品先到,还是第一个产品(Deferred<String>)先孵化。

    如果第二个产品先到了,select直接返回这第二个产品。然后继续select是第三个产品先到还是第二个产品先孵化,以此类推。

    如果是第一个产品先孵化,那么就把孵化出来的值发送到自己的通道(其他监听该通道的协程就会收到数据进行处理。)然后select会返回select的第二个产品。

    我们不用纠结其中的逻辑,因为它并不是本章重点,重点在于我们需要知道我们可以在channel监听和deferred之间进行select!

 

 

总结

    至此当前版本的协程功能基本介绍完本,本文为官方文档的学习笔记,官方文档地址在https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#children-of-a-coroutine 

展开阅读全文
打赏
2
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
2
分享
返回顶部
顶部