Kotlin协程基础元素梳理分析

Kotlin 协程的基础元素:Continuation、SafeContinuation、CoroutineContext、CombinedContext、CancellationException、intrinsics。CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类。

Continuation 是什么?

class Call<T>(callBack: Call.CallBack<T>) {
 fun cancel() {
 }
 interface CallBack<T> {
 fun onSuccess(data: T)
 fun onFail(throwable: Throwable)
 }
}
suspend fun <T : Any> Call<T>.await(): T =
 suspendCancellableCoroutine<T> { continuation ->
 val call = Call(object : Call.CallBack<T> {
 override fun onSuccess(data: T) {
 continuation.resume(data, onCancellation = null)
 }
 override fun onFail(throwable: Throwable) {
 continuation.resumeWithException(throwable)
 }
 })
 continuation.invokeOnCancellation {
 call.cancel()
 }
 }

要实现挂起函数的时候,可以使用 suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,在它们的 Lambda 当中,我们可以使用它暴露出来的 continuation 对象,把程序的执行结果或异常传到外部去。常用于实现挂起函数内部逻辑。

fun checkLength() {
 runBlocking {
 val result = getStrLengthSuspend("Kotlin")
 println(result)
 }
}
suspend fun getStrLengthSuspend(text:String):Int = suspendCoroutine {
 continuation ->
 thread {
 Thread.sleep(1000L)
 continuation.resume(text.length)
 }
}
Log
6
Process finished with exit code 0

使用 suspendCoroutine{}实现了挂起函数,在它内部,使用 continuation.resume() 的方式,传出了挂起函数的返回值。

为什么以 continuation.resume() 这样异步的方式传出结果,挂起函数就能接收到结果呢?

suspend fun getStrLengthSuspend(text: String): Int = suspendCoroutine { continuation ->
 thread {
 Thread.sleep(1000L)
 continuation.resume(text.length)
 }
}
fun checkLength2() {
 val func = ::getStrLengthSuspend as (String, Continuation<Int>) -> Any?
 func("Kotlin", object :Continuation<Int> {
 override val context: CoroutineContext
 get() = EmptyCoroutineContext
 
 override fun resumeWith(result: Result<Int>) {
 println(result.getOrNull())
 }
 })
 Thread.sleep(2000L)
}
Log
6
Process finished with exit code 0

把函数强转成了带有 Continuation 的函数类型,然后通过匿名内部类的方式,创建了一个 Continuation 对象传了进去。挂起函数的本质,就是 Callback!

把 Continuation 改为 Callback:

fun getStrLength() {
 func2("Kotlin", object : MyCallBack<Int> {
 override fun resume(value: Int) {
 println(value)
 }
 })
}
fun func2(text: String, callBack: MyCallBack<Int>) {
 thread {
 Thread.sleep(1000L)
 callBack.resume(text.length)
 }
}
interface MyCallBack<T> {
 fun resume(value: T)
}
Log
6
Process finished with exit code 0

Continuation 改成 Callback 以后,使用匿名内部类创建 Callback 用于接收异步结果;异步函数内部,使用 callback.resume() 将结果传出去。

Kotlin 协程当中的 Continuation,作用其实就相当于 Callback,它既可以用于实现挂起函数,往挂起函数的外部传递结果;也可以用于调用挂起函数,我们可以创建 Continuation 的匿名内部类,来接收挂起函数传递出来的结果。

Java 代码中如何调用 Kotlin 的挂起函数吗?

public static void main(String[] args) {
 SuspendFromJavaDo.INSTANCE.getUserInfo(new Continuation<String>() {
 @NonNull
 @Override
 public CoroutineContext getContext() {
 return EmptyCoroutineContext.INSTANCE;
 }
 @Override
 public void resumeWith(@NonNull Object o) {
 System.out.println(o + "");
 }
 });
 }
object SuspendFromJavaDo {
 suspend fun getUserInfo():String {
 delay(1000L)
 return "Kotlin"
 }
}

所以,实现挂起函数逻辑的时候,常用到 suspendCoroutine{}、suspendCancellableCoroutine{}。

Continuation.kt源码

public interface Continuation<in T> {
 /**
 * The context of the coroutine that corresponds to this continuation.
 */
 public val context: CoroutineContext
 /**
 * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
 * return value of the last suspension point.
 */
 public fun resumeWith(result: Result<T>)
}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
 contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
 return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
 val safe = SafeContinuation(c.intercepted())
 block(safe)
 safe.getOrThrow()
 }
}

suspendCoroutineUninterceptedOrReturn{}实现了suspendCoroutine{}。

