Combine中错误处理和Scheduler使用详解

错误处理

到目前为止,在我们编写的大部分代码中,我们没有处理错误,而处理的都是“happy path”。在前面的文章中,我们了解到,Combine Publisher 声明了两个约束:

  • Output定义 Publisher 发出的值的类型;
  • Failure 定义 Publisher 发出的失败的类型。

现在,我们将深入了解 Failure 在 Publisher 中的作用。

Never

失败类型为 Never 的 Publisher 表示永远不会发出失败。它为这些 Publisher 提供了强大的保证。这类 Publisher 可让我们专注于使用值,同时绝对确保 Publisher 只有成功完成的事件。

在新的 Playground 页面添加以下代码:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
func example(_ desc: String, _ action:() -> Void) {
 print("--- (desc) ---")
 action()
}
var subscriptions = Set<AnyCancellable>()
example("Just") {
 Just("Hello")
}

我们创建了一个带有 Hello 字符串值的 Just。 Just 是不会发出失败的。 请按住 Command 并单击 Just 初始化程序并选择 Jump to Definition,查看定义:

In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.

Combine 对 Never 的障保证不仅是理论上的,而是深深植根于框架及其各种 API 中。Combine 提供了几个 Operator,这些 Operator 仅在保证 Publisher 永远不会发出失败事件时才可用。第一个是 sink 的变体,只处理值:

example("Just") {
 Just("Hello")
 .sink(receiveValue: { print($0) })
 .store(in: &subscriptions)
}

在上面的示例中,我们使用 sink(receiveValue:) ,这种特定的重载使我们可以忽略 Publisher 的完成事件,而只处理其发出的值。

此重载仅适用于这类“可靠”的 Publisher。在错误处理方面,Combine 是智能且安全的,如果可能抛出错误,它会强制我们处理完成事件。要看到这一点,我们需要将 Never 的 Publisher 变成可能发出失败事件的 Publisher。

setFailureType(to:)

func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error

Never Publisher 转变为可能发出失败事件的 Publisher 的第一种方法是使用 setFailureType。这是另一个仅适用于失败类型为 Never 的 Publisher 的 Operator:

example("setFailureType") {
 &nbsp;Just("Hello")
 &nbsp; &nbsp;.setFailureType(to: MyError.self)
}

可以使用 .eraseToAnyPublisher(),来确认已改变的 Publisher 类型:

继续修改上述代码:

enum MyError: Error {
 case ohNo
}
example("setFailureType") {
 Just("Hello")
 .setFailureType(to: MyError.self)
 .sink(
 receiveCompletion: { completion in
 switch completion {
 case .failure(.ohNo):
 print("Finished with OhNo!")
 case .finished:
 print("Finished successfully!")
 }
 },
 receiveValue: { value in
 print("Got value: (value)")
 }
 )
 .store(in: &subscriptions)
}

现在我们只能使用 sink(receiveCompletion:receiveValue:)sink(receiveValue:) 重载不再可用,因为此 Publisher 可能会发出失败事件。可以尝试注释掉 receiveCompletion查看编译错误。

此外,失败类型为为 MyError,这使我们可以针对.failure(.ohNo) 情况而无需进行不必要的强制转换来处理该错误。

当然,setFailureType 的作用只是类型定义。 由于原始 Publisher 是 Just,因此实际上也不会引发任何错误。

assign(to:on:)

assign Operator 仅适用于不会发出失败事件的 Publisher,与 setFailureType 相同。 向提供的 keypath 发送错误会导致未定义的行为。添加以下示例进行测试:

example("assign(to:on:)") {
 class Person {
 var name = "Unknown"
 }
 let person = Person()
 print(person.name)
 Just("Layer")
 .handleEvents(
 receiveCompletion: { _ in 
 print(person.name) 
 }
 )
 .assign(to: .name, on: person)
 .store(in: &subscriptions)
}

我们定义一个具有 name 属性的 Person 类。创建一个 Person 实例并立即打印其 name。一旦 Publisher 发送完成事件,使用 handleEvents 再次打印此 name。最后,使用 assignname 设置为 Publisher 发出的值:

