探索 Kotlin 协程原理


highlight: a11y-dark

接下来跟大家分享一下我在了解 Kotlin 协程实现的过程中理解的一些概念,如果你发现哪些地方我说错了的话,欢迎提出你的理解。

1. Kotlin 协程原理概述

Kotlin 协程的大致的执行流程如上图所示,这个流程是各种类型的协程执行时都大致遵循的流程,不是一个严格精确的执行流程。

下面先来看下协程执行过程中的一些关键类的介绍。

1. 协程代码块

当我们调用 launch()withContext() 等协程构建器函数时,对应的协程代码块会被 Kotlin 编译器转换为一个匿名内部类,这个匿名内部类继承了 SuspendLambda ,实现了 Function2 接口。

SuspendLambda挂起 Lambda 表示式 的运行时类型,它重写了父类 BaseContinuationImplinvokeSuspend() 方法和 create() 方法,invokeSuspend() 方法中的代码就是协程代码块中的代码,而 create() 方法则是用来实例化代码块对应的匿名类。

2. 任务

任务负责执行协程代码块,任务指的是 DispatchedTask 的子类,比如 DispatchedContinuationAwaitContinuaton 以及 CancellableContinuationImpl

协程分发器会把 SuspendLambda 封装为任务 DispatchedContinuation

CancellableContinuationImpl 是在内联函数 suspendCancellableCoroutine() 中创建的,Retrofitsuspend 函数的支持就是通过这个内联函数 ,CancellableContinuationImpl 中有一个简单的决策状态机,它的状态能从 UNDECIDED(未定) 迁移到 SUSPENDED(挂起) ,也能从 UNDECIDED(未定) 迁移到 RESUMED (恢复),但是 SUSPENDED 和 RESUMED 这两个状态之间无法相互迁移。

AwaitContinuationCancellableContinuationImpl 的子类。当我们调用 CoroutineScopeasync() 扩展函数后,这个函数会返回一个 Deferred 对象,这个对象的具体实现是 DeferredCoroutine ,当我们调用 Deferredawait() 方法等待协程的执行结果时,DeferredCoroutine 就会创建一个 AwaitContinuaton

3. 协程

协程主要负责维护工作节点,包括传播取消、完成和异常等事件给工作结点和父协程,工作结点的类型是 JobNode,它是对任务和子协程的封装。

当我们调用协程构建器函数启动创建协程时,这些函数的内部会创建协程 ,比如 runBlocking() 中会创建BlockingCoroutine ,launch() 中会创建 StandaloneCoroutine ,这些协程都是 JobSupport 的子类,JobSupport 实现了 Job 接口,也就是可以把协程理解为一个工作项,工作项中可以包含子工作项或子任务。

4. 协程分发器

协程分发器负责执行任务,协程启动后会通过协程分发器分发(执行)任务,比如用来执行阻塞任务的 BlockingEventLoop 和默认协程调度器 DefaultScheduler 都是协程分发器 CoroutineDispatcher 的子类。

CoroutineDispatcher实现了 CoroutineContext 接口,也就是协程分发器是一个协程上下文,在协程上下文会通过不同的 Key 保存上下文中的元素 Element,Key 和Element 是 CoroutineContext 中定义的接口, ContinuationInterceptorCoroutineDispatcher 都声明了实现了 Key 接口的伴生对象,而 DefaultScheduler 和 BlockingEventLoop 则实现了 CoroutineContext 中的 Element 接口。

runBlocking() 函数使用的分发器BlockingEventLoop 中有 3 个队列,分别是无限制(优先)任务队列延迟任务队列以及普通任务队列

而默认协程调度器 DefaultScheduler 中有 2 个全局队列和 1 个局部队列。全局队列分别是全局阻塞任务队列全局 CPU 任务队列,它们的类型都是 GlobalQueue 。DefaultScheduler 实现了 Executor 接口,也就是它是一个线程池,在它的工作者线程 Worker 类中,有一个局部任务队列 ,这个队列的类型为WorkQueue ,当 Worker 运行时,会优先从局部任务队列中获取任务执行。

CoroutineDispatcher 实现了 ContinuationInterceptor 接口,启动协程需要通过协程启动选项 CoroutineStart 启动,当 CoroutineStart 通过 startCoroutineCancellable() 等方法创建任务时,会让协程分发器拦截这个任务,所谓拦截就是把接收到的 Continuation 封装为 DispatchedContinuation

5. 挂起点后的延续操作 Continuation

Continuation 是 Kotlin 协程中非常重要的一个概念,它表示一个挂起点之后的延续操作

可以把它理解为一个步骤,而挂起点就是这些步骤之间的边界。Continuation 中声明了一个协程上下文 context 常量和一个用于执行延续操作的 resumeWith() 方法。

如上图所示,协程代码块会根据挂起点分为一段段的Continuation ,Continuation 是以一个接一个 resume 的方式连续执行的,不同的 label 代表代码块的不同段落。

Continuation 的实现类有很多,比如 DispatchdContinuationSuspendLambda 以及各个协程,也就是它们都是一个挂起点之后的延续操作。协程的 resumeWith() 方般是在协程代码块中的任务都完成后,最后调用的。

任务会执行协程构建器函数对应的代码块的代码,Kotlin 编译器会把这些代码转化为继承了 SuspendLambda 的匿名内部类,在匿名内部类的 invokeSuspend() 方法中,就包含了协程代码块的代码,当 DispatchedContinuation 执行时,invokeSuspend() 就会以 completion 的形式被 DispatchedContinuation 调用,也就是completion 表示当前任务完成后,后续需要执行的操作。

上面这段用 Retrofit 发起网络请求的代码的执行时序图如下。

上面这段代码中用 runBlocking() 执行请求,runBlocking 使用的协程分发器是 BlockingEvent当 BlockingEventLoop运行 DispatchedContinuation 时,DispatchedContinuation 会执行代码块 SuspendLambda 的代码,协程代码块中如果有调用 suspendCanellableCoroutine() 的话,上面这段代码是通过 Retrofit 发起请求的,这就会间接调用到 CanecllableContinuationImplgetResult() 方法挂起协程,等待耗时操作执行完成后返回结果。

当OkHttp 返回响应后,就会在 onResponse() 回调中调用 CancellableContinuationImplresume() 方法把执行结果作为自己的状态。

CancellableContinuationImplresume() 方法会把自己作为任务分发到协程分发器中,然后协程分发器就会调用到 DispatchedTaskrun() 方法,DispatchedTask 会获取子类的状态,也就是耗时操作的执行结果,拿到这个状态后,就会再次调用 SuspendLambda 的 invokeSuspend() 方法,也就是 SuspendLambda 的 invokeSuspend() 方法被调用了两次,第一次返回的是挂起标志 COROUTINE_SUSPEND ,返回这个标志就意味着这次任务已经完成,等下一个任务启动后再继续执行。

第二次调用 invokeSuspend() 返回的是耗时操作执行结果,这时会调用 BlockingCoroutineresumeWith() ,并把结果值作为 BlockingCoroutine 的状态值 。

6. 通过状态机决定恢复操作

协程挂起的实现是通过状态机让代码分段执行,协程代码块中的代码执行到挂起点后就不会继续往下执行,直到被恢复(resume) 。对于执行协程的线程来说,当协程执行到挂起点后,就认为这个任务已经执行完成了,直到耗时操作的结果回来,再恢复(resume)新的任务的执行,这时由于代码块对应的匿名内部类内部的状态(label)已经迁移到下一个状态了,所以协程恢复执行的时候会执行下一段代码。

如上图所示,协程代码块对应的匿名内部类的 invokeSuspend() 方法会根据不同的 label 值执行不同的操作,labelSuspendLambda成员变量,默认值为 0

假如协程代码块中执行了两个任务(调用了两次 suspendCancellableCoroutine) ,当 label 为 0 时,就会执行任务 1,假如任务 1 返回了挂起标志 COROUTINE_SUSPENDED,那 SuspendLambda 就不会继续往下执行,也就是不会执行任务 2

当耗时操作结束后(如 OkHttp 响应回来),会调用 CancellableContinuationImplresume() 方法,resume() 方法会再次触发 SuspendLambdainvokeSuspend() 方法,这时由于 label1 ,那么就不会执行任务 1 ,而是执行任务 2 ,然后会通过 ResultthrowOnFailure() 方法检查任务执行结果是否为异常,如果是异常就会抛出异常

除了 suspendCancellableCoroutine() 函数中会调用 CancellableContinuationImplgetResult() 方法会把协程挂起以外,withContext() 方法也会调用 DispatchedCoroutinegetResult() 方法把父协程挂起。

7. 小结

以上图中左侧的代码为例,这段代码中使用了 viewModelviewModelScope 启动了一个用 Retrofit 进行网络请求的协程,在这个协程代码块中,调用了 Service 接口的挂起函数 body()

对于 body() 的调用,Retrofit 会通过 suspendCancellableCoroutine() 创建一个 CancellalbeContinuationImpl ,对于 launch() 代码块,Kotlin 编译器会把它转换为 SuspendLambda,然后协程分发器会把 SuspendLambda 封装为 DispatchedContinuation ,CancellableContinuationImpl 和 DispatcheContinuation 都是 Task 的子类,也就是它们两个都是任务

DispatchedContinuation 持有了 SuspendLambda ,而 CancellableContinuationImpl 则持有了另外一个 DispatchedContinuation 实例,该实例也持有了协程代码块对应的 SuspendLambda 。DispatchedContinuation 和 CancellableContinuationImpl 在 resume 的时候都会调用 SuspendLambda 的 invokeSuspend() 方法执行协程代码块中的代码。