val safe = SafeContinuation(c.intercepted()) 包裹Continuation。

block(safe)调用 Lambda 当中的逻辑。

safe.getOrThrow() 取出 block(safe) 的运行结果,Continuation 当中是可以存储 result 的。这个 Result 可能是正确的结果,也可能是异常。

@Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
 contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
 throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

为什么这个高阶函数的源代码会是抛出异常呢?

“Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic.”suspendCoroutineUninterceptedOrReturn 是一个编译器内建函数,它是由 Kotlin 编译器来实现的。

suspendCoroutineUninterceptedOrReturn 这个高阶函数的参数,它会接收一个 Lambda,类型是(Continuation<T>) -> Any?,这里的“Any?”类型,其实就能代表当前这个挂起函数是否真正挂起。

fun test() {
 runBlocking {
 val result = testNoSuspendCoroutinue()
 println(result)
 }
}
private suspend fun testNoSuspendCoroutinue() = suspendCoroutineUninterceptedOrReturn<String>{
 continuation ->
 return@suspendCoroutineUninterceptedOrReturn "Kotlin"
}
Log
Kotlin
Process finished with exit code 0

直接使用 suspendCoroutineUninterceptedOrReturn 实现了挂起函数,并且,在它的 Lambda 当中,我们并没有调用 continuation.resume()。在挂起函数的外部确实也可以接收到这个结果。

private static final Object testNoSuspendCoroutinue(Continuation $completion) {
 int var2 = false;
 if ("Kotlin" == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
 DebugProbesKt.probeCoroutineSuspended($completion);
 }
 return "Kotlin";
 }

这个函数其实就是一个伪挂起函数,它的内部并不会真正挂起。这样,当我们从外部调用这个函数的时候,这个函数会立即返回结果。

private suspend fun testSuspendCoroutinues() =
 suspendCoroutineUninterceptedOrReturn<String> { continuation ->
 thread {
 Thread.sleep(2000L)
 continuation.resume("Kotlin")
 }
 return@suspendCoroutineUninterceptedOrReturn kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
 }
fun main() {
 runBlocking {
 val testSuspendCoroutinues = testSuspendCoroutinues()
 println(testSuspendCoroutinues)
 }
}
Kotlin
Process finished with exit code 0

使用了 continuation.resume(),挂起函数的外部也能接收到这个结果。

private static final Object testSuspendCoroutinues(Continuation $completion) {
 int var2 = false;
 ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new TestCoroutine777Kt$testSuspendCoroutinues$2$1($completion)), 31, (Object)null);
 Object var10000 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
 //将 var10000 赋值为 COROUTINE_SUSPENDED 这个挂起标志位。
 if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
 DebugProbesKt.probeCoroutineSuspended($completion);
 }
//返回挂起标志位,代表 testSuspendCoroutinues() 这个函数会真正挂起。
 return var10000;
 }
BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
 int label;
 @Nullable
 public final Object invokeSuspend(@NotNull Object $result) {
 Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
 Object var10000;
 switch(this.label) {
 case 0:
 ResultKt.throwOnFailure($result);
 this.label = 1;
 var10000 = TestCoroutine777Kt.testSuspendCoroutinues(this);
 if (var10000 == var3) {
 return var3;
 }
 break;
 case 1:
 ResultKt.throwOnFailure($result);
 var10000 = $result;
 break;
 default:
 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
 }
 String testSuspendCoroutinues = (String)var10000;
 System.out.println(testSuspendCoroutinues);
 return Unit.INSTANCE;
 }
 @NotNull
 public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
 Intrinsics.checkNotNullParameter(completion, "completion");
 Function2 var3 = new <anonymous constructor>(completion);
 return var3;
 }
 public final Object invoke(Object var1, Object var2) {
 return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
 }
 }), 1, (Object)null);

由于 suspend 修饰的函数,既可能返回 CoroutineSingletons.COROUTINE_SUSPENDED,也可能返回实际结果,甚至可能返回 null,为了适配所有的可能性,CPS 转换后的函数返回值类型就只能是 Any? 了。

suspendCoroutineUninterceptedOrReturn{}这个高阶函数的作用了:它可以将挂起函数当中的 Continuation 以参数的形式暴露出来,在它的 Lambda 当中,我们可以直接返回结果,这时候它就是一个“伪挂起函数”;或者,我们也可以返回 COROUTINE_SUSPENDED 这个挂起标志位,然后使用 continuation.resume() 传递结果。相应的,suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,只是对它的一种封装而已。

作者:且听真言原文地址:https://blog.csdn.net/zhangying1994/article/details/127820334

%s 个评论

要回复文章请先登录注册