Swift 中的高级并发与 HoneyBee

已发表: 2022-03-11

在 Swift 中设计、测试和维护并发算法很困难,正确处理细节对于应用程序的成功至关重要。 并发算法(也称为并行编程)是一种旨在同时执行多个(可能很多)操作以利用更多硬件资源并减少整体执行时间的算法。

在 Apple 的平台上,编写并发算法的传统方式是 NSOperation。 NSOperation 的设计邀请程序员将并发算法细分为单独的长时间运行的异步任务。 每个任务将在其自己的 NSOperation 子类中定义,并且这些类的实例将通过目标 API 组合以在运行时创建任务的部分顺序。 七年来,这种设计并发算法的方法在 Apple 平台上一直是最先进的。

2014 年,Apple 推出了 Grand Central Dispatch (GCD),作为并发操作表达的一大进步。 GCD 以及伴随并支持它的新语言功能块,提供了一种在启动异步请求后立即紧凑地描述异步响应处理程序的方法。 不再鼓励程序员在多个 NSOperation 子类中的多个文件中传播并发任务的定义。 现在,可以在单个方法中编写整个并发算法。 这种表现力和类型安全性的提高是一个重大的概念转变。 这种书写方式的典型算法可能如下所示:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { loadWebResource("dataprofile.txt") { (dataResource, error) in guard let dataResource = dataResource else { completion(nil, error) return } loadWebResource("imagedata.dat") { (imageResource, error) in guard let imageResource = imageResource else { completion(nil, error) return } decodeImage(dataResource, imageResource) { (imageTmp, error) in guard let imageTmp = imageTmp else { completion(nil, error) return } dewarpAndCleanupImage(imageTmp) { imageResult in guard let imageResult = imageResult else { completion(nil, error) return } completion(imageResult, nil) } } } } }

让我们稍微分解一下这个算法。 函数processImageData是一个异步函数,它自己进行四次异步调用来完成它的工作。 四个异步调用以对基于块的异步处理最自然的方式嵌套在另一个内部。 每个结果块都有一个可选的错误参数,除了一个之外,所有的都包含一个额外的可选参数,表示 aysnc 操作的结果。

大多数 Swift 开发人员可能对上述代码块的形状很熟悉。 但是这种方法有什么问题呢? 以下痛点列表可能同样熟悉。

  • 这种嵌套代码块的“末日金字塔”形状很快就会变得笨拙。 如果我们再添加两个异步操作会发生什么? 四? 条件操作呢? 重试行为或资源限制保护如何? 现实世界的代码永远不会像博客文章中的示例那样干净和简单。 “末日金字塔”效应很容易导致代码难以阅读、难以维护并且容易出现错误。
  • 上述示例中的错误处理尝试虽然 Swifty,但实际上是不完整的。 程序员假定两参数、Objective-C 风格的异步回调块将始终提供两个参数之一; 他们永远不会同时为零。 这不是一个安全的假设。 并发算法以难以编写和调试而闻名,而毫无根据的假设是部分原因。 对于任何打算在现实世界中运行的并发算法来说,完整和正确的错误处理是不可避免的。
  • 再进一步考虑,可能编写被调用异步函数的程序员没有你那么有原则。 如果存在被调用函数回调失败的情况怎么办? 还是不止一次回电? 在这些情况下,processImageData 的正确性会发生什么变化? 专业人士不会冒险。 即使依赖第三方编写的功能,关键任务功能也需要正确。
  • 也许最引人注目的是,所考虑的异步算法是次优构造的。 前两个异步操作都是远程资源的下载。 即使它们没有相互依赖关系,上述算法也会按顺序而不是并行执行下载。 原因很明显; 嵌套块语法鼓励这种浪费。 竞争市场不会容忍不必要的滞后。 如果您的应用程序没有尽快执行其异步操作,那么另一个应用程序将会执行。

我们怎样才能做得更好? HoneyBee 是一个 futures/promises 库,它使 Swift 并发编程变得简单、富有表现力和安全。 让我们用 HoneyBee 重写上面的异步算法并检查结果:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { HoneyBee.start() .setErrorHandler { completion(nil, $0) } .branch { stem in stem.chain(loadWebResource =<< "dataprofile.txt") + stem.chain(loadWebResource =<< "imagedata.dat") } .chain(decodeImage) .chain(dewarpAndCleanupImage) .chain { completion($0, nil) } }