假如这个 ViewModelActivity 的,那么在 Activity 退出的时候,ViewModel 的 clear() 方法就会被调用,clear() 会调用 CloseableCoroutineContextcancel() 扩展函数,cancel() 会通过协程上下文获取到该上下文中的协程,然后会调用协程StandaloneCoroutinecancel() 方法取消协程的执行。StandaloneCoroutine 的父类 JobSupport 中有一个状态机,这个状态机的其中一个状态为多结点状态,这时状态的类型为 NodeList ,也就是一个工作结点列表,通过这个工作结点列表,协程就可以把在协程代码块中启动的任务给取消掉。

2. Kotlin 协程简介

1. 什么是协程?

从广义上来说,协程(Coroutine)是一种并发设计模式,我们可以用它来简化异步执行的代码,Kotlin 协程是在 1.3 版本时引入的,是基于其他语言已有的概念开发的。

2. 协程有哪些特点?

协程的 4 个特点是:轻量、内存泄漏更少、内置取消支持以及 Jetpack 集成。

  • 轻量 一个线程中可以包含多个协程,协程支持挂起,不会让正在运行协程的线程阻塞,与阻塞线程相比,挂起协程的操作更轻量

  • 内存泄漏更少

    协程使用了结构化并发机制,可以在一个作用域内执行多个操作,可以一次性全部取消掉,这样就不用像 RxJava 一样要自己把 Disposable 放在 CompositeDisposable 里

  • 内置取消支持 当我们取消一个协程时,取消操作会在运行中的整个协程层次结构内传播,也就是父协程取消后,子协程也会被取消

  • Jetpack 集成 Jetpack 中的 ViewModelLifecycleLiveData 都提供了对应的协程作用域

另外 Kotlin 协程框架中的挂起函数有另外一个好处,就是可以在编译时就让方法的调用方知道这是一个耗时的操作,需要确定这个操作要放在哪个线程执行,这样就不用像 Android 框架对主线程网络请求的禁止方式一样,在运行时才抛出异常。

3. 什么是结构化并发?

多线程并发是全局的,而结构化并发中每个并发都有自己的作用域

结束线程时,如果想要同时结束这个线程中的子线程,可以通过自定义的共享标记位来结束。

如果想要等待所有子线程都执行完了,再结束父线程,可以使用 CountDownLatch或其他线程协作工具 。

不论是共享标记位还是 CountDownLatch ,都是需要我们编写额外代码才能实现的,线程之间默认是无关的,线程执行的上下文是整个进程,这就是非结构化并发

但是在我们实际的开发中,经常会出现某个任务是另一个任务的子任务,而且它还有可能有自己的子任务,这时我们就不得不编写一些能实现同时取消子任务的额外代码。

但是如果并发是结构化的,每个并发操作都有自己的作用域,并且父作用域内新建的作用域都属于它的子作用域,父作用域的生命周期会持续到所有子作用域执行完,当主动结束父作用域时,能自动结束它的各个子作用域,这就是结构化并发

4. 协程、线程和进程之间有什么区别和联系?

进程是系统资源分配的最小单位,线程是 CPU 调度的最小单位,一个进程中可以有个多个线程,一个线程中可以有多个协程。

  • 进程 进程是系统资源分配的最小单位,拥有独立的地址空间内存空间网络文件资源等,不同进程之间的资源是独立的,进程之间可以通过进程间通信机制交互,比如管道共享内存信号量等方式
  • 线程 线程是 CPU 调度的基本单位,除了拥有运行时的程序计数器寄存器以外,本身不拥有系统资源,进程中的线程会共享进程的资源
  • 协程 协程可以看成是运行在线程上的代码块,协程提供的挂起操作会让协程暂停执行,不会导致线程阻塞,一个线程内部可以创建多个协程

5. 协程挂起示例

假如把一个线程看作是一个人,把做饭的过程看成是一个线程要完成的事情,把相关的任务放进不同的协程中,那么把洗好的米放进电饭煲,就意味着与饭相关的协程可以被挂起,把准备好的煲汤材料放进锅里煮后,那么与相关的协程就可以被挂起,这时线程就可以执行与菜相关的任务。

当菜做完并且装盘后,这时如果电饭煲响了,饭煮好了,就可以把饭盛到碗完里了,也就是负责做饭相关的事情的协程恢复执行了。

如果没有协程,就意味着我们要使用 3 个线程分别做这 3 个不同类型的事情,也就是要 3 个人,如果我们是开小饭店的话,3 个人还凑合,但是如果我们在家里自己做饭的话,3 个人一起做饭就有点多了(不包含土豪家庭)。

上面这个协程使用场景的代码如下。

上面这段代码声明了一个单线程的协程分发器,也就是代码中的 3 个协程只会在一个线程中运行。

并且因为协程执行的线程与测试执行的线程不是同一个线程,为了避免测试线程执行完成后,协程还未结束的问题,所以测试代码中加了 CountDownLatch ,等协程执行的线程运行完成后再让测试线程继续执行。

上面这个单元测试执行后,控制台会打印如下文本。

看完了协程的介绍后,我们来简单看下 Retrofit 是怎么支持挂起函数的。

3. Retrofit 是怎么支持挂起函数的?

我们一般使用协程都是因为要使用 RetrofitRoom 等框架执行网络 IO 或数据库 IO ,而不是自己把协程挂起。这两个框架都支持用挂起函数获取网络响应数据和数据库的数据,下面我们就来看下 Retrofit 是怎么实现在执行网络请求时挂起协程的。

1. KotlinSuspendTest

再看一下前面提到的上面这段单元测试代码,body() 测试方法是在 runBlocking() 代码块中通过 Service 的 body() 方法模拟请求网络数据。

如上图所示,当在 KotlinSuspendTest 中调用 Retrofit 的 create() 方法创建 API 服务时,Retrofit 会通过 ServiceMethod 解析 API注解的内容。

ServiceMethod 会通过请求工厂 RequestFactory 来解析 API 注解的内容,请求工厂则会通过 HttpServiceMethod 调用 Call 接口的 await() 扩展函数,await() 函数会调用 Kotlin 协程框架中的 suspendCancellableCoroutine() 方法挂起协程。

挂起协程后,再把响应回调通过 OkHttpCall 传给 OkHttp 中的 RealCall ,把请求添加到请求队列,当从 OkHttp 中获得响应时,再调用 CancellableContinuationImpl 实例的 resume() 方法恢复协程的执行。

关于 suspendCancellableCoroutine() 函数的实现在后面会讲。

2. KotlinExtentions#await

Retrofit 中的 KotlinExtentionsawiat() 方法的代码如下。

如果我们想在某个耗时操作执行时挂起协程,并在获得结果时恢复协程,我们也可以使用 suspendCancellableCoroutine() 这个方法,当在获取到操作执行结果后,再调用 Continuation 实例的 resume() 即可。

比如为了避免读写 SharedPreferences 时的 ANR 问题,就可以使用这个方法,把 SharedPreferences 读写放在 suspendCancellableCoroutine() 的代码块参数中。

4. runBlocking() 原理概述

前面有讲通过 runBlocking() 结合 Retrofit 发起网络请求的一个简单的程序执行时序图,下面来看下这个过程更细化一些的调用时序。

接下来讲解的 Kotlin 源码的版本为 1.6.10

上面这段代码的调用时序如下,runBlocking() 的执行过程可分为协程启动过程任务分发过程协程挂起过程以及协程恢复过程

1. 协程启动过程

当我们通过 runBlocking() 阻塞地获取协程执行结果时,如果当前线程的事件循环 EventLoop为空,runBlocking() 方法中就会创建一个新的阻塞式事件循环 BlockingEventLoop

BlockingEventLoop 是 EventLoop 的子类,也是 CoroutineDispatcher 的子类,也就是它是一个事件循环,同时也是一个协程分发器。获取到或创建完事件循环后,runBlocking() 中就会创建一个阻塞式协程 BlockingCoroutine ,并调用它的 start() 启动协程,start() 方法会通过启动选项 CoroutineStart 调用 SuspendLambda 的 startCoroutineCancellable() 扩展函数,把 SuspendLambda 封装为一个 DispatchedContinuation,并调用它的 resumeCancellableWith() 方法。

2. 任务分发过程

在 DispatchedContinuation 的 resumeCancellableWith() 方法中,会调用 BlockingEventLoopdispatch() 方法把自己加入到事件循环的任务队列,然后回到 runBlocking() 方法中,在 runBlocking() 方法的最后,会调用 BlockingCoroutine 的 joinBlocking() 方法让分发器执行任务,并返回任务的执行结果。

3. 任务挂起(执行)过程

BlockingCoroutinejoinBlocking() 方法会调用事件循环的 processNextEvent() 方法,这个方法会执行任务(DispatchedContinuation),DispatchedContinuation 的父类 DispatchedTask是 Runnable 的实现类,在它的 run() 方法中,会调用 SuspendLambda 的父类的 resumeWith() 方法,resumeWith() 方法则会调用 runBlocking() 代码块对应的匿名内部类的 invokeSuspend() 方法执行代码块中的内容。

上面这个单元测试中的 runBlocking() 代码块调用了 suspendCancellableCoroutine() 内联函数,这个函数中会创建 CancellableContinuationImpl ,并调用它的 initCancellability() 方法,把它封装为工作结点 ChildContinuation ,并设为 BlockingCoroutine 的状态。

调用完initCancellability() 方法后,就会执行 suspendCancellableCoroutine() 代码块中的内容,也就是启动执行耗时操作的线程,然后再调用 CancellableContinuationImpl 的 getResult() 方法获取结果,这时由于新启动的线程还没执行完成,所以 getResult() 会返回挂起标志,也就是协程代码块的这一段代码已经执行完成,现在等待下一段执行。

4. 任务恢复过程

当用于执行耗时操作,新启动的线程的 run() 方法被调用后,就会把执行结果传给 CancellableContinuationImplresume() 方法,resume() 方法会把自己从协程的工作结点中移除,然后会再次调用 SuspendLambda 的 invokeSuspend() 方法,当 SuspendLambda 接收到这个值后,就会传给 BlockingCoroutine ,把值作为协程的状态值,BlockingCoroutine 会唤醒调用 runBlocking() 的测试线程,最后就会把自己的状态值返回。