--- assign(to:on:) ---
Unknown
Layer

Just("Layer") 正下方添加以下行:

.setFailureType(to: Error.self)

这意味着它不再是 Publisher<String, Never>,而是现在的 Publisher<String, Error>。运行 Playground,我们将进行验证:

Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent

assign(to:)

assign(to:on:) 有一个棘手的部分——它会 strong 捕获提供给 on 参数的对象。在上一个示例之后添加以下代码:

example("assign(to:)") {
 class MyViewModel: ObservableObject {
 @Published var currentDate = Date()
 init() {
 Timer.publish(every: 1, on: .main, in: .common)
 .autoconnect() 
 .prefix(3)
 .assign(to: .currentDate, on: self)
 .store(in: &subscriptions)
 }
 }
 let vm = MyViewModel()
 vm.$currentDate
 .sink(receiveValue: { print($0) })
 .store(in: &subscriptions)
}

我们 MyViewModel 中定义一个 @Published 属性。 它的初始值为当前日期。在 init 中创建一个 Timer Publisher,它每秒发出当前日期。使用 prefix Operator 只接受 3 个更新。使用 assign(to:on:) 将每个日期更新给@Published 属性。实例化 MyViewModelsinkvm.$currentDate,并打印出每个值:

--- assign(to:) ---
2022-12-24 07:32:33 +0000
2022-12-24 07:32:34 +0000
2022-12-24 07:32:35 +0000
2022-12-24 07:32:36 +0000

看起来一切都很好。但是对assign(to:on:) 的调用创建了一个 strong 持有 self 的 Subscription。 导致 self 挂在Subscription 上,而 Subscription 挂在 self 上,创建了一个导致内存泄漏的引用循环。

因此引入了该 Operator 的另一个重载 assign(to:)。该 Operator 通过对 Publisher 的 inout 引用来将值分配给 @Published 属性。因此以下两行:

.assign(to: .currentDate, on: self)
.store(in: &subscriptions)

可以被替换为:

.assign(to: &$currentDate)

使用 assign(to:) Operator 将 inout 引用 Publisher 会打破引用循环。此外,它会在内部自动处理 Subscription 的内存管理,这样我们就可以省略 store(in: &subscriptions)

assertNoFailure(_:file:line:)

当我们在开发过程确认 Publisher 以失败事件完成时,assertNoFailure Operator 非常有用。它不会阻止上游发出失败事件。但是,如果它检测到错误,它会因错误而崩溃:

example("assertNoFailure") {
 Just("Hello")
 .setFailureType(to: MyError.self)
 .assertNoFailure()
 .sink(receiveValue: { print("Got value: ($0) ")}) 
 .store(in: &subscriptions)
}

我们使用 Just 创建一个“可靠”的 Publisher 并将其错误类型设置为 MyError。如果 Publisher 以错误事件完成,则使用 assertNoFailure 以崩溃。这会将 Publisher 的失败类型转回 Never。使用 sink 打印出任何接收到的值。请注意,由于 assertNoFailure 将失败类型设置回 Never,因此 sink(receiveValue:) 重载可以直接使用。

运行 Playground,它可以正常工作:

--- assertNoFailure ---
Got value: Hello

setFailureType 之后,添加以下行:

.tryMap { _ in throw MyError.ohNo }

一旦 Hello 被推送到下游,使用 tryMap 抛出错误。再次运行 Playground:

Playground execution failed:
error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0).
...
frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...

由于 Publisher 发出失败事件,playground 会 crash。 在某种程度上,我们可以将 assertNoFailure() 视为代码的保护机制。 虽然我们不应该在生产环境中使用它,但在开发过程中提前发现问题非常有用。

处理错误

try* Operator

Combine 提供了一个区分可能引发错误和可能不会引发错误的 Operator 的方法:try 前缀。

注意:Combine 中所有以 try 为前缀的 Operator 在遇到错误时的行为相同。我们将只在本章中尝试使用 tryMap Operator。