这个实现开始的第一行是一个新的 HoneyBee 配方。 第二行建立默认的错误处理程序。 错误处理在 HoneyBee 配方中不是可选的。 如果出现问题,算法必须处理它。 第三行打开一个允许并行执行的分支。 loadWebResource的两条链将并行执行,它们的结果将合并(第 5 行)。 两个加载资源的组合值被转发到decodeImage等,直到调用完成。

让我们来看看上面列出的痛点,看看 HoneyBee 是如何改进这段代码的。 维护此功能现在变得更加容易。 HoneyBee recipe 看起来像它所表达的算法。 代码可读、可理解且可快速修改。 HoneyBee 的设计确保指令的任何错误排序都会导致编译时错误,而不是运行时错误。 该功能现在不太容易受到错误和人为错误的影响。

所有可能的运行时错误都已完全处理。 HoneyBee 支持的每个函数签名(其中有 38 个)都保证得到完全处理。 在我们的示例中,Objective-C 风格的双参数回调将产生一个非零错误,该错误将被路由到错误处理程序,或者它会产生一个非零值,该值将沿着链前进,或者如果两者都值为 nil HoneyBee 将生成一个错误,说明函数回调未履行其合同。

HoneyBee 还处理调用函数回调次数的合同正确性。 如果一个函数调用它的回调失败,HoneyBee 会产生一个描述性的失败。 如果函数多次调用其回调,HoneyBee 将抑制辅助调用并记录警告。 这两种故障响应(和其他)都可以根据程序员的个人需求进行定制。

希望这种形式的processImageData可以正确地并行化资源下载以提供最佳性能应该已经很明显了。 HoneyBee 最强大的设计目标之一是配方应该看起来像它所表达的算法。

好多了。 正确的? 但是 HoneyBee 可以提供更多。