5. 协程启动过程

下面我们就来看下协程启动过程具体都发生了什么。

1. runBlocking 字节码

第 4 大节提到的单元测试代码的部分字节码如下。

在上面的字节码中,可以看到 test() 方法调用了匿名内部类 ExampleTest$test$result$1 的构造函数(init),这个匿名内部类继承了 SuspendLambda ,并且实现了 Function2 接口,它的构造函数中会调用 父类 SuspendLambda 构造函数(init)。

2. 反编译后的协程代码块

上面这段字节码进行反编译后,对应的 Java 代码如下。

从上面的代码中我们可以看到,runBlocking的代码块参数 block 被转换为了一个实现了 Function2 接口的匿名内部类,在这个类中声明了 invokeSuspend()create()invoke() 3 个方法。

create() 方法有 var1var2 两个参数,var1 就是 arity(参数数量) ,var2 是一个 Continuation 实例,我们在 runBlocking()launch() 等启动协程的函数中声明的代码块,Kotlin 编译器会为我们生成一个 SuspendLambda 对象,这 var1 和 var2 参数就是 SuspendLambda 的构造函数的参数,Function2 在运行时对应的是一个 SuspendLambda 对象,而 Continuation 则是任务完成后要延续执行的操作。create() 方法拿到这两个参数后,会用来创建了一个 SuspendLambda 实例。

SuspendLambda 的invokeSuspend() 方法中会根据不同的 label 值执行不同的操作,当 label 为 0 时,会创建一个 CancellableContinuationImpl 实例,然后会将任务执行的结果值传给 CancellableContinuationImpl

在绿色虚线矩形中的代码,就是 suspendCancellableCoroutine() 函数中的代码,这是一个内联函数,关于它的实现在后面后讲。

蓝色矩形中的代码,就是前面在 ExampleTest 中的 suspendCancellableCoroutine { } 代码块中的代码。

3. SuspendLambda

SuspendLambda 是一个抽象类,继承了 ContinuationImpl 类,实现了 FunctionBaseSuspendFunction 接口,也就是 SuspendLambda 是一个挂起函数

ContinuationImplBaseContinuationImpl 的子类,BaseContinuationImpl 实现了 Continuation 接口,Continuation 接口表示一个挂起点后要延续执行的操作,也就是 SuspendLambda 是协程恢复执行时要延续执行的操作。

4. runBlocking()

下面来看下 runBlocking() 方法的具体实现。

runBlocking() 方法用于运行一个新的协程,该方法会阻塞调用该方法的线程,直到获取任务的返回结果。

runBlocking() 方法中,首先会尝试从协程上下文 CoroutineContext 获取 ContinuationInterceptor,由于前面的例子中没有往 runBlocking() 方法中传入 context ,所以 context 为默认的 EmptyCoroutineContext ,它在默认情况下并没有保存 ContinuationInterceptor 元素(Element),所以这里的 contextInterceptor 为空。

当 contextInterceptor 为空时,runBlocking() 方法中就会通过 ThreadLocalEventLoop 获取当前线程的 EventLoop ,如果 EventLoop 为空,则调用 createEventLoop() 方法创建一个 BlockingEventLoop 并保存到 ThreadLocal 。然后再通过 CoroutineScope 的 newCoroutineContext() 扩展函数把 BlockingEventLoop 和 CoroutineId 组合成一个 CombinedContext 。

然后会创建一个 BlockingCoroutine 协程实例并调用它的 start() 方法启动协程,最后调用协程的 joinBlocking() 方法把任务加入队列,并等待任务的执行结果。

5. CoroutineContext

CoroutineContext 协程上下文是 Element(元素)实例的索引集,索引集中每一个元素都有一个唯一的 KeyElement 接口用于表示协程上下文的元素,一个协程上下文的元素就是它自己的上下文单例

如上图所示, Element 接口继承了 CoroutineContext 接口,持有了一个类型为 Key 的元素键。CoroutineIdJobCoroutineDispatcher都是协程上下文的元素,也就是每一个协程和分发器都是上下文中的元素。

接下来以上面这段代码为例,看下协程上下文是怎么保存元素的。

协程上下文接口中声明了一个 fold() 函数 ,并且重载了 getplus 两个操作符。在 plus 操作符的重载实现函数中,首先会判断加号右边的上下文是否为空上下文,如果是的话,就没有不要加在一起,直接返回加号左边的上下文。

然后会调用 fold() 方法累加(accumulate)上下文中的元素,这个方法会用 operation 函数参数把上下文中的元素从左到右进行累加。在 Element 接口的 fold 函数中,会直接把 initial 参数和自己传到 operation 函数中。

在前面的示例代码中,context1的值为 supervisor + Dispatchers.Default ,在 CoroutineContext 的 plus 函数中,operation 传回来的 acc 是加号左边的上下文,也就是 supervisor ,和 element 就是加号右边的上下文,也就是 plus 函数的入参。

plus() 函数调用的 fold 代码块中,首先会把加号左边的上下文中中 keyelement 相同的元素给删掉,比如 Dispatchers.Default 对应的 KeyContinuationInterceptorKey ,第一次相加时,由于 SupervisorJob 不是一个 CombinedContext ,没有上下文中没有拦截器 ContinuationInterceptor ,所以没有删除元素。

而第二次把 context1Dispatchers.Main 相加时,context1 是一个 CombinedContext ,这个组合上下文里有 ContinuationInterceptor(Dispatchers.Defualt),这时之前的默认协程分发器就会被删掉。

在 fold 代码块中,删掉了与 elementkey 相同的元素后会赋值给局部变量 removed ,然后会判断删除目标元素后,左边的上下文是否为空上下文,如果是的话,就直接返回加号右边的上下文。

如果删掉目标后的上下文不是空上下文,就会会从删除后的上下文中取拦截器,如果没有拦截器的话,就直接把加号左右两边的上下文结合并返回,因为后面的代码是为了把拦截器放到组合上下文的右边(元素)。

如果 removed 里有拦截器的话,就会判断把 removed 里的拦截器删掉后,是不是只剩下空上下文了,如果是的话,说明 removed 就是拦截器,这时直接把拦截器和加号右边的上下文组合后返回。

如果删掉拦截器后,还有其他上下文的话,就把删掉拦截器后的上下文和加号右边的上下文结合为一个新的 CombinedContext ,然后再把这个 CombinedContext 作为另一个 CombinedContext 的左边的上下文,并把拦截器作为该组合上下文的元素,这么做的目的是为了把拦截器放到上下文的右边,这样获取拦截器的效率会更高,因为取元素的操作是从右到左取的。

6. COROUTINE_SUSPENDED

前面提到的协程代码块对应的匿名内部类中的 invokeSuspend() 函数中第一行就把 COROUTINE_SUSPENDED 标志作为局部变量。

COROUTINE_SUSPENDED 是枚举类 CoroutineSingletons 的值,是挂起协程的标志,CoroutineSingletons 有下面 3 个值:

  1. 已挂起:COROUTINE_SUSPENDED

    DispatchedContinuationDispatchedCoroutine 会用这个标志挂起协程

  2. 未决定:UNDECIDED

    直接通过一个 suspend 代码块createCoroutine() 扩展函数,就可以创建一个 SafeContinuation ,虽然 SafeContinuation 并不是 AbstractCoroutine 的子类,但是在 Kotlin 框架中,协程更广义的定义是 Continuation 。SafeContinuation 的初始结果 initialResult 的值就是 UNDECIDED ,具体的创建方法如下。当我们想要自己控制协程 resume 的时机时,就可以使用 createCoroutine() 方法创建协程。

<!---->

  1. 已恢复:RESUMED

    除了 suspendCancellableCoroutine() 函数可以用来挂起协程,还有 suspendCoroutine() 函数也能用来挂起协程,这两个使用的方法是一样的,但是实现不太一样,suspendCoroutine() 函数中创建的是 SafeContinuation ,而 suspendCancellableCoroutine() 中创建的则是 CancellableContinuationImpl

    CanellableContinuationImpl 与 SafeContinuation 的一个重要区别,就是在父协程取消时,会通知 CancellableContinuationImpl 执行自己的取消回调。

    suspendCancellableCoroutine() 函数会调用 CancellableContinuationImpl 的 getResult() 方法挂起协程,而 suspendCoroutine() 函数则会调用 SafeContinuation 的 getOrThrow() 方法直接获取任务的执行结果。

    SafeContinuationresumeWith() 方法中,会判断结果值是否为挂起标志 COROUTINE_SUSPENDED,如果是的话,就把状态恢复为 RESUEMD ,也就是需要我们自己控制它的状态,比如像下面这样。

7. BlokingEventLoop

runBlocking() 方法中的 BlockingEventLoopEventLoop 的子类,EventLoop 是 协程分发器 CoroutineDispatcher 的子类,CoroutineDispathcer 实现了 ContinuationInterceptor 接口,并且实现了 CoroutineContext 接口,也就是也就是 BlockingEventLoop 是一个协程分发器协程上下文,而协程分发器则要负责拦截 Continuation

EventLoop 是一个抽象类,继承了协程分发器 CoroutineDispatcher ,当 runBlocking() 方法中尝试获取 ThreadLocalEventLoop 的 eventLoop 时,如果当前线程还没有事件循环 ,就会调用 createEventLoop() 方法创建一个新的事件循环。

createEventLoop() 方法会创建一个 BlockingEventLoop(阻塞式事件循环),EventLoopImplBase 继承了 EventLoopImplPlatform 类,实现了 Delay 接口,而 EventLoopImplPlatform 又继承了 EventLoop 类。EventLoop 继承了协程分发器 CoroutineDispatcher 类,CoroutineDispatcher 继承了 AbstractCoroutineContextElement。

8. BlockingCoroutine

