破解 Kotlin 协程(6) - 协程挂起篇

关键词:Kotlin 协程 协程挂起 任务挂起 suspend 非阻塞

协程的挂起最初是一个很神秘的东西,因为我们总是用线程的概念去思考,所以我们只能想到阻塞。不阻塞的挂起到底是怎么回事呢?说出来你也许会笑~~(哭?。。抱歉这篇文章我实在是没办法写的更通俗易懂了,大家一定要亲手实践?。?/p>

1. 先看看 delay

我们刚刚学线程的时候,最常见的模拟各种延时用的就是 Thread.sleep 了,而在协程里面,对应的就是 delay。sleep 让线程进入休眠状态,直到指定时间之后某种信号或者条件到达,线程就尝试恢复执行,而 delay 会让协程挂起,这个过程并不会阻塞 CPU,甚至可以说从硬件使用效率上来讲是“什么都不耽误”,从这个意义上讲 delay 也可以是让协程休眠的一种很好的手段。

delay 的源码其实很简单:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

cont.context.delay.scheduleResumeAfterDelay 这个操作,你可以类比 JavaScript 的 setTimeout,Android 的 handler.postDelay,本质上就是设置了一个延时回调,时间一到就调用 cont 的 resume 系列方法让协程继续执行。

剩下的最关键的就是 suspendCancellableCoroutine 了,这可是我们的老朋友了,前面我们用它实现了回调到协程的各种转换 —— 原来 delay 也是基于它实现的,如果我们再多看一些源码,你就会发现类似的还有 join、await 等等。

2. 再来说说 suspendCancellableCoroutine

既然大家对于 suspendCancellableCoroutine 已经很熟悉了,那么我们干脆直接召唤一个老朋友给大家:

private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}

Job.join() 这个方法会首先检查调用者 Job 的状态是否已经完成,如果是,就直接返回并继续执行后面的代码而不再挂起,否则就会走到这个 joinSuspend 的分支当中。我们看到这里只是注册了一个完成时的回调,那么传说中的 suspendCancellableCoroutine 内部究竟做了什么呢?

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        block(cancellable)
        cancellable.getResult() // 这里的类型是 Any?
    }

suspendCoroutineUninterceptedOrReturn 这个方法调用的源码是看不到的,因为它根本没有源码:P 它的逻辑就是帮大家拿到 Continuation 实例,真的就只有这样。不过这样说起来还是很抽象,因为有一处非常的可疑:suspendCoroutineUninterceptedOrReturn 的返回值类型是 T,而传入的 lambda 的返回值类型是 Any?, 也就是我们看到的 cancellable.getResult() 的类型是 Any?,这是为什么?

我记得在协程系列文章的开篇,我就提到过 suspend 函数的签名,当时是以 await 为例的,这个方法大致相当于:

fun await(continuation: Continuation<User>): Any {
    ...
}

suspend 一方面为这个方法添加了一个 Continuation 的参数,另一方面,原先的返回值类型 User 成了 Continuation 的泛型实参,而真正的返回值类型竟然是 Any。当然,这里因为定义的逻辑返回值类型 User 是不可空的,因此真实的返回值类型也用了 Any 来示意,如果泛型实参是个可空的类型,那么真实的返回值类型也就是 Any? 了,这正与前面提到的 cancellable.getResult() 返回的这个 Any? 相对应。

如果大家去查 await 的源码,你同样会看到这个 getResult() 的调用。

简单来说就是,对于 suspend 函数,不是一定要挂起的,可以在需要的时候挂起,也就是要等待的协程还没有执行完的时候,等待协程执行完再继续执行;而如果在开始 join 或者 await 或者其他 suspend 函数,如果目标协程已经完成,那么就没必要等了,直接拿着结果走人即可。那么这个神奇的逻辑就在于 cancellable.getResult() 究竟返回什么了,且看:

internal fun getResult(): Any? {
    ...
    if (trySuspend()) return COROUTINE_SUSPENDED // ① 触发挂起逻辑
    ...
    if (state is CompletedExceptionally)  // ② 异常立即抛出
        throw recoverStackTrace(state.cause, this) 
    return getSuccessfulResult(state) // ③ 正常结果立即返回
}