请注意:下一个案例研究不适合胆小的人。 考虑以下问题描述:您的移动应用程序使用CoreData来保持其状态。 您有一个名为 Media 的NSManagedObject模型,它表示上传到后端服务器的媒体资产。 允许用户一次选择几十个媒体项,并批量上传到后端系统。 媒体首先通过引用字符串表示,该字符串必须转换为媒体对象。 幸运的是,您的应用程序已经包含一个辅助方法,它可以做到这一点:

 func export(_ mediaRef: String, completion: @escaping (Media?, Error?) -> Void) { // transcoding stuff completion(Media(context: managedObjectContext), nil) }

将媒体引用转换为媒体对象后,您必须将媒体项上传到后端。 再次,你有一个辅助函数准备好做网络的东西。

 func upload(_ media: Media, completion: @escaping (Error?) -> Void) { // network stuff completion(nil) }

因为允许用户一次选择数十个媒体项目,所以用户体验设计师已经指定了相当多的关于上传进度的反馈。 需求已被提炼为以下四个功能:

 /// Called if anything goes wrong in the upload func errorHandler(_ error: Error) { // do the right thing } /// Called once per mediaRef, after either a successful or unsuccessful upload func singleUploadCompletion(_ mediaRef: String) { // update a progress indicator } /// Called once per successful upload func singleUploadSuccess(_ media: Media) { // do celebratory things } /// Called if the entire batch was considered to be uploaded successfully. func totalProcessSuccess() { // declare victory }

但是,由于您的应用程序来源的媒体引用有时会过期,因此业务经理决定如果至少有一半的上传成功,则向用户发送“成功”消息。 也就是说,如果少于一半的尝试上传失败,则并发进程应该宣布胜利并调用totalProcessSuccess 。 这是作为开发人员交给您的规范。 但作为一名经验丰富的程序员,您意识到必须应用更多的要求。

当然,Business 希望批量上传尽快进行,因此串行上传是不可能的。 上传必须并行执行。

但是不要太多。 如果你只是不加选择地async整个批次,那么数十个并发上传将淹没移动 NIC(网络接口卡),并且上传实际上会比串行进行得慢,而不是更快。

移动网络连接被认为不稳定。 即使是短交易也可能仅由于网络连接的变化而失败。 为了真正声明上传失败,我们需要至少重试一次上传。

重试策略不应包括导出操作,因为它不会受到暂时失败的影响。

导出过程是计算绑定的,因此必须在主线程之外执行。

因为导出是计算绑定的,所以它的并发实例数量应该比其余的上传过程少,以避免处理器崩溃。

上面描述的四个回调函数都会更新 UI,所以都必须在主线程上调用。

Media 是一个NSManagedObject ,它来自一个NSManagedObjectContext并且有自己的线程要求,必须遵守。

这个问题规范是不是有点晦涩难懂? 如果您发现未来潜伏着这样的问题,请不要感到惊讶。 我在自己的工作中遇到过这样的人。 我们先尝试用传统工具解决这个问题。 系好安全带,这不会很漂亮。

 /// An enum describing specific problems that the algorithm might encounter. enum UploadingError : Error { case invalidResponse case tooManyFailures } /// A semaphore to prevent flooding the NIC let outerLimit = DispatchSemaphore(value: 4) /// A semaphore to prevent thrashing the processor let exportLimit = DispatchSemaphore(value: 1) /// The number of times to retry the upload if it fails let uploadRetries = 1 /// Dispatch group to keep track of when the entire process is finished let fullProcessDispatchGroup = DispatchGroup() /// How many of the uploads fully completed. var uploadSuccesses = 0 // this notify block is called when the full process has completed. fullProcessDispatchGroup.notify(queue: DispatchQueue.main) { let successRate = Float(uploadSuccesses) / Float(mediaReferences.count) if successRate > 0.5 { totalProcessSuccess() } else { errorHandler(UploadingError.tooManyFailures) } } // start in the background DispatchQueue.global().async { for mediaRef in mediaReferences { // alert the group that we're starting a process fullProcessDispatchGroup.enter() // wait until it's safe to start uploading outerLimit.wait() /// common cleanup operations needed later func finalizeMediaRef() { singleUploadCompletion(mediaRef) fullProcessDispatchGroup.leave() outerLimit.signal() } // wait until it's safe to start exporting exportLimit.wait() export(mediaRef) { (media, error) in // allow another export to begin exportLimit.signal() if let error = error { DispatchQueue.main.async { errorHandler(error) finalizeMediaRef() } } else { guard let media = media else { DispatchQueue.main.async { errorHandler(UploadingError.invalidResponse) finalizeMediaRef() } return } // the export was successful var uploadAttempts = 0 /// define the upload process and its retry behavior func doUpload() { // respect Media's threading requirements managedObjectContext.perform { upload(media) { error in if let error = error { if uploadAttempts < uploadRetries { uploadAttempts += 1 doUpload() // retry } else { DispatchQueue.main.async { // too many upload failures errorHandler(error) finalizeMediaRef() } } } else { DispatchQueue.main.async { uploadSuccesses += 1 singleUploadSuccess(media) finalizeMediaRef() } } } } } // kick off the first upload doUpload() } } } }

哇! 没有注释,大约是 75 行。 您是否一直遵循推理? 如果你在新工作的第一周遇到这个怪物,你会有什么感觉? 您是否准备好维护它或修改它? 你知道它是否包含错误吗? 它是否包含错误?

现在,考虑 HoneyBee 替代方案:

 HoneyBee.start(on: DispatchQueue.main) .setErrorHandler(errorHandler) .insert(mediaReferences) .setBlockPerformer(DispatchQueue.global()) .each(limit: 4, acceptableFailure: .ratio(0.5)) { elem in elem.finally { link in link.setBlockPerformer(DispatchQueue.main) .chain(singleUploadCompletion) } .limit(1) { link in link.chain(export) } .setBlockPerformer(managedObjectContext) .retry(1) { link in link.chain(upload) // subject to transient failure } .setBlockPerformer(DispatchQueue.main) .chain(singleUploadSuccess) } .setBlockPerformer(DispatchQueue.main) .drop() .chain(totalProcessSuccess)

这种形式如何打动你? 让我们一点一点地解决它。 在第一行,我们从主线程开始启动 HoneyBee 配方。 通过从主线程开始,我们确保所有错误都将传递给主线程上的 errorHandler(第 2 行)。 第 3 mediaReferences数组插入到进程链中。 接下来,我们切换到全局后台队列,为一些并行性做准备。 在第 5 行,我们开始对每个mediaReferences进行并行迭代。 我们将这种并行性限制为最多 4 个并发操作。 我们还声明,如果至少有一半的子链成功(不要出错),则认为完整的迭代是成功的。 第 6 行声明了一个finally链接,无论下面的子链成功还是失败,都会调用该链接。 在finally链接上,我们切换到主线程(第 7 行)并调用singleUploadCompletion (第 8 行)。 在第 10 行,我们围绕导出操作(第 11 行)设置了最大并行度 1(单次执行)。 第 13 行切换到我们的managedObjectContext实例拥有的私有队列。 第 14 行声明了上传操作的一次重试尝试(第 15 行)。 第 17 行再次切换到主线程,第 18 行调用singleUploadSuccess 。 到执行第 20 行时,所有并行迭代都已完成。 如果少于一半的迭代失败,则第 20 行最后一次切换到主队列(回想一下每次都在后台队列上运行),第 21 行丢弃入站值(仍然是mediaReferences ),第 22 行调用totalProcessSuccess

HoneyBee 表单更清晰、更干净、更易于阅读,更不用说更易于维护。 如果需要循环将 Media 对象重新集成到像 map 函数这样的数组中,那么这个算法的长形式会发生什么? 在你做出改变之后,你对算法的所有要求仍然得到满足有多大信心? 在 HoneyBee 形式中,此更改将用 map 替换每个以使用并行 map 功能。 (是的,它也减少了。)

HoneyBee 是一个强大的 Swift 未来库,它使编写异步和并发算法更容易、更安全、更有表现力。 在本文中,我们了解了 HoneyBee 如何使您的算法更易于维护、更正确和更快。 HoneyBee 还支持其他关键异步范例,例如重试支持、多个错误处理程序、资源保护和集合处理(映射、过滤器和归约的异步形式)。 有关功能的完整列表,请参阅网站。 要了解更多信息或提出问题,请参阅全新的社区论坛。

附录:确保异步函数的合同正确性

确保功能的合同正确性是计算机科学的基本原则。 以至于几乎所有现代编译器都有检查以确保声明返回值的函数只返回一次。 返回少于或多于一次被视为错误并适当地阻止完整编译。

但是这种编译器帮助通常不适用于异步函数。 考虑以下(有趣的)示例:

 func generateIcecream(from int: Int, completion: (String) -> Void) { if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

generateIcecream函数接受一个 Int 并异步返回一个 String。 swift 编译器很高兴地接受上述形式是正确的,即使它包含一些明显的问题。 给定某些输入,此函数可能会调用完成零、一或两次。 使用过异步函数的程序员经常会在自己的工作中回忆起这个问题的例子。 我们可以做什么? 当然,我们可以将代码重构得更整洁(带有范围情况的开关可以在这里工作)。 但有时功能复杂性很难降低。 如果编译器能像定期返回函数一样帮助我们验证正确性不是更好吗?

事实证明有一种方法。 请注意以下 Swifty 咒语:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } // else completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

在该函数顶部插入的四行强制编译器验证完成回调是否只调用了一次,这意味着该函数不再编译。 这是怎么回事? 在第一行,我们声明但不初始化我们最终希望这个函数产生的结果。 通过让它未定义,我们确保它必须被分配一次才能使用它,并且通过声明它让我们确保它永远不会被分配两次。 第二行是一个 defer,它将作为这个函数的最后一个动作执行。 它使用finalResult调用完成块 - 在它被函数的其余部分分配之后。 第 3 行创建了一个名为 completion 的新常量,它隐藏了回调参数。 新的完成是 Void 类型,它没有声明公共 API。 此行确保在此行之后使用完成将是编译器错误。 第 2 行的 defer 是完成块的唯一允许使用。 第 4 行删除了一个编译器警告,否则会出现关于未使用的新完成常量的警告。

所以我们已经成功地强制 swift 编译器报告这个异步函数没有履行它的契约。 让我们逐步完成步骤以使其正确。 首先,让我们将所有对回调的直接访问替换为对finalResult的赋值。

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } // else finalResult = "Pistachio" } else if int < 2 { finalResult = "Vanilla" } }

现在编译器报告了两个问题:

 error: AsyncCorrectness.playground:1:8: error: constant 'finalResult' used before being initialized defer { completion(finalResult) } ^ error: AsyncCorrectness.playground:11:3: error: immutable value 'finalResult' may only be initialized once finalResult = "Pistachio"

正如预期的那样,该函数有一个最终结果被分配零次的路径, finalResult一个被多次分配的路径。 我们解决这些问题如下:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } else { finalResult = "Pistachio" } } else if int < 2 { finalResult = "Vanilla" } else { finalResult = "Neapolitan" } }

“开心果”已移至适当的 else 子句,我们意识到我们未能涵盖一般情况——当然是“那不勒斯”。

刚刚描述的模式可以很容易地调整为返回可选值、可选错误或复杂类型,如常见的 Result 枚举。 通过强制编译器验证回调只被调用一次,我们可以断言异步函数的正确性和完整性。