BlockingCoroutine 继承了 AbstractCoroutine 抽象协程类,AbstractCoroutine 继承了 JobSupport ,并实现了 JobContinuationCoroutineScope 三个接口,也就是 BlockingCoroutine 是一个工作项(Job),并且是一个任务完成后的延续点,还是一个协程作用域 CoroutineScope

BlockingCoroutine 的父类 JobSupport 则实现了 JobChildJobParentJobSelectClause0 四个接口,也就是 JobSupport 是一个工作项,同时还能是其他工作项的父工作项和子工作项。

在 BlockingCoroutine 中,重写了 JobSupportafterCompletion() 方法,这个方法是在协程执行完成后被调用的,在这个方法中,当调用该方法的线程与事件循环所在的线程不是同一个线程时,会通过 unpark() 方法唤醒事件循环所在的线程。

BlockingCoroutine 的 start() 方法只是简单调用了启动选项 CoroutineStart ,会面会讲到 CoroutineStart 的实现 。

9. JobSupport

AbstractCoroutineBlockingCoroutine 的父类,在AbstractCoroutine 的初始化代码块中,当 initParentJob 参数为 true 时,就会在初始化时调用 initParentJob() 方法设置当前协程与其他协程的父子关系,而 BlockingCoroutine 传给 AbstractCorotuine 的 initParentJob 值就是 true

initParentJob() 方法中接收到的 parent 参数是从协程上下文中取出来的,如果是最外层的协程,parentContext 中就没有设置 KeyJob 的元素,也就是没有父协程,这时 parent 为空,反例则是用 withContext() 启动的协程,parent 就不为空。

当 parent 不为空时,initParnetJob() 方法中就会调用 parent的 start() 方法启动父协程,并调用 parent 的 attachChild() 方法,把自己作为工作节点 JobNode 保存到 JobSupport_state 状态中,attachChild() 方法会返回一个父工作项的句柄。

当协程执行完成后,就会调用父工作项句柄的 dispose() 方法销毁句柄,销毁句柄的具体操作,就是把 JobSupport 中的 _state 重置为 EMPTY_ACTIVE 状态。

10. JobSupport#attachChild

JobSupportattachChild() 方法中,会把子协程 ChildJob 封装为子工作项句柄节点 ChildHandleNode ,并把它作为 ChildHandle 传到nvokeOnCompletion() 方法中,以注册工作完成回调。

11. ChildHandleNode

ChildHandleNode 继承了 JobCancellingNodeJobCancellingNodeJobNode 的子类,也就是 ChildHandleNode 是一个工作结点

ChildHandleNode 还实现了 ChildHandle 接口,这个接口中声明了一个类型为 parent 常量、invoke() 方法和一个 childCancelled() 方法,子工作项可以通过 childCancelled() 方法通知父工作项。

12. JobSupport#invokeOnCompletion

JobSupportinvokeOnCompletion() 方法中,首先会调用 makeNode() 方法把当前协程赋值给节点(handler)的 job 成员变量 ,然后再根据不同的状态执行不同的操作。

当前状态为空状态(无节点) Empty 并且是活跃的话,就把新建的节点作为当前状态,也就是改为单节点状态 Single 。当状态为 Empty 并且是非活跃状态时,就把空状态升级为多节点状态 NodeList

当前状态为未完成状态时,如果状态的列表为空,说明当前状态是一个单节点状态 SINGLE ,这时把状态迁移为多节点状态 NodeList 。如果状态为未完成状态,并且列表不为空,并且正在结束中 Finishing(取消中),这时就把节点添加到节点列表中。

当前状态为已完成状态,并且需要立刻调用时,就调用 CompletionHandlerinvokeIt() 方法,CompletionHandler 的类型是一个函数,也就是直接调用完成回调函数。

13. JobSupport 状态

JobSupport 内部状态之间的迁移路径如上图所示,从是否活跃的角度来说,可以分为新建状态活跃状态不活跃状态

从结点数量的角度来说,可以分为空状态单结点状态多结点状态结束中状态最终状态

空状态根据是否活跃可以分为 EMPTY_NEMPTY_A ,协程是否活跃取决于构造函数中的 active 参数,比如使用 CoroutineScopelaunch() 扩展函数启动的 StandaloneCoroutine 就是活跃的,而延迟启动的 LazyStandaloneCoroutine 则是不活跃的。

最终状态根据协程是否被取消可以分为 FINAL_CFINAL_R

单结点状态可以分为 SINGLESINGLE+,在 CancellableContinuationImpl 初始化的时候,会调用 JobSupportinvokeOnCompetion() 方法注册执行完成回调,协程会把这个任务作为自己的状态,这时协程就进入了 SINGLE 状态。当第二个任务调用 invokeOnCompletion() 的时候,因为 JobSuppot的 state 的 list 列表为空,这时为了初始化 list ,就会通过 promoteSingleToNodeList() 方法,进入 SINGLE+ 中间状态,这个中间状态是为了进一步把状态迁移为多结点状态

多结点状态根据是否活跃可以分为 LIST_NLIST_A ,JobSupport 的状态迁移主要是在 invokeOnCompletion() 方法中执行的,这个方法是在一个 while 循环中执行的,当第二个任务添加后,状态迁移为 SINGLE+ 后,就会再一次执行 while 循环中的代码,这时会把工作结点添加到结点列表的最后,也就是真正进入多结点状态了。

结束中状态根据是否取消可以分为 COMPLETINGCANCELLING ,结束状态是进入最终状态前的中间状态。

14. JobNode

JobSupport 的状态主要有无节点状态 Empty、单节点状态(Single/Single+)以及多节点状态NodeList,在协程取消或完成时,需要通知自己的工作节点 JobNode ,JobNode 就是对子协程和任务的封装,对协程来说任务和子协程就是工作节点,当协程被取消时,就要通知工作节点执行取消操作。

JobNode 的主要实现类有 AwaitAllNodeChildContinuationChildHandleNodeChildCompletionDisposeOnCompletion 以及 ResumeOnCompletion

AwaitAllNode 是在调用 awaitAll() 方法等待所有 Deferred 完成的时候间接创建的。awaitAll() 方法会创建一个 AwaitAll 对象,并调用它的 awiat() 方法,这个 await() 方法会创建一个负责等待所有工作项完成的节点 AwaitAllNode ,当有 Deferred 完成时,Deferred 实例会调用 AwaitAllNode 的invoke() 方法,在这个 invoke() 方法中,会判断是否所有的 Deferred 工作项都以完成,如果是的话,就把值往后传 。

ChildContinuation 是通过suspendCancellableCoroutine() 内联函数创建的,这个函数中会创建 CancellableContinuationImpl(任务),并调用它的initCancellability() 方法初始化任务的可取消性,所谓的可取消性就是在协程取消的时候,让协程有取消该任务的方法。initCancellability() 方法中会调用 initParentHandle() 方法,这个方法中会创建 ChildContinuation,并把它传给协程的 invokeOnCompletion() 方法中,让协程在取消的时候通知工作节点(任务)。在协程被取消后,ChildContinuationinvoke() 方法会被调用,invoke() 方法中会调用任务的 parentCancelled() 方法,在协程被取消后,任务就会把工作项取消异常 JobCancellationException 设为 state ,任务需要执行的代码块也不会继续执行了。当工作项完成后,工作节点会被移除。

ChildHandleNode 是在协程初始化的时候创建的,在 JobSupport 初始化的时候,会调用自己的 initParentJob() 方法,initParentJob() 会调用父协程的 attachChild() 方法把子协程添加到节点列表。attachChild() 方法中会把子协程封装为 ChildHandleNode ,并作为 ChildHandle 传到自己的 invokeOnCompletion() 方法中,invokeOnCompletion() 方法会根据当前的状态把节点作为自己的状态,或者添加到节点列表。

15. BlockingCoroutine#start

runBlocking() 方法中调用的 start() 方法是 AbstractCoroutine 实现的,该方法会调用启动选项 CoroutineStart ,并把代码块和接收者等参数传到 CoroutineStart 中,接收者 receiver 就是 BlockingCoroutine 。

16. CoroutineStart

CoroutineStart 是一个枚举类,CoroutineStart 中定义了 DEFAULTLAZYATOMICUNDISPATCHED 四个启动选项。

  • DEFAULT

    默认启动选项,协程启动后可以取消,启动后会立刻把协程交给协程分发器进行分发,通过 CoroutineScopelaunch() 扩展函数启动的协程默认的启动选项就是 DEFAULT

  • LAZY 延迟启动选项,将该选项传给协程作用域 CoroutineScopelaunch() 扩展函数时,launch() 中会创建一个 LazyStandaloneCoroutine 来执行任务,这个协程跟其他协程的主要区别,就是它不会立刻执行,需要我们自己调用它的 start() 方法才会开始执行

  • ATOMIC

    原子启动选项,启动后不可取消,通过 startCoroutine() 扩展函数启动协程时,启动选项就是 ATOMIC

  • UNDISPATCHED

    立刻启动(不分发)选项,使用该选项后,协程会立刻执行直到第一个挂起点,使用 startCoroutineUndispatched() 扩展函数启动协程时启动选项就是 UNDISPATCHED

17. startCoroutineCancellable()

stratCoroutineCancellable() 中的代码是在一个 runSafely() 代码块中执行的,在这个代码块中,会调用 createCoroutineUnintercepted() 方法实例化协程代码块对应的 SuspendLambda ,然后会通过协程分发器的 interceptContinuation() 方法把代码块封装为任务 DispatchedContinuation ,然后会调用 DispatchedContinuationresumeCancellableWith() 方法开始执行任务。

DispatchedContinuationresumeCancellableWith() 方法会通知协程分发器开始执行当前任务,协程分发器调用 DispatchedContinuationrun() 方法时,DispatchedContinuation 又会调用协程代码块对应的匿名内部类的的 invokeSuspend() 方法执行代码块中的代码。

下面我们来看下使用默认启动选项启动协程时调用的 startCoroutineCancellable() 函数的具体实现。