example("tryMap") {
 enum NameError: Error {
 case tooShort(String)
 case unknown
 }
 
 ["Aaaa", "Bbbbb", "Cccccc"]
 .publisher
 .map { value in
 return value.count
 }
 .sink(
 receiveCompletion: { print("Completed with ($0)") },
 receiveValue: { print("Got value: ($0)") }
 )
}

在上面的示例中,我们定义一个 NameError 错误枚举。创建发布三个字符串的 Publisher。将每个字符串映射到它的长度。运行示例并查看控制台输出:

--- tryMap ---
Got value: 4
Got value: 5
Got value: 6
Completed with finished

将上面示例中的 map 替换为以下内容:

.tryMap { value -> Int in
 let length = value.count
 guard length >= 5 else {
 throw NameError.tooShort(value)
 }
 return value.count
}

我们检查字符串的长度是否大于等于 5。否则,我们会抛出错误:

--- tryMap ---
Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))

映射错误

maptryMap 之间的区别不仅仅是后者允许抛出错误。 map 继承了现有的失败类型并且只操作 Publisher 的值,但 tryMap 没有——它实际上将错误类型擦除为普通的 Swift 错误。 与带有 try 前缀的所有 Operator 都是如此。

example("map vs tryMap") {
 enum NameError: Error {
 case tooShort(String)
 case unknown
 }
 Just("Hello")
 .setFailureType(to: NameError.self)
 .map { $0 + " World!" }
 .sink(
 receiveCompletion: { completion in
 switch completion {
 case .finished:
 print("Done!")
 case .failure(.tooShort(let name)):
 print("(name) is too short!")
 case .failure(.unknown):
 print("An unknown name error occurred")
 }
 },
 receiveValue: { print("Got value ($0)") }
 )
 .store(in: &subscriptions)
}

我们定义一个用于此示例的 NameError。创建一个只发出字符串 HelloJust。使用 setFailureType 设置失败类型为 NameError。使用 map 将另一个字符串附加。最后,使用 sinkreceiveCompletionNameError 的每个情况打印出适当的消息。运行 Playground:

--- map vs tryMap ---
Got value Hello World!
Done!

Completion 的失败类型是 NameError,这正是我们想要的。 setFailureType 允许我们专门针对 NameError 进行处理,例如 failure(.tooShort(let name))

map 更改为 tryMap

.tryMap { throw NameError.tooShort($0) }

我们会立即注意到 Playground 不再编译。 再次点击 completion

tryMap 删除了我们的类型错误并将其替换为通用 Swift.Error 类型。即使我们实际上并没有从 tryMap 中抛出错误,也会发生这种情况。

原因很简单:Swift 还不支持类型化 throws,尽管自 2015 年以来 Swift Evolution 中一直在讨论这个主题。这意味着当我们使用带有 try 前缀的 Operator 时,我们的错误类型将总是被抹去到最常见的父类:Swift.Error

一种方法是将通用错误手动转换为特定的错误类型,但这不是最理想的。它打破了严格类型错误的整个目的。幸运的是,Combine 为这个问题提供了一个很好的解决方案,称为 mapError

在调用 tryMap 之后,添加以下行:

.mapError { $0 as? NameError ?? .unknown }

mapError 接收上游 Publisher 抛出的任何错误,并将其映射到我们想要的任何错误。在这种情况下,我们可以利用它将错误转换回 NameError。这会将 Failure 恢复为所需要的类型,并将我们的 Publisher 转回 Publisher<String, NameError>。构建并运行 Playground,最终可以按预期编译和工作:

--- map vs tryMap ---
Hello is too short!

捕获错误并重试

很多时候,当我们请求资源或执行某些计算时,失败可能是由于网络不稳定或其他资源不可用而导致的一次性 事件。

在这些情况下,我们通常会编写一个机制来重试不同的工作,跟踪尝试次数,并处理如果所有尝试都失败的情况。Combine 让这一切变得非常简单。

retry Operator 接受一个数字。如果 Publisher 失败,它将重新订阅上游并重试至我们指定的次数。如果所有重试都失败,它将错误推送到下游,就像没有 retry Operator 一样:

example("Catching and retrying") {
 enum MyError: Error {
 case network
 }
 var service1 = PassthroughSubject<Int, MyError>() service1.send(completion: .failure(.network))
 
 service1
 .handleEvents(
 receiveSubscription: { _ in print("Trying ...") },
 receiveCompletion: {
 guard case .failure(let error) = $0 else { return }
 print("Got error: (error)")
 }
 )
 .retry(3)
 .sink(
 receiveCompletion: { print("($0)") },
 receiveValue: { number in
 print("Got Number: (number)")
 }
 )
 .store(in: &subscriptions)
}

我们有一个 service1,它发出了失败事件。因此,订阅 service1 肯定会获得失败事件。我们尝试三次,并通过 handleEvents 打印订阅和完成:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)

运行 Playerground,我们会看到有四次 Trying。初始 Trying,加上由 retry Operator 触发的三次重试。 由于 service1 不断失败,因此 Operator 会耗尽所有重试尝试并将错误推送到 sink

调整代码:

example("Catching and retrying") {
 enum MyError: Error {
 case network
 }
 var service1 = PassthroughSubject<Int, MyError>()
 service1.send(completion: .failure(.network))
 
 service1
 .handleEvents(
 receiveSubscription: { _ in print("Trying ...") },
 receiveCompletion: {
 guard case .failure(let error) = $0 else { return }
 print("Got error: (error)")
 }
 )
 .retry(3)
 .replaceError(with: 1)
 .sink(
 receiveCompletion: { print("($0)") },
 receiveValue: { number in
 print("Got Number: (number)")
 }
 )
 .store(in: &subscriptions)
}

service1 重试后,若还是失败,我们将通过 replaceError 将失败替换为 1:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 1
finished

或者,我们可以使用 catch 捕获 service1 的失败,并为下游提供另一个 Publisher:

example("Catching and retrying") {
 enum MyError: Error {
 case network
 }
 var service1 = PassthroughSubject<Int, MyError>()
 service1.send(completion: .failure(.network))
 var service2 = PassthroughSubject<Int, MyError>()
 
 service1
 .handleEvents(
 receiveSubscription: { _ in print("Trying ...") },
 receiveCompletion: {
 guard case .failure(let error) = $0 else { return }
 print("Got error: (error)")
 }
 )
 .retry(3)
 .catch { error in
 return service2
 }
 .sink(
 receiveCompletion: { print("($0)") },
 receiveValue: { number in
 print("Got Number: (number)")
 }
 )
 .store(in: &subscriptions)
 
 service2.send(2)
 service2.send(completion: .finished)
}

此时,下游将获得到 service2 发出的值 2 和完成事件:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 2
finished

cheduler

我们已经遇到了一些将 Scheduler 作为参数的 Operator。大多数情况下,我们会简单地使用 DispatchQueue.main,因为它方便、易于理解。除了 DispatchQueue.main,我们肯定已经使用了全局并发队列,或创建一个串行调度队列来运行操作。

但是为什么 Combine 需要一个新的类似概念呢?我们接着将了解为什么会出现 Scheduler 的概念,将探索 Combine 如何使异步事件和操作更易于使用,当然,我们还会试使用 Combine 提供的所有 Scheduler。

Scheduler 简介

根据 Apple 的文档,Scheduler 是一种定义何时及如何执行闭包的协议。Scheduler 提供上下文以尽快或在将来的某个事件执行未来的操作。该操作就是协议本身中定义的闭包。闭包也可以隐藏 Publisher 在特定 Scheduler 上执行的某些值的传递。

我们会注意到此定义有意避免对线程的任何引用,这是因为具体的实现是在 Scheduler 协议中,提供的“上下文”中的。因此,我们的代码将在哪个线程上执行取决于选择的 Scheduler。

记住这个重要的概念:Scheduler 不等于线程。我们将在后面详细了解这对每个 Scheduler 意味着什么。让我们从事件流的角度来看 Scheduler 的概念:

我们在上图中看到的内容:

  • 在主 (UI) 线程上发生用户操作,如按钮按下;
  • 它会触发一些工作在 Background Scheduler 上进行处理;
  • 要显示的最终数据在主线程上传递给 Subscriber,Subscriber 可以更新 UI。

