Kotlin协程Channel特点及使用细节详解

正文

在协程启动模式中已经知道async是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel就出现了。

1.认识Channel

Channel的意思是管道、通道,用图表示如下:

Channel的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样:

fun main() {
 channelTest()
}
fun channelTest() = runBlocking {
 val channel = Channel<Int>() //关键点①
 launch {
 for (i in 1..3) {
 channel.send(i) //关键点②
 logX("send: $i")
 }
 }
 launch {
 for (i in channel) { //关键点③
 logX("receiver: $i")
 }
 }
 logX("end")
}
//输出结果:
//================================
//end 
//Thread:main @coroutine#1
//================================
//================================
//receiver: 1 
//Thread:main @coroutine#3
//================================
//================================
//send: 1 
//Thread:main @coroutine#2
//================================
//================================
//send: 2 
//Thread:main @coroutine#2
//================================
//================================
//receiver: 2 
//Thread:main @coroutine#3
//================================
//================================
//receiver: 3 
//Thread:main @coroutine#3
//================================
//================================
//send: 3 
//Thread:main @coroutine#2
//================================

上面的代码中启动了两个协程,一个发送,一个接收,还有几个关键点:

  • 关键点①:通过Channel创建一个管道,其中泛型Int表示发送的数据类型;
  • 关键点②:启动一个协程通过send发送数据,send是一个挂起函数;
  • 关键点③:启动一个协程遍历channel打印出接收到的消息。

那么这里还有一个问题,在执行完上述代码后程序并没有终止,那要如何终止程序呢?

很简单,在发送完所有消息后调用close方法即可。

launch {
 for (i in 1..3) {
 channel.send(i) //关键点②
 logX("send: $i")
 }
//	修改点
//	 ↓
 channel.close()
 }

Channel也是一种协程资源,用完后如果不关闭那么这个资源就会一直被占用。

public fun <E> Channel(
 capacity: Int = RENDEZVOUS,
 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
 onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
 when (capacity) {
 RENDEZVOUS -> {
 ...
 }
 CONFLATED -> {
 ...
 }
 UNLIMITED -> {
 ...
 }
 BUFFERED -> { 
 ...
 }
 else -> {
 ...
 }
 }

Channel中有三个参数:

  • capacity 代表管道的容量,默认值为RENDEZVOUS,代表容量为0,除此之外还有三个类型:
  • CONFLATED:代表容量为1,新的数据会替代旧的数据;
  • UNLIMITED:代表无限容量;
  • BUFFERED:代表具备一定缓存的容量,默认情况下是64,具体容量由VM参数kotlinx.coroutines.channels.defaultBuffer决定。
  • onBufferOverflow 代表缓冲策略,也就是当缓冲的容量满了之后要怎么做。默认值为SUSPEND,表示在缓冲区溢出时挂起。除此之外还有两个类型:
  • DROP_OLDEST:在缓冲区溢出时删除最旧的值,向缓冲区添加新值,不要挂起;

  • DROP_LATEST:在缓冲区溢出时,立即删除正在添加到缓冲区的最新值(以便缓冲区内容保持不变),不要挂起。

  • onUndeliveredElement 它相当于一个异常处理回调。当管道中的某些数据没有被成功接收的时候,这个回调就会被调用

现在写个案例看一下capacity在其他类型下的区别

/**
 * Channel.CONFLATED
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(Channel.CONFLATED)
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 channel.close()
 }
 launch {
 for (i in channel) {
 println("receiver: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 4
/**
 * Channel.UNLIMITED
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(Channel.UNLIMITED)
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 channel.close()
 }
 launch {
 for (i in channel) {
 println("receiver: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * Channel.BUFFERED
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(Channel.BUFFERED)
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 channel.close()
 }
 launch {
 for (i in channel) {
 println("receiver: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4

再看一下onBufferOverflow在其他类型下的区别

/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST
 * 缓冲区设置为3,缓冲区溢出时删除最旧的值,向缓冲区添加新值
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(
 capacity = 3,
 onBufferOverflow = BufferOverflow.DROP_OLDEST
 )
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 channel.close()
 }
 launch {
 for (i in channel) {
 println("receiver: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST
 * 缓冲区设置为3,缓冲区溢出时立即删除正在添加到缓冲区的最新值
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(
 capacity = 3,
 onBufferOverflow = BufferOverflow.DROP_LATEST
 )
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 channel.close()
 }
 launch {
 for (i in channel) {
 println("receiver: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3

再看一下onUndeliveredElement要如何使用

/**
 * capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement
 * 缓冲区设置为2,缓冲区溢出时立即删除正在添加到缓冲区的最新值
 * 接收一个数据后取消接收其他数据
 */
fun channelTest() = runBlocking {
 val channel = Channel<Int>(
 capacity = 2,
 onBufferOverflow = BufferOverflow.DROP_LATEST,
 onUndeliveredElement = {
 println("onUndeliveredElement: $it")
 }
 )
 launch {
 for (i in 1..4) {
 channel.send(i)
 println("send: $i")
 }
 }
 println("receive:${channel.receive()}")
 channel.cancel()
}
//输出结果:
//send: 1
//send: 2
//send: 3
//send: 4
//receive:1
//onUndeliveredElement: 2
//onUndeliveredElement: 3

上面的代码容量设置为2,缓冲策略是删除正在添加到缓冲区的最新值,接收一个数据后立即取消接收其他数据,也就是说接收到了【send: 1】的数据【receive:1】,【send: 4】的数据被缓冲策略删除了,由于接收消息的同道已经被取消了那么【send: 2】和【send: 3】的数据就只能在异常中被处理,从输出结果就可以看到。

从上面的代码示例可以总结出它的应用场景:接收方很关心数据是否被消费,例如企业微信、钉钉的消息是否已读的状态,对于异常处理那块的场景就像是发送消息过程中消息没有被发送出去,那么接收方就无法接受到这个消息。

2.Channel使用中的细节

前面在使用Channel时为了让程序终止在发送完成后调用了channel.close(),但是这个很容易被忘记,忘记添加就会造成程序无法终止的问题,那么Produce就诞生了,它是一个高阶函数。

fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce {
 for (i in 1..4) {
 send(i)
 }
 }
 launch {
 for (i in channel) {
 println("receive: $i")
 }
 }
 println("end")
}
//输出结果:
//end
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//Process finished with exit code 0