startCoroutineCancellable() 函数中,会接收到 receivercompletion 两个参数,这两个参数都是 BlockingCoroutine ,然后会在 runSafely() 代码块中调用 createCoroutineUninterpceted() 方法创建协程,并调用该协程的 intercepted() 方法获取 Continuation ,最后再调用 Continuation 的 resumeCancellableWith() 方法让协程恢复执行。

18. runSafely()

runSafely() 方法只是捕获了代码块执行过程中遇到的异常,有异常时则通过 dispatcherFailure() 方法进行分发。

到这一步后,协程启动就算是完成了,下面来看下任务分发的过程。

6. 任务分发过程

1. createCoroutineUnintercepted()

createCoroutineUnintercepted() 方法会判断当前的挂起函数是否为 BaseContinuationImpl 的子类,前面讲到的 ExampleTest 的匿名内部类是 SuspendLambda 的子类,SuspendLambda 就是 BaseContinuationImpl 的子类。

如果是的话说明是 SuspendLambda ,这时就调用 ExampleTest 的匿名内部类中的 create() 方法,并把 probeCompletion 传进去,probeCompletion 对应的是 BlockingCoroutine ,最后返回从 create() 方法中获取到的匿名内部类(Function2)的实例,probeCoroutineCreated() 默认是直接返回参数的空实现,如果是在 Debug 程序的话,则会被替换为初始化堆栈信息的操作。

2. ContinuationImpl#intercepted

intecepted() 方法的实现在 SuspendLambda 的父类 ContinuationImpl 中,当 ContinuationImpl 中的 intercepted 成员变量为空时,它就会从上下文中获取 ContinuationInterceptor ,在 runBlocking() 的例子中,ContinuationInterceptor就是 BlockingEventLoop ,也就是会调用 BlockingEventLoop 的 interceptContinuation() 方法拦截 Continuation ,这里的 Continuation 就是 runBlocking { } 代码块中的代码。

3. CoroutineDispatcher#interceptContinuation

interceptContinuation() 方法是 BlockingEventLoop 的父类 CoroutineDispatcher 的方法,这个方法会把 continuation 封装为一个新的 DispatchedContinuation 实例,DispatchedContinuation 和 SuspendLambda 一样,都是 DispatchedTask 的子类,这里传到 DispatchedCoContinuation 的构造函数中的 continuation 就是 ExampleTest 匿名内部类中的 invokeSuspend() 方法的引用。

4. DispatchedContinuation#resumeCancellableWith()

在通过 CoroutineStart 调用的 startCoroutineCancellable() 扩展函数中,获取到 DispatchedContinuation 后,就会调用它的 resumeCancellableWith() 方法恢复协程的执行。

DispatchedContinuationresumeCancellableWith() 方法中,会判断是否需要分发,默认是需要的,如果使用的是 Unconfined 分发器 ,则不需要分发,直接走 executeUnconfined() 方法。

当需要分发时,就调用 dispatcher 的 dispatch() 方法,而使用 runBlocking() 执行的协程的分发器就是 BlockingEventLoop 。

5. EventLoopImplBase#dispatch

BlokingEventLoop 的 disatpch() 方法的具体实现在 EventLoopImplBase 中,这个方法会直接调用 enqueue() 方法把任务(invokeSuspend)加入队列,enqueue() 方法会调用 enqueueImpl() 方法把任务加入队列,如果加入成功,就调用 unpark() 方法唤醒被挂起的线程。

enqueueImpl() 方法中,如果队列为空,则把 task(DispatchedContinuation)设给 _queue 成员,第一次调用 runBlocking() 时,该成员就是 null 。

当 _queue 不为空时,则会创建一个用来保存任务的队列 Queue ,Queue 的具体实现为 LockFreeTaskQueueCore ,这是一个无锁队列,新加入的任务会放到这个队列的后面。

6. EventLoopImplPlatfom#unpark

unpark() 的实现在 EventLoopImplPlatform 中,在 unpark() 方法中,如果执行任务的线程和调用 runBlocking 的线程不是同一个线程的话,就会调用 unpark() 方法唤醒调用线程。

7. 协程挂起过程

看完了任务分发的过程,下面再来看下协程挂起(任务执行)过程。

runBlocking() 中调用了 BlockingCoroutinestart() 方法把任务加入到事件循环的任务队列后,就会调用 BlockingCoroutinejoinBlocking() 方法让事件循环执行任务,下面我们就来看下 joinBlocking() 的背后做了什么。

1. BlockingCoroutine#joinBlocking

joinBlocking() 方法中会通过 BlockingEventLoopincrementUseCount() 方法增加使用次数的值,当使用次数为 0 时,则 EventLoop 处于不活跃状态,否则处于活跃状态。

然后会在一个 while 循环中不断调用 BlockingEventLoopprocessNextEvent() 方法不断让事件循环处理事件(任务),直到任务(如 DispatchedContinuation)都完成了,这时工作项(Job)的状态为已完成,就退出循环。

EventLoop 的 processNextEvent() 会返回任务的挂起时间,如果工作项(Job)未完成,则调用 parkNanos() 方法挂起线程,当工作项完成后,就调用 decrementUseCount() 减少使用次数,当使用次数为 0 时,就关闭事件循环。

减少完使用次数后,就用 unboxState() 扩展函数再次检查一下 Job 状态是否为未完成,如果不是的话,就看下状态是否为已完成异常,如果是的话则抛出异常,不是的话则返回状态 state 。

2. EventLoopImplBase#processNextEvent()

在 EventLoopImplBase 的 processNextEvent() 方法中,会优先处理无限制的(unconfined)事件(任务),如果没有无限制事件,就会处理需要延迟处理的事件,最后再处理普通事件,使用 suspendCancellableCoroutine() 处理的事件就是 DispatchedTask 的子类 DispatchedContinuation

拿到 DispatchedContinuation 后,就会调用它的 run() 方法。

在 EventLoop 中有 3 种类型的队列:

  1. 无限制事件队列
  2. 延迟事件队列
  3. 普通事件队列

无限制事件队列就是使用 Unconfined 分发器分发的事件,比如 ReceiveChannel 用来发送广播的 broadcast() 扩展函数,使用的协程分发器就是 Unconfined 。

延迟事件队列就是用 delay(time)方法延迟执行的操作。

普通事件就是用 suspendCancellableCoroutine 等用来挂起协程的操作的事件。

3. EventLoopImplBase#nextTime

EventLoopImplBasenextTime 成员变量重写了 get() 方法,获取它的时候,如果父类 EventLoopnextTime0 ,则返回 0 。

否则如果队列为空,则判断有没有延迟任务,没有的话则一直挂起线程,如果有的话,就返回延迟任务与当前时间的时间差,延迟任务也就是在协程代码块中调用的 delay(time) 方法创建的任务。

如果队列不为空,则返回 0 ,也就是直接执行下一个任务。如果队列为已关闭状态 CLOSED_EMPTY,则无限期挂起线程,直到下一个任务进来,当已有的任务都执行完成时,队列的状态就是 CLOSED_EMPTY 。

如果 queue 的值不是已关闭状态,说明队列中有且只有一个任务,这时直接返回 0 ,也就是执行该任务。

EventLoop 的 nextTime 成员变量的 get() 方法也重写了,获取它的时候,会判断无限制队列是否为空,如果为空,就返回 Long.MAX_VALUE ,由子类决定要不要无限期挂起线程。

4. EventLoopImplBase#dequeue

EventLoopImplBasedequeue() 方法中,不断遍历任务队列,如果队列为空,则返回空,如果队列不为空,则返回队列的第一个任务,如果队列为已关闭状态 CLOSED_EMPTY ,则返回空,否则就把 _queue 设为空,并直接把任务作为 Runnable 返回。

通过 suspendCancellableCoroutine() 挂起协程所得到的任务为 DispatchedContinuation ,也就是这时代码执行的是最后一行,把 _queue 设为 null ,并返回 DispatchedContinuation

5. DispatchedTask#run

DispatchedContinuation 的 run() 方法的实现在父类DispatchedTask 中,该方法会从 DispatchedContinuation 中获取 continuation 实例,这个 continuation对应的就是前面讲到的 suspendCoroutineUninterceptedOrReturn 代码块,该方法对应的是匿名内部类 SuspendLambda 实例,获取到 SuspendLambda 后,就会把 state 传给 SuspendLambda 的 resume() 方法。

DispatchedTaskSchedulerTask 的子类,构造函数中有一个 resumeMode(恢复模式)参数。SchedulerTask 的具体实现类为 Task

6. Task

Task 是一个抽象类,实现了 Runnable 接口,它的构造函数中有两个参数,分别是任务提交时间 submissionTime 和任务执行上下文 taskContext ,提交时间参数的默认值为 0 ,taskContext 的默认类型为非阻塞上下文 NonBlockingContext

Task 中还有一个任务执行模式 mode 成员字段,该字段的默认值为 taskContexttaskModeNonBlockingContext 的 taskMode 为 TASK_NON_BLOCKING ,也就是非阻塞任务。

8. 协程恢复过程

看完了协程挂起过程,下面来看下协程恢复的过程都做了什么。

1. BaseContinuationImpl#resumeWith

SuspendLambda 的 resume() 方法的具体实现为 BaseContinuationImplresumeWith() 方法。

resumeWith() 方法中会在一个 while 循环中调用前面在 runBlocking() 方法中声明的 ExampleTest 的匿名内部类的 invokeSuspend() 方法,如果 invokeSuspend() 方法调用了其他的挂起函数,那 invokeSuspend() 方法就会返回 COROUTINE_SUSPEND 标志,这时就会直接退出 while 循环,否则就把 invokeSuspend() 方法输出的结果封装到 Result 中并返回 Result 。

拿到结果后,就会调用 releaseIntercepted() 方法让 ContinuationInterceptor释放拦截到的 Continuation 。