这段代码 ① 处就是挂起逻辑了,表示这时候目标协程还没有执行完,需要等待结果,②③是协程已经执行完可以直接拿到异常和正常结果的两种情况。②③好理解,关键是 ①,它要挂起,这返回的是个什么东西?

public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

这是 1.3 的实现,1.3 以前的实现更有趣,就是一个白板 Any。其实是什么不重要,关键是这个东西是一个单例,任何时候协程见到它就知道自己该挂起了。

3. 深入挂起操作

既然说到挂起,大家可能觉得还是一知半解,还是不知道挂起究竟怎么做到的,怎么办?说真的这个挂起是个什么操作其实一直没有拿出来给大家看,不是我们太小气了,只是太早拿出来会比较吓人。。

suspend fun hello() = suspendCoroutineUninterceptedOrReturn<Int>{
    continuation ->
    log(1)
    thread {
        Thread.sleep(1000)
        log(2)
        continuation.resume(1024)
    }
    log(3)
    COROUTINE_SUSPENDED
}

我写了这么一个 suspend 函数,在 suspendCoroutineUninterceptedOrReturn 当中直接返回了这个传说中的白板 COROUTINE_SUSPENDED,正常来说我们应该在一个协程当中调用这个方法对吧,可是我偏不,我写一段 Java 代码去调用这个方法,结果会怎样呢?

public class CallCoroutine {
    public static void main(String... args) {
        Object value = SuspendTestKt.hello(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }

            @Override
            public void resumeWith(@NotNull Object o) { // ①
                if(o instanceof Integer){
                    handleResult(o);
                } else {
                    Throwable throwable = (Throwable) o;
                    throwable.printStackTrace();
                }
            }
        });

        if(value == IntrinsicsKt.getCOROUTINE_SUSPENDED()){ // ②
            LogKt.log("Suspended.");
        } else {
            handleResult(value);
        }
    }

    public static void handleResult(Object o){
        LogKt.log("The result is " + o);
    }
}

这段代码看上去比较奇怪,可能会让人困惑的有两处:

① 处,我们在 Kotlin 当中看到的 resumeWith 的参数类型是 Result,怎么这儿成了 Object 了?因为 Result 是内联类,编译时会用它唯一的成员替换掉它,因此就替换成了 Object (在Kotlin 里面是 Any?

② 处 IntrinsicsKt.getCOROUTINE_SUSPENDED() 就是 Kotlin 的 COROUTINE_SUSPENDED

剩下的其实并不难理解,运行结果自然就是如下所示了:

07:52:55:288 [main] 1
07:52:55:293 [main] 3
07:52:55:296 [main] Suspended.
07:52:56:298 [Thread-0] 2
07:52:56:306 [Thread-0] The result is 1024

其实这段 Java 代码的调用方式与 Kotlin 下面的调用已经很接近了:

suspend fun main() {
    log(hello())
}

只不过我们在 Kotlin 当中还是不太容易拿到 hello 在挂起时的真正返回值,其他的返回结果完全相同。

12:44:08:290 [main] 1
12:44:08:292 [main] 3
12:44:09:296 [Thread-0] 2
12:44:09:296 [Thread-0] 1024

很有可能你看到这里都会觉得晕头转向,没有关系,我现在已经开始尝试揭示一些协程挂起的背后逻辑了,比起简单的使用,概念的理解和接受需要有个小小的过程。

4. 深入理解协程的状态转移

前面我们已经对协程的原理做了一些揭示,显然 Java 的代码让大家能够更容易理解,那么接下来我们再来看一个更复杂的例子:

suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}

suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
    log(1)
    "Return immediately."
}

我们首先定义两个挂起函数,第一个会真正挂起,第二个则会直接返回结果,这类似于我们前面讨论 join 或者 await 的两条路径。我们再用 Kotlin 给出一个调用它们的例子:

suspend fun main() {
    log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}

运行结果如下:

08:09:37:090 [main] 1
08:09:38:096 [Thread-0] Return suspended.
08:09:38:096 [Thread-0] 2
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4

好,现在我们要揭示这段协程代码的真实面貌,为了做到这一点,我们用 Java 来仿写一下这段逻辑:

注意,下面的代码逻辑上并不能做到十分严谨,不应该出现在生产当中,仅供学习理解协程使用。

public class ContinuationImpl implements Continuation<Object> {

    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}

我们定义了一个 Java 类 ContinuationImpl,它就是一个 Continuation 的实现。

实际上如果你愿意,你还真得可以在 Kotlin 的标准库当中找到一个名叫 ContinuationImpl 的类,只不过,它的 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是我们的协程体,通常也就是一个 Lambda 表达式 —— 我们通过 launch启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。

有了这个类我们还需要准备一个 completion 用来接收结果,这个类仿照标准库的 RunSuspend 类实现,如果你有阅读前面的文章,那么你应该知道 suspend main 的实现就是基于这个类:

public class RunSuspend implements Continuation<Unit> {

    private Object result;

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object result) {
        synchronized (this){
            this.result = result;
            notifyAll(); // 协程已经结束,通知下面的 wait() 方法停止阻塞
        }
    }

    public void await() throws Throwable {
        synchronized (this){
            while (true){
                Object result = this.result;
                if(result == null) wait(); // 调用了 Object.wait(),阻塞当前线程,在 notify 或者 notifyAll 调用时返回
                else if(result instanceof Throwable){
                    throw (Throwable) result;
                } else return;
            }
        }
    }
}

这段代码的关键点在于 await() 方法,它在其中起了一个死循环,不过大家不要害怕,这个死循环是个纸老虎,如果 resultnull,那么当前线程会被立即阻塞,直到结果出现。具体的使用方法如下:

...
    public static void main(String... args) throws Throwable {
        RunSuspend runSuspend = new RunSuspend();
        ContinuationImpl table = new ContinuationImpl(runSuspend);
        table.resumeWith(Unit.INSTANCE);
        runSuspend.await();
    }
...

这写法简直就是 suspend main 的真实面貌了。

我们看到,作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImplresumeWtih 的最后才会被调用,因此它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转完毕才会停止阻塞,此时进程也就运行完毕正常退出了。

于是这段代码的运行结果如下:

08:36:51:305 [main] 1
08:36:52:315 [Thread-0] Return suspended.
08:36:52:315 [Thread-0] 2
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 3
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 4

我们看到,这段普通的 Java 代码与前面的 Kotlin 协程调用完全一样。那么我这段 Java 代码的编写根据是什么呢?就是 Kotlin 协程编译之后产生的字节码。当然,字节码是比较抽象的,我这样写出来就是为了让大家更容易的理解协程是如何执行的,看到这里,相信大家对于协程的本质有了进一步的认识:

  • 协程的挂起函数本质上就是一个回调,回调类型就是 Continuation
  • 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像我们前面例子中的 label 不断的自增来实现状态流转一样

如果能够把这两点认识清楚,那么相信你在学习协程其他概念的时候就都将不再是问题了。如果想要进行线程调度,就按照我们讲到的调度器的做法,在 resumeWith 处执行线程切换就好了,其实非常容易理解的。官方的协程框架本质上就是在做这么几件事儿,如果你去看源码,可能一时云里雾里,主要是因为框架除了实现核心逻辑外还需要考虑跨平台实现,还需要优化性能,但不管怎么样,这源码横竖看起来就是五个字:状态机回调。

5. 小结

不同以往,我们从这一篇开始毫无保留的为大家尝试揭示协程背后的逻辑,也许一时间可能有些难懂,不过不要紧,你可以使用协程一段时间之后再来阅读这些内容,相信一定会豁然开朗的。

当然,这一篇内容的安排更多是为后面的序列篇开路,Kotlin 的 Sequence 就是基于协程实现的,它的用法很简单,几乎与普通的 Iterable 没什么区别,因此序列篇我们会重点关注它的内部实现原理,欢迎大家关注。


欢迎关注 Kotlin 中文社区!

中文官网:https://www.kotlincn.net/

中文官方博客:https://www.kotliner.cn/

公众号:Kotlin

知乎专栏:Kotlin

CSDN:Kotlin中文社区

掘金:Kotlin中文社区

简书:Kotlin中文社区

开发者头条:Kotlin中文社区

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容