可以看到没有加入close代码就可以正常结束,上面发送了4条数据,那么我要是接收5条数据会不会有什么问题?

fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce {
 for (i in 1..4) {
 send(i)
 }
 }
 println("receive: ${channel.receive()}")
 println("receive: ${channel.receive()}")
 println("receive: ${channel.receive()}")
 println("receive: ${channel.receive()}")
 println("receive: ${channel.receive()}")
 println("end")
}
//输出结果:
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//ClosedReceiveChannelException: Channel was closed

可以看到当我接收第5条数据的时候报出channel被关闭的提示,也就是说produce确实会在消息发送完毕后关闭通道。

业务开发中有可能我们确实需要对channel发送的消息进行单独处理,那么也许并不知道具体发送了几条数据,如果接收数据数量超过发送数据数量就会出现错误,那有没有像isClose这类的方法可以在接收前判断是否被关闭呢?有的,在Channel中还有两个变量:

//如果该通道已通过调用[close]关闭,则返回' true '。这意味着调用[send]将导致异常。
public val isClosedForSend: Boolean
//如果通过在SendChannel端调用close关闭了此通道,
//并且已经接收到以前发送的所有项目,则返回true。
public val isClosedForReceive: Boolean

那么安全的调用channel.receive()接收就可以这么写

fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce(capacity = 3) {
 (1..4).forEach {
 send(it)
 println("Send $it")
 }
 }
 while (!channel.isClosedForReceive) {
 println("receive: ${channel.receive()}")
 }
 println("end")
}
//输出结果:
//Send 1
//Send 2
//Send 3
//Send 4
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//end

但是这里会有一个问题,不定义capacity的数量

fun produceTest() = runBlocking {
 //	变化在这里
 //	↓
 val channel: ReceiveChannel<Int> = produce {
 (1..4).forEach {
 send(it)
 println("Send $it")
 }
 }
 while (!channel.isClosedForReceive) {
 println("receive: ${channel.receive()}")
 }
 println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//
//ClosedReceiveChannelException: Channel was closed

可以看到send发送的数据全部都被接收了,但是还是报出channel被关闭的错误,原因在注释中已经写明:如果通过在SendChannel端调用close关闭了此通道,并且已经接收到以前发送的所有项目,则返回true。

这意味着调用receive将导致closereceivechannelexception。 所以channel.receive()要慎用。可以用channel.consumeEach代替

fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce {
 (1..4).forEach {
 send(it)
 println("Send $it")
 }
 }
 //变化在这里 
 channel.consumeEach {
 println("receive: $it")
 }
 println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//end

3.Channe的特点

Channel主要你用来传递数据流的,这个数据流指的是多个数据组合形成别的流,与它形成鲜明对比的是async、挂起函数。

数据流的传输,有发送就有接收,而Channel是完全符合这一点的。发送与接收存在两种情况:

  • 数据流的发送了但是还没有被接收,没有接收则不再进行发送消息,例如文件的传输;
  • 数据流的发送了不管有没有被接收,都要继续发送消息,例如微信聊天。

Channel符合第二个结论,无论发送的数据是否被消费或者说被接收,Channel都会进行工作。我们来证明一下这个结论。

/**
 * 消息容量为10,发送4条数据
 * 无论消息是否被接收都会吧消息发送完毕
 */
fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce(capacity = 10) {
 (1..4).forEach {
 send(it)
 println("Send $it")
 }
 }
 println("end")
}
//输出结果:
//end
//Send 1
//Send 2
//Send 3
//Send 4
/**
 * 消息容量改为默认,默认值时0,发送4条数据
 * Channel依旧是在工作的,只是说在调用send方法的时候
 * 接收方还没有准备完毕且容量为0,所以会被挂起,程序一直无法退出
 */
fun produceTest() = runBlocking {
 val channel: ReceiveChannel<Int> = produce {
 (1..4).forEach {
 send(it)
 println("Send $it")
 }
 }
 println("end")
}
//输出结果:
//end
//程序没有结束

通过上面的代码引出一个结论:Channel是“热” 的。不管接收方是否存在,Channel是一定会工作的。类似于自来水厂向像居民提供水源,发电厂向居民提供电能。

作者:无糖可乐爱好者

%s 个评论

要回复文章请先登录注册