释放 Continuation 后,就会判断 completion 是否为 BaseContinuationImpl ,如果是的话就把当前要调用的 invokeSuspend() 方法的对象改为 BaseContinuationImpl 的 completion ,completion 的类型也是 Continuation 。

如果 completion 不是 BaseContinuationImpl ,就调用 completion 的 resumeWith() 方法 ,比如使用 runBlocking() 方法启动协程时,第一个任务的 completion 就是 BlockingCorotuine ,因为 BlockingCoroutine 的父类是 AbstractCoroutine ,而 AbstractCoroutine 实现了 Continuation 接口。

2. AbstractCoroutine#resumeWith

AbstractCoroutineresumeWith() 方法中,会调用 makeCompletingOnce() 方法开始任务完成的过程。

如果 makeCompletingO方法返回的是等待子协程(COMPLETING_WAITING_CHILDREN)标志就返回,否则调用 afterResume() 方法,afterResume() 方法默认会调用 afterCompletion() 方法,比如 BlockingCoroutine 的 afterCompletion() 方法就会在当任务恢复执行的线程与调用 runBlocking 的线程不是同一个线程时,唤醒调用 runBlocking 的线程(blockedThread)。

3. AbstractCoroutine#makeCompletingOnce

makeCompletingOnce() 方法中的代码是在一个 loopOnState() 代码块中执行的,这个代码块中的代码是在一个不断循环的 while 循环中执行的。

在这个代码块中,首先会调用 tryMakeCompleting() 方法获取最终状态,如果最终状态是已完成 COMPLETING_ALREADY 的话,就抛出非法状态异常,如果最终状态是重试标志 COMPLETING_RETRY 的话,说明状态(state)不是未完成,这时就再次执行 loop 代码块。

如果最终状态是 COMPLETING_WAITING_CHILDREN 或最终状态(最终结果)的话,就直接返回最终状态。

tryMakeCompleting() 方法中,如果 state 不是 Incomplete(未完成)的话,就返回重试标志。

如果状态是 Incomplete 的话,就有两条路径可以执行,一条是快路径,一条是慢路径。

快路径在简单状态下执行的,简单状态指的是无节点状态 Empty和单节点状态 JobNode ,当 JobSupport 处于简单状态并且状态不是 ChildHandleNode ,并且建议更新的值不是已完成异常时,就会调用 tryFinalizeSimpleState() 方法 ,ChildHandleNode 是子协程(JobSupport)调用父协程的 attachChild() 方法时,会把自己封装为 ChildHandleNode 。

tryFinalizeSimpleState() 方法返回 true 时,说明执行完成成功,这时直接返回建议更新的值,否则返回重试标志。

如果 JobSupport 不是简单状态或者是其他协程的子协程时,又或者 proposedUpdate 是已完成异常时,就要调用 tryMakeCompletingSlowPath() 方法执行慢路径的逻辑。

4. JobSupport#tryFinalizeSimpleState

tryFinalizeSimpleState() 方法中,会把任务执行结果 update 封装为 IncompleteStateBox 并设为自己的状态,然后会调用 completeStateFinalization() 方法把自己从父协程的结点列表中移除,并且调用工作结点的完成回调。

在 completeStateFinalization() 方法中,会调用 parentHandle 的 dispose() 方法把自己从父协程的结点列表中移除,然后会判断是否为单结点状态,如果是的话,就直接调用状态的完成回调(JobNode 是 CompletionHandlerBase的子类)。

如果是多结点状态的话,就调用每一个结点的完成回调。

5. JobSupport#tryMakeCompletingSlowPath

JobSupport 完成时如果是处于多结点状态的话,就会走慢路径 tryMakeCompletingSlowPath() ,在这个方法中,首先hi调用 getOrPromoteCancellingList() 方法为没有 list 的 state 设置 list ,也就是把状态升为多结点状态。

然后会把状态设为结束中 FINISHING ,然后会等待工作结点的工作完成,如果已经没有工作结点需要等待,就会通过 finalizeFinishingState() 方法看下是否有异常,如果没有异常的话,就把结果值 proposedUpdate 返回。

6. JobSupport#getOrPromoteCancellingList()

getOrPromoteCanellingList() 方法中,首先会判断状态的 list 是否为空,如果不为空,则直接返回状态的 list 列表。

如果状态的 list 为空,则会判断当前状态是否为无结点状态 Empty ,如果是的话,则返回一个结点列表 NodeList ,NodeList 是一个双向链表

如果协程当前的状态是单结点状态 JobNode ,则把状态升为多结点状态 NodeList ,如果当前状态是多结点状态,则抛出异常。

7. JobSupport#finalizeFinishingState

finalizeFinishingState() 方法用于设置状态 state 为最终状态,并且会调用 completeStateFinalization() 通知工作结点工作已完成。

8. JobSupport#completeStateFinalization

completeStateFinalization() 只是简单地通知了一下工作结点工作已完成。

9. JobSupport#notifyCompletion

notifyHandlers() 就是用来通知工作结点工作已完成的,只是简单地遍历了一下结点列表,逐个调用它们 invoke() 方法,如果回调执行时出现了异常,就调用 handleOnCompletionException() 方法把异常传给协程异常处理器 CoroutineExceptionHandler

10. JobSupport#tryWiatForChild

在 tryWaitForChild() 方法中,会把自己封装为 ChildCompletion ,并通过子协程的 invokeOnCompletion() 方法让子协程在工作执行完成后,调用父协程的 continueCompleting() 方法继续等待其他的工作结点的工作完成。

11. JobSupport#finalizeFinishingState

JobSupportfinalizeFinishingState() 方法中,会判断任务执行结果是否为已完成异常 CompletedExceptionally ,如果是的话,那就会把异常的 cause 字段作为建议的异常。

如果没有异常的话,就把任务执行结果 proposedUpdate 作为最终状态,如果有异常的话,就通知父协程当前协程已完成,如果没有异常的话,就把最终状态封装为 IncompleteStateBox 并设为当前协程的状态,最后通过 completeStateFinalization() 方法把自己从父协程的工作结点列表中移除。

9. suspendCancellableCoroutine() 原理

看完了 runBlocking() 的实现,下面我们来看 suspendCancellableCoroutine() 内联函数的实现。

1. suspendCancellableCoroutine() 执行过程概述

suspendCancellableCoroutine() 函数会在 suspendCoroutineUninterceptedOrReturn() 代码块中创建 CancellableContinuationImpl 实例,然后调用它的 initCancellability() 方法初始化它的可取消性,initCancellability() 方法会调用 installParentHandle() 方法安装父任务句柄,在这个方法中又会调用 JobSupportinovkeOnCompletion() 方法获取句柄。

获取到句柄后,suspendCancellableCoroutine() 代码块就会执行代码块 block ,等代码块执行完成后,就会调用 CancellableContinuationImpl 的 getResult() 方法获取结果并返回。

CanellableContinuationImpl 是 Task 的子类,JobSupport 是 Job 的实现类,Job 就是一个协程,Task 实现了 Runnable 接口,也就是 Job 是用来执行 Task 的,而 suspendCancellableCoroutine() 就是用来创建一个任务,并且把协程挂起,直到拿到数据为止。

下面我们就一步步看下这些函数都做了什么。

2. suspendCancellableCoroutine()

suspendCancellableCoroutine() 的实现如上所示,这是一个内联函数,它会调用 suspendCoroutineUninterceptedOrReturn() 方法,并在该方法的代码块中创建一个 CancellableContinuationImpl 对象,并往 block 代码块参数中回传 CancellableContinuationImpl 实例,最后调用 CancellableContinuationImpl 的 getResult() 方法获取结果。

suspendCoroutineUninterceptedOrReturn() 中使用了 Kotlin Contract API ,该 API 用于为编译器提供了一些代码分析(如类型推导)帮助,与这篇文章的主题无关,在这里就不展开了。

3. CancellableContinuationImpl

CancellableContinuationImpl 继承了 DispatchedTask ,实现了 CancellableContination 和协程帧栈 CoroutineStackFrame 接口,可以把 CancellableContinuationImpl 理解为可取消任务

CancellableContinuationImpl 有下面 3 个特点:

  1. 有且只有一个取消监听器,该监听器无取消中状态
  2. 只在取消时调用取消监听器,无法立刻调用该监听器
  3. 它的取消监听器无法注销

因为有上面这些限制,所以它的状态机也比较简单,如上图所示,默认为 UNDECIDED 状态,当调用 trySuspend() 后变为挂起 SUSPENDED 状态,当调用 tryResume() 后变为 RESUMED已恢复状态。

CancellableContinuationImpl 一共有下面 4 种状态:

  1. ACTIVE:活跃,无取消监听器,状态类为 Active
  2. SINGLE_A:活跃,有一个取消监听器,状态类为 CancelHandler
  3. CANCELLED:已取消(最终状态),状态类为 CancelledContinuation
  4. COMPLETED:已完成,已生成结果或已抛出异常,状态类不定

4. CancellableContinuationImpl#initCancellablbity()

initCancellability() 方法比较简单,主要是调用 installParentHandle() 方法安装协程句柄,也就是把自己加入到协程结点列表,如果任务已完成的话,则释放句柄,也就是把自己从协程的结点列表中移除。

CancellableContinuationImplinstallParentHandle() 方法首先会从协程上下文中获取工作项(Job),获取到 Job 后,就会创建一个 ChildContinuation并传给 JobSupportinvokeOnCompletion() 方法安装句柄,最后返回句柄,句柄就是用来标识对象的标识符,这个例子中,具体就是 ChildContinuation 。

5. ChildContinuation

ChildContinuationChildHandleNode 类似,区别就是它是给 CancellableContinuation 使用的。

ChildContinuation 继承了 JobCancellingNode 类,当ChildContinuation 的 invoke() 方法被调用时,它会调用子任务 CancellableContinuationImpl 的 parentCancelled() 方法告诉它协程已经取消(完成)了。