我们可以看到 Scheduler 的概念深深植根于前台/后台执行的概念。此外,根据我们选择的实现,工作可以串行化或并行化。

因此,要全面了解 Scheduler,需要查看哪些类符合 Scheduler 协议。首先,我们需要了解与 Scheduler 相关的两个重要 Operator。

Scheduler Operator

Combine 提供了两个基本的 Operator 来使用 Scheduler:

subscribe(on:)subscribe(on:options:) 在指定的 Scheduler 上创建 Subscription(开始工作);

receive(on:)receive(on:options:) 在指定的 Scheduler 上传递值。

此外,以下 Operator 将 Scheduler 和 Scheduler options 作为参数:

debounce(for:scheduler:options:)

delay(for:tolerance:scheduler:options:)

measureInterval(using:options:)

throttle(for:scheduler:latest:)

timeout(_:scheduler:options:customError:)

subscribe(on:) 和 receive(on:)

在我们订阅它之前,Publisher 是一个无生命的实体。但是当我们订阅 Publisher 时会发生什么?有几个步骤:

  • Publiser receive Subscriber 并创建 Subscription;
  • Subscriber receive Subscription 并从 Publiser 请求值(虚线);
  • Publiser 开始工作(通过 Subscription);
  • Publiser 发出值(通过 Subscription);
  • Operator 转换值;
  • Subscriber 收到最终值。

当代码订阅 Publiser 时,步骤一、二和三通常发生在当前线程上。 但是当我们使用 subscribe(on:) Operator 时,所有这些操作都在我们指定的 Scheduler 上运行。

我们可能希望 Publiser 在后台执行一些昂贵的计算以避免阻塞主线程。 执行此操作的简单方法是使用 subscribe(on:)。以下是伪代码:

let queue = DispatchQueue(label: "serial queue")
let subscription = publisher
 .subscribe(on: queue)
 .sink { value in ...

如果我们收到值后,想更新一些 UI 怎么办?我们可以在闭包中执行类似 DispatchQueue.main.async { ... } 的操作,从主线程执行 UI 更新。有一种更有效的方法可以使用 Combine 的 receive(on:):

let subscription = publisher
 .subscribe(on: queue)
 .receive(on: DispatchQueue.main)
 .sink { value in ...

即使计算工作正常并从后台线程发出结果,我们现在也可以保证始终在主队列上接收值。这是安全地执行 UI 更新所需要的。

Scheduler 实现

Apple 提供了几种 Scheduler 协议的具体实现:

  • ImmediateScheduler:一个简单的 Scheduler,它立即在当前线程上执行代码,这是默认的执行上下文,除非使用 subscribe(on:)receive(on:) 或任何其他将 Scheduler 作为参数的 Operator 进行修改。
  • RunLoop:绑定到 Foundation 的 Thread 对象。
  • DispatchQueue:可以是串行的或并发的。
  • OperationQueue:规范工作项执行的队列。

这里省略了 TestScheduler,是一个虚拟的、模拟的 Scheduler,它是任何响应式编程框架测试时不可或缺的一部分。

ImmediateScheduler

在 Playground 中新增代码:

example("ImmediateScheduler") { 
 let source = Timer
 .publish(every: 1.0, on: .main, in: .common)
 .autoconnect()
 .scan(0) { counter, _ in counter + 1 }
 let publisher = source
 .receive(on: ImmediateScheduler.shared)
 .eraseToAnyPublisher()
 publisher.sink(receiveValue: { _ in
 print(Thread.current)
 })
 .store(in: &amp;subscriptions)
}

运行 Playground,我们会看到 Publisher 发出的每个值,都是在 MainThread 上:

--- ImmediateScheduler ---
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}

当前线程是主线程, ImmediateScheduler 立即在当前线程上调度。当我们在 .receive(on: ImmediateScheduler.shared) 前添加一行:

.receive(on: DispatchQueue.global())

执行 Playground,我们将在不同的线程收到值:

--- ImmediateScheduler ---
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}