ChildContinuation 继承了 JobCancellingNodeJobCancellingNode 继承了 JobNode ,JobNode 实现了 DisposableHandleIncomplete 接口,也就是 ChildContinuation 是一个任务节点,也是一个可销毁句柄(DisposableHandle)。

6. JobSupport#invokeOnCompletion

JobSupportinvokeOnCompletion() 方法中,首先会通过 makeNode() 方法获取任务节点 JobNode ,这里的JobNode 就是前面创建的 ChildContinuation

然后会通过 loopOnState 代码块不断根据 Job 的状态执行不同的操作,当 JobSupport 的 active 参数为 true 时,JobSupport 的默认状态为 EMPTY_N ,这里的 N 是 New 的意思。

7. CancellableContinuationImpl#trySuspend

CancellableContinuationImpl 刚初始化时,由于决策状态机的状态为 UNDECIDIED ,所以这时 trySuspend() 方法会尝试把状态设为 SUSPENDED ,设置成功表示挂起成功,返回 true 。

如果在 suspendCancelalbleCoroutine() 代码块中已经调用了 resume ,这时就不能把状态设为 SUSPENDED 。

8. CancellableContinuationImpl#resume

前面的 ExampleTest 在 suspendCancellableCoroutine 代码块中调用的 resume() 方法,调用的是 CancellableContinuationImplresume() 方法,该方法会调用自己的 resumeImpl() 方法。

resumeImpl() 方法中会根据不同的状态执行不同的操作,如果状态是默认的 Active ,Active 实现了 NotCompleted接口,这时就会调用 resumedState() 方法获取 CompletedContinuation 实例,resumeImpl() 中接收到的 proposedUpdate 参数就是 ExampleTest 传入的字符串 "操作结果" 。

获取到 CompletedContinuation 后,就会把当前任务状态改为 CompletedContinuation ,修改完成后,就调用 detachChildIfNonReusable() 方法把自己(ChildContinuation)从Job(BlockingCoroutine)中移除。

9. CancellableContinuationImpl#getResult

getResult() 方法中,会通过 trySuspend() 方法尝试把任务的状态切换为 SUSPENDED ,切换成功则返回协程挂起标志 COROUTINE_SUSPENDED ,如果挂起失败,并且状态为 CompletedExceptionally ,就抛出恢复堆栈异常,否则如果父任务未取消,则调用 getSuccessfulResult() 获取成功结果。

10. CoroutineScope#launch 原理

看完了 runBlocking()suspendCancellableCoroutine() 的实现后,下面我们来看下更常用的 CoroutineScopelaunch() 扩展函数的实现。

这一大节讲的主要是 CoroutineScope 的 launch() 扩展函数的实现以及默认协程分发器 DefaultScheduler 的实现

1. CoroutineScope#launch 原理概述

以下面这段代码为例。

2. CoroutineScope

CoroutineScope 接口中声明了一个 coroutineContext 常量,也就是实现该接口要提供一个协程上下文。CoroutineScope 有一个 cancel() 扩展函数,Lifecycle 会在生命周期状态为 DESTROYED 时调用 cancel() 函数取消掉该作用域启动的协程,ViewModel 则会在 clear() 方法中调用 cancel() 函数,对于 ActivityFragmentViewModelclear() 方法也是在 destroy 回调里触发的。

CoroutineScope 的实现类主要有 GlobalScopeMainScopeAbstractCoroutine 、 TestScopeImpl 、ProducerScope 、ContextScope 、CloseableCoroutineScope 以及 LifecycleCoroutineScopeImpl ,下面来看下这些协程作用域的作用。

GlobalScope是一个全局协程作用域 ,是一个单例对象,一般情况下在 Android 中不会使用这个作用域来启动协程,因为这么做存在潜在的内存泄漏风险,比如 Activity 退出后,匿名内部类持有 Acitivty 的引用,导致 Activity 无法释放,也就是发生了内存泄漏。

GlobalScope 不支持通过 cancel() 扩展函数取消使用它启动的协程,因为它的 coroutineContext 只能获取,不能设值(调用 plus 方法)。

MainScope 是主线程协程作用域,这个作用域的协程分发器为 Dispatchers.Main ,使用方法如下。

AbstractCoroutine 是 BlockingCoroutine 等协程类的父类,它也实现了 CoroutineScope 接口,也就是它要对外提供协程上下文。

TestScopeImpl 是测试协程作用域,当我们在 runTest() 函数的代码块中执行包含协程的单元测试代码时,这个函数就会创建一个 TestScopeImpl 作为协程的作用域。

ProducerScope 是一个继承了 CoroutineScope 的接口,生产者协程ProducerCoroutine 就实现了该接口,Kotlin Flow 接口的 zip() 扩展函数就会创建生产者协程用来组合两个流的值。

ContextScope 用于直接用协程上下文创建协程作用域,比如 MainScope 就是把 SupervisorJobDispatchers.Main 结合后传给 ContextScope 的方式来创建协程作用域,此外 CoroutineScope 函数也是通过 ContextScope 来创建协程作用域的。

CloseableCoroutineScopeViewModelviewModelScope 使用的协程作用域,这个作用域实现了 Closeable 接口,当 ViewModel 的 clear() 方法被触发时,就会调用这个作用域的 close() 方法取消使用该作用域启动的协程。

LifecycleCoroutineScopeImplLifecycleOwner 的 lifecycleScope 所使用的协程作用域,它的父类 LifecycleCoroutineScope 中声明了 launchWhenCreated() 、launchWhenStarted() 和 launchWhenResumed() 这三个函数,通过这些函数我们可以让代码在 create 、start 和 resume 这三个生命周期回调发生的时候再执行,比如下面这样。

3. CoroutineScope#launch

CoroutineScopelaunch() 扩展函数中首先会调用 newCoroutineContext() 方法创建一个协程上下文 CoroutineContext,这里的创建协程上下文主要是看下是否为 DEBUG 环境,是的话则需要把 CoroutineId 放到默认协程分发器(Dispatchers.Default)的上下文中。

然后如果 start 参数的类型为 lazy ,则创建延迟执行的独立协程 LazyStandaloneCoroutine ,否则创建 StandaloneCoroutine ,然后调用协程的 start() 方法启动协程,最后返回该协程。

4. StandaloneCoroutine

StandaloneCoroutine(独立协程)继承了 AbstractCoroutine 类,并重写了父类的 handleJobException() 方法,改为调用自 CoroutineExceptionHandler 中的 handleCoroutineException() 方法。

如上图所示 ,LazyStandaloneCoroutineStandaloneCoroutine 的子类,StandaloneCoroutine 继承了 AbstractCoroutine ,AbstractCoroutine 又继承了 JobSupport 类。

5. DefaultScheduler

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    // ...
}

Dispatchers.Default 对应的协程分发器为 DefaultScheduler ,DefaultScheduler 是 SchedulerCoroutineDispatcher 的子类,SchedulerCoroutineDispatcher 是 ExecutorCoroutineDispatcher 的子类。

6. SchedulerCoroutineDispatcher

SchedulerCoroutineDispatcher 的核心线程池大小为 CPU 核数,最大线程池大小为 2097150(2M),空闲线程存活时间为60 秒。

SchedulerCoroutineDispatcher 中重写了 dispatch 方法,具体实现为 CoroutineScheduler 的 dispatch() 。

7. CoroutineScheduler#dispatch

CoroutineScheduler 的 dispatch() 方法中,会调用 createTask() 方法初始化 任务(如 DispatchedContinuation) 的提交时间 submissionTime 和任务上下文 taskContext

然后会尝试把任务添加到局部任务队列 localQueue,如果添加失败,就尝试把任务添加到全局任务队列globalQueue,如果还是添加失败则抛出拒绝执行异常。

然后会通过 currentWorker 获取当前工作者线程,如果工作者线程为空的话,就调用 addToGlobalQueue() 方法把任务添加到全局任务队列,

如果添加成功,则判断是否需要跳过唤醒操作,当 tailDispatch 为默认值 false 时,则不需要跳过。

当任务的模式为非阻塞时,如果不需要跳过唤醒操作,则调用 signalCpuWork() 唤醒负责计算密集型任务的工作者线程。

如果任务的工作模式为阻塞,则调用 signalBlockingWork() 唤醒负责阻塞任务的工作者线程。

任务的工作模式有两种:

  1. TASK_NON_BLOKCING :非阻塞模式
  2. TASK_PROBABLY_BLOCKING :可能阻塞模式,用 IO 线程池执行的任务就是该模式

8. CoroutineScheduler#addToGlobalQueue

private fun addToGlobalQueue(task: Task): Boolean {
    return if (task.isBlocking) {
        globalBlockingQueue.addLast(task)
   } else {
        globalCpuQueue.addLast(task)
   }
}

addToGlobalQueue() 的实现比较简单,就是根据任务的 isBlocking 字段的值把任务添加到全局阻塞任务队列或全局 CPU 任务队列(计算密集型任务队列)。

9. CoroutineScheduler#signalCpuWork

在 singalCpuWork() 方法中,会调用 tryUnpark() 方法唤醒已存在的工作者线程,如果没有的话,则调用 tryCreateWorker() 方法创建新的工作者线程。

10. CoroutineScheduler#tryCreateWorker

在 tryCreateWorker() 方法中,CPU 工作者线程的数量就是已存在的工作者线程的数量减去阻塞任务工作者线程的数量,如果 cpuWorkers 小于核心线程池大小,则创建一个新的 CpuWorker ,如果创建完后发现只有 1 个 CpuWorker ,就再创建一个。

11. CoroutineScheduler#Worker

CoroutineScheduler 的工作者线程的具体实现类为它的内部类 Worker ,Worker 是 Thread 的子类,有一个局部任务队列 WorkQueue ,默认状态为 DORMATN(休眠)。

Woker 中定义了一个 WorkerState 枚举类,这个枚举类中包含了 Worker 的状态,一共有下面 5 种状态:

  1. CPU_ACQUIRED:正在执行或正在查找非阻塞任务
  2. BLOCKING:正在执行阻塞任务
  3. PARKING :等待状态
  4. DORMANT :休眠状态
  5. TERMINATED :终止状态

在 CoroutineScheduler 的 createNewWorker() 方法中,创建了一个新的 Worker 后,就会调用它的 run() 方法。

12. Worker#run

Workerrun() 方法会执行runWorker() 方法,这个方法会调用 findTask() 方法找出需要执行的任务,找到后就调用 executeTask() 方法执行任务,如果找不到任务的话,就调用 tryPark() 方法让 Worker 进入等待状态,直到有新的任务后,Worker 被唤醒。如果 Worker 处于终止状态,则调用 tryReleaseCpu() 方法释放对 CPU 许可证(permit)的占用。

13. Worker#findTask

findTask() 方法首先会调用 tryAcquireCpuPermit() 方法获取 CPU 占用的许可证,获取到许可证后就会调用 findAnyTask() 方法返回任务。

如果获取不到 CPU 占用许可证,就会看下是否需要扫描 localQueue ,如果要扫描的话就从 localQueue 中找出阻塞任务来执行,如果不需要扫描 localQueue 的话,就从全局阻塞任务队列中拿出任务执行。

如果没有阻塞任务的话,就调用 trySteal() 方法把其他 Worker 还没执行的阻塞任务拿过来执行。

14. Worker#tryAcquireCpuPermit

在 Worker 的 tryAcquireCpuPermit() 方法中,会调用 CoroutineScheduler 的 tryAcquireCpuPermit() 方法获取可用的许可证,这个值默认为核心线程池大小。

如果没有可用的许可证,也就是不能再开启线程,则返回 false ,否则更新 CoroutineScheduler 的 controlState 的值。

15. Worker#findAnyTask()

在 Worker 的 findAnyTask() 方法中,会通过 scanLocalQueue 参数判断局部任务队列是否有任务,如果有的话,就看下是否优先从全局任务队列中获取任务,不是的话就从局部任务队列中获取任务并返回。

如果局部任务队列为空,则从全局任务队列中获取任务并返回。

如果局部任务队列和全局任务队列都为空,则从其他 Worker 中拿非阻塞任务并返回。

16. Worker#executeTask

executeTask() 方法中最主要的就是通过 runSafely() 方法调用任务的 run() 方法,如果遇到异常的话就通过线程的 uncaughtExceptionHandler 抛出。

这里的任务就是 DispatchedContinuation ,DispatchedContinuation 的 run() 方法会通过 SuspendLambda 的父类 BaseContinuationImpl 的 resumeWith() 方法调用协程代码块对应的匿名内部类的 invokeSuspend() 方法,到这里,关于 launch() 的实现就讲完了。

17. CoroutineScope#launch 原理小结

launch() 协程构建器函数的实现与 runBlocking() 实现的主要的差异就是协程类型和协程分发器的类型,launch() 在不修改 context 参数的情况下,默认用的协程分发器是 DefaultScheduler ,在不修改 start 参数的情况下,默认用的是 StandaloneCoroutine

StandaloneCoroutine 只是简单地重写了 JobSupport 的 handleJobException() 方法,而 BlockingCoroutine 则重写了afterCompletion() 方法,并在这个方法中唤醒被阻塞的线程。而且 BlockingCoroutine 需要执行的任务要通过 joinBlocking() 方法让事件循环来执行,而 StandaloneCoroutine 的任务则是由 DefaultScheduler 启动 Worker 后,由 Worker 来执行的。

11. withContext() 原理

我们在 Android 中使用协程的时候,一般都要用 withContext() 把任务执行的分发器从主线程分发器切到工作线程分发器(如 Dispatchers.IO),或者是从工作线程分发器切回主线程分发器 Dispatchers.Main ,下面我们就来看下 withContext() 是怎么实现的。

1. withContext()

withContext() 的用于在指定的协程上下文执行代码块中的代码,也就是指定用于分发协程的分发器。

withContext() 中的代码主要是在 suspendCoroutineUninterceptedOrReturn() 代码块中执行的,这个代码块中可以获取一个 Continuation 实例 uCont,这个 Continuation 实例就是 withContext() 代码块对应的 SuspendLambda 匿名内部类。

在这个代码块中,首先会把旧的上下文与新的上下文进行结合,所谓的结合,主要就是替换掉上下文中 KeyContinuationInterceptor 的元素,比如这个元素原来是默认协程分发器,把它换成 IO 协程分发器。

如果新的上下文和旧的上下文没有区别的话,就创建一个作用域协程 ScopeCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的上下文的区别只是 ContinuationInterceptor 不同的话,那就创建一个 UndispatchdCoroutine ,并调用它的 startUndispatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的协程上下文除了 ContinuationInterceptor ,还有其他元素也不同的话,那就创建一个 DispatchedCoroutine ,并调用它的 startCoroutineCancellable() 扩展函数,最后与 suspendCanellableCoroutine() 函数一样,调用这个协程的 getResult() 方法获取执行结果,这个方法会把父协程挂起,也就是父协程代码块中的代码执行到 withContext() 后就不会继续执行,直到 withContext() 方法返回执行结果后,BaseContinuationImpl 中把值传给 completion ,completion 就是 DispatchedCoroutine ,然后 DispatchedCoroutine 就会把状态切换为 RESUMED 恢复执行。

如上图所示,DispatchedCoroutineUndispatchedCoroutine 都是 ScopeCoroutine 的子类。

2. ScopeCoroutine

ScopeCooutine 继承了 AbstractCoroutine ,重写了 JobSupportisScopedCoroutine 常量,重写了 JobSupportafterCompletion()AbstractCoroutineafterResume() 方法。

ScopeCoroutine 的 afterCompletion() 方法中,在当前协程的工作完成或取消后,就会调用 uContintercepted() 方法,让协程分发器把它封装为 DispatchedContinuation ,并把结果传给这个 DispatchedContinuation ,uCont 就是 withContext() 代码块对应的 SuspendLambda

ScopeCoroutineafterResume() 方法被调用时,它会调用 uCont 成员的 resumeWith() 方法,这里调用的 resumeWith()afterCompletion() 中调用的resumeCancellableWith() 方法的区别,就是 resumeCancellableWith() 会把 DispatchedContinuation 的 resumeMode 改为 MODE_CANCELLABLE,而 resumeWith() 则会把 resumeMode 改为 MODE_ATOMIC

3. 分发模式 resumeMode

resumeModeDispatchedContinuation 的父类 DispatchedTask 的成员变量,它表示分发模式,它的取值有下面 5 个。

  1. MODE_ATMOIC(0)

    无法取消的分发模式,使用 Unconfined 协程分发器时,任务的分发模式就是 MODE_ATOMIC

  2. MODE_CANCELLABLE(1)

    可取消的分发模式,通过 suspendCancellableCoroutine() 挂起协程时,创建的 CancellableContinuationImpl 的分发模式就是 MODE_CANCELLABLE

  3. MODE_CANCELLABLE_REUSABLE(2)

    可取消且可重用的分发模式,用于 suspendCancellableCoroutineReusable() 内联函数,ReceiveChannel 发送广播用的 broadcast() 扩展函数用的就是这个内联函数

  4. MODE_UNDISPATCHED(4)

    不进行分发的分发模式,delay() 函数用的就是 MODE_UNDISPATCHED

  5. MODE_UNINITIALIZED(-1)

    未初始化,DispatchedContinuation 的默认分发模式

4. JobSupport#isScopedCoroutine

JobSupportisScopedCorotuine 成员变量主要在 cancelParent() 方法中使用,这个方法会在子协程被取消的时候被调用,在这个方法中,会判断当前协程是否为有作用域的协程 scopedCoroutine,如果是的话,则不调用父协程的 childCancelled() 方法。

5. ScopeCoroutine#startUndispatchedOrReturn

ScopeCoroutinestartUndispatchedOrReturn() 方法中,会在 undispatchedResult() 代码块中调用 block 参数的 startCoroutineUninterceptedOrReturn() 方法。

6. ScopeCoroutine#undispatchedResult

undispatchedResult() 函数中,会通过 startBlock() 的执行结果来决定下一步的操作,如果执行结果为挂起标志 COROUTINE_SUSPENDED ,则返回 COROUTINE_SUSPENDED 挂起协程。

如果执行结果 result 不是挂起标志,则调用 makeCompletingOnce() 方法获取协程状态。

7. DispatchedCoroutine

DispatchedCoroutine 有一个和 DispatchedContinuation 一样简单的决策状态机,这个状态机有 UNDECIDEDSUSPENDEDRESUMED 三种状态。

DispatchedCoroutine 重写了 AbstractCoroutineafterResume() 方法,这个方法会在 resumeWith() 方法被调用的时候调用,协程的 resumeWith() 方法一般是在 SuspendLambda(代码块)执行完后调用的。

DispatchedCoroutineafterResume() 方法首先会尝试把状态迁移到 RESUMED ,如果迁移失败的话,说明协程处于挂起状态,这时就要通过协程分发器再次把 SuspendLambda 封装为任务并进行分发。

DispatchedCoroutine 的 getResult() 方法中,会尝试把状态迁移到 SUSPENDED ,迁移成功则返回挂起标志,如果迁移失败的话,说明处于 RESUMED 状态,也就是已经获取到执行结果了,这时就不用再挂起了,直接把状态拆箱并返回。

unboxState() 扩展函数只是简单地判断了一下当前状态是否为未完成状态 IncompleteStateBox ,如果是的话,则返回它的 state 成员,否则返回当前状态。

8. recoverResult()

参考资料

其他

如果你想直接和我交流 Android 或 Kotlin 相关的技术问题,可以加我的微信:oushaoze2015

作者:欧少泽

%s 个评论

要回复文章请先登录注册