ImmediateScheduler options 由于大多数 Operator 在其参数中接受 Scheduler,我们还可以找到一个接受 SchedulerOptions 值的参数。在 ImmediateScheduler 的情况下,此类型被定义为 Never,因此在使用 ImmediateScheduler 时,我们永远不应该为 Operator 的 options 参数传递值。

ImmediateScheduler 的陷阱 关于 ImmediateScheduler 的一件事是它是即时的。我们无法使用 Scheduler 协议的任何 schedule(after:) 变体,因为我们需要指定的 SchedulerTimeType 没有初始化方法,对于 ImmediateScheduler 无意义。

RunLoop scheduler

RunLoop 早于 DispatchQueue,它是一种在线程级别管理输入源的方法。主线程有一个关联的 RunLoop,我们还可以通过从当前线程调用 RunLoop.current 为任何线程获取一个 RunLoop。

在 Playground 中添加此代码:

example("RunLoop") { 
 let source = Timer
 .publish(every: 1.0, on: .main, in: .common)
 .autoconnect()
 .scan(0) { counter, _ in counter + 1 }
 let publisher = source
 .receive(on: DispatchQueue.global())
 .handleEvents(receiveOutput: { _ in
 print("DispatchQueue.global: \(Thread.current)")
 })
 .receive(on: RunLoop.current)
 .handleEvents(receiveOutput: { _ in
 print("RunLoop.current: \(Thread.current)")
 })
 .eraseToAnyPublisher()
 publisher.sink(receiveValue: { _ in
 })
 .store(in: &amp;subscriptions)
}

当前 RunLoop.current 就是主线程的 RunLoop。执行 Playground:

--- RunLoop ---
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}

每发出一个值,都通过一个全局并发队列的线程,然后在主线程上继续。

RunLoop OptionsImmediateScheduler 一样,RunLoop 不提供 SchedulerOptions 参数。

RunLoop 陷阱 RunLoop 的使用应仅限于主线程的 RunLoop,以及我们在需要时控制的 Foundation 线程中可用的 RunLoop。要避免的一个是在 DispatchQueue 上执行的代码中使用 RunLoop.current。这是因为 DispatchQueue 线程可能是短暂的,这使得它们几乎不可能依赖 RunLoop。

DispatchQueue Scheduler

DispatchQueue 符合 Scheduler 协议,并且完全可用于所有将 Scheduler 作为参数的 Operator。Dispatch 框架是 Foundation 的一个强大组件,它允许我们通过向系统管理的调度队列提交工作来在多核硬件上同时执行代码。DispatchQueue 可以是串行的(默认)或并发的。串行队列按顺序执行你提供给它的所有工作项。并发队列将并行启动多个工作项,以最大限度地提高 CPU 使用率:

  • 串行队列通常用于保证某些操作不重叠。因此,如果所有操作都发生在同一个队列中,他们可以使用共享资源而无需加锁。
  • 并发队列将同时执行尽可能多的操作。因此,它更适合纯计算。

我们一直使用的最熟悉的队列是 DispatchQueue.main。它直接映射到主线程,在这个队列上执行的所有操作都可以自由地更新用户界面。 当然,UI 更新只能在主线程进行。所有其他队列,无论是串行的还是并发的,都在系统管理的线程池中执行它们的代码。这意味着我们永远不应该对队列中运行的代码中的当前线程做出任何假设。尤其不应使用 RunLoop.current 来安排工作,因为 DispatchQueue 管理其线程的方式有不同。

所有调度队列共享同一个线程池,执行的串行队列将使用该池中的任何可用线程。一个直接的结果是,来自同一队列的两个连续工作项可能使用不同的线程,但仍可以按顺序执行。这是一个重要的区别:当使用 subscribe(on:)receive(on:) 或任何其他有 Scheduler 参数的 Operator 时,我们永远不应假设线程每次都是相同的。

在 Playground 中添加代码:

example("DispatchQueue") { 
 let source = PassthroughSubject<Void, Never>()
 let sourceQueue = DispatchQueue.main
 let subscription = sourceQueue.schedule(after: sourceQueue.now,
 interval: .seconds(1)) {
 source.send()
 }
 .store(in: &subscriptions) let serialQueue = DispatchQueue(label: "Serial queue")
 source
 .handleEvents(receiveOutput: { _ in
 print("\(Thread.current)")
 })
 .receive(on: serialQueue)
 .handleEvents(receiveOutput: { _ in
 print("\(Thread.current)")
 })
 .sink(receiveValue: { _ in
 })
 .store(in: &subscriptions)
}

Timer 在主队列 sourceQueue 上触发并通过 source 发送 Void 值。接着在串行队列 serialQueue 上接收值:

--- DispatchQueue ---
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x128025cd0>{number = 2, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x117904d90>{number = 5, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}

将 sourceQueue 也改为 DispatchQueue(label: "Serial queue"),也将在全局并发队列上发出值:

--- DispatchQueue ---
<NSThread: 0x137e275b0>{number = 6, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x127e0f400>{number = 4, name = (null)}
<NSThread: 0x137e275b0>{number = 6, name = (null)}

DispatchQueue Options DispatchQueue 是唯一提供一组 Options 的 Scheduler,当 Operator 需要 SchedulerOptions 参数时,我们可以传递这些 Options。主要围绕 QoS(服务质量)值,独立于 DispatchQueue 上已设置的值。例如:

.receive(
 on: serialQueue,
 options: DispatchQueue.SchedulerOptions(qos: .userInteractive)
)

我们将 DispatchQueue.SchedulerOptions 的实例传递.userInteractive。在实际开发中使用这些 Options 有助于操作系统决定在同时有许多队列忙碌的情况下首先安排哪个任务。

OperationQueue Scheduler

由于 OperationQueue 在内部使用 Dispatch,因此在表面上几乎没有区别:

example("OperationQueue") { 
 let queue = OperationQueue()
 let subscription = (1...10).publisher
 .receive(on: queue)
 .print()
 .sink { value in
 print("Received \(value)")
 }
 .store(in: &amp;subscriptions)
}

创建一个简单的 Publisher 发出 1 到 10 之间的数字,然后打印该值,执行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1
receive value: (8)
Received 8
receive value: (9)
Received 9
receive value: (6)
Received 6
receive value: (3)
Received 3
receive value: (5)
Received 5
receive finished
receive value: (10)
receive value: (4)
receive value: (7)
receive value: (2)

按顺序发出但无序到达!我们可以更改打印行以显示当前线程:

print("Received \(value) on thread \(Thread.current)")

再次执行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)}
receive value: (1)
Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)}
receive finished
receive value: (2)
receive value: (9)
receive value: (8)
receive value: (6)

每个值都是在不同的线程上接收的!如果我们查看有关 OperationQueue 的文档,有一条关于线程的说明,OperationQueue 使用 Dispatch 框架(因此是 DispatchQueue)来执行操作。这意味着它不保证它会为每个交付的值使用相同的底层线程。

此外,每个 OperationQueue 中都有一个参数可以解释一切:它是 maxConcurrentOperationCount。它默认为系统定义的数字,允许操作队列同时执行大量操作。由于 Publisher 几乎在同一时间发出所有值,它们被 Dispatch 的并发队列分派到多个线程。

对代码进行一些修改:

queue.maxConcurrentOperationCount = 1

再次执行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (2)
Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (4)
Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (6)
Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (7)
Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (8)
Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (9)
Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive finished

这一次,我们将获得真正的顺序执行——将 maxConcurrentOperationCount 设置为 1 相当于使用串行队列。

OperationQueue Options OperationQueue 没有可用的 SchedulerOptions。它实际上是 RunLoop.SchedulerOptions 类型,本身没有提供任何 Options。

OperationQueue 陷阱 我们刚刚看到 OperationQueue 默认并发执行操作,我们需要非常清楚这一点,因为它可能会给我们带来麻烦。当我们的 Publisher 发出值时都有大量工作要执行时,它可能是一个很好的工具。我们可以通过调整 maxConcurrentOperationCount 参数来控制负载。

内容参考

作者:Layer

%s 个评论

要回复文章请先登录注册