Swift 中的高級並發與 HoneyBee

已發表: 2022-03-11

在 Swift 中設計、測試和維護並發算法很困難,正確處理細節對於應用程序的成功至關重要。 並發算法(也稱為並行編程)是一種旨在同時執行多個(可能很多)操作以利用更多硬件資源並減少整體執行時間的算法。

在 Apple 的平台上,編寫並發算法的傳統方式是 NSOperation。 NSOperation 的設計邀請程序員將並發算法細分為單獨的長時間運行的異步任務。 每個任務將在其自己的 NSOperation 子類中定義,並且這些類的實例將通過目標 API 組合以在運行時創建任務的部分順序。 七年來,這種設計並發算法的方法在 Apple 平台上一直是最先進的。

2014 年,Apple 推出了 Grand Central Dispatch (GCD),作為並發操作表達的一大進步。 GCD 以及伴隨並支持它的新語言功能塊,提供了一種在啟動異步請求後立即緊湊地描述異步響應處理程序的方法。 不再鼓勵程序員在多個 NSOperation 子類中的多個文件中傳播並發任務的定義。 現在,可以在單個方法中編寫整個並發算法。 這種表現力和類型安全性的提高是一個重大的概念轉變。 這種書寫方式的典型算法可能如下所示:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { loadWebResource("dataprofile.txt") { (dataResource, error) in guard let dataResource = dataResource else { completion(nil, error) return } loadWebResource("imagedata.dat") { (imageResource, error) in guard let imageResource = imageResource else { completion(nil, error) return } decodeImage(dataResource, imageResource) { (imageTmp, error) in guard let imageTmp = imageTmp else { completion(nil, error) return } dewarpAndCleanupImage(imageTmp) { imageResult in guard let imageResult = imageResult else { completion(nil, error) return } completion(imageResult, nil) } } } } }

讓我們稍微分解一下這個算法。 函數processImageData是一個異步函數,它自己進行四次異步調用來完成它的工作。 四個異步調用以對基於塊的異步處理最自然的方式嵌套在另一個內部。 每個結果塊都有一個可選的錯誤參數,除了一個之外,所有的都包含一個額外的可選參數,表示 aysnc 操作的結果。

大多數 Swift 開發人員可能對上述代碼塊的形狀很熟悉。 但是這種方法有什麼問題呢? 以下痛點列表可能同樣熟悉。

  • 這種嵌套代碼塊的“末日金字塔”形狀很快就會變得笨拙。 如果我們再添加兩個異步操作會發生什麼? 四? 條件操作呢? 重試行為或資源限制保護如何? 現實世界的代碼永遠不會像博客文章中的示例那樣乾淨和簡單。 “末日金字塔”效應很容易導致代碼難以閱讀、難以維護並且容易出現錯誤。
  • 上述示例中的錯誤處理嘗試雖然 Swifty,但實際上是不完整的。 程序員假定兩參數、Objective-C 風格的異步回調塊將始終提供兩個參數之一; 他們永遠不會同時為零。 這不是一個安全的假設。 並發算法以難以編寫和調試而聞名,而毫無根據的假設是部分原因。 對於任何打算在現實世界中運行的並發算法來說,完整和正確的錯誤處理是不可避免的。
  • 再進一步考慮,可能編寫被調用異步函數的程序員沒有你那麼有原則。 如果存在被調用函數回調失敗的情況怎麼辦? 還是不止一次回電? 在這些情況下,processImageData 的正確性會發生什麼變化? 專業人士不會冒險。 即使依賴第三方編寫的功能,關鍵任務功能也需要正確。
  • 也許最引人注目的是,所考慮的異步算法是次優構造的。 前兩個異步操作都是遠程資源的下載。 即使它們沒有相互依賴關係,上述算法也會按順序而不是並行執行下載。 原因很明顯; 嵌套塊語法鼓勵這種浪費。 競爭市場不會容忍不必要的滯後。 如果您的應用程序沒有盡快執行其異步操作,那麼另一個應用程序將會執行。

我們怎樣才能做得更好? HoneyBee 是一個 futures/promises 庫,它使 Swift 並發編程變得簡單、富有表現力和安全。 讓我們用 HoneyBee 重寫上面的異步算法並檢查結果:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { HoneyBee.start() .setErrorHandler { completion(nil, $0) } .branch { stem in stem.chain(loadWebResource =<< "dataprofile.txt") + stem.chain(loadWebResource =<< "imagedata.dat") } .chain(decodeImage) .chain(dewarpAndCleanupImage) .chain { completion($0, nil) } }

這個實現開始的第一行是一個新的 HoneyBee 配方。 第二行建立默認的錯誤處理程序。 錯誤處理在 HoneyBee 配方中不是可選的。 如果出現問題,算法必須處理它。 第三行打開一個允許並行執行的分支。 loadWebResource的兩條鏈將並行執行,它們的結果將合併(第 5 行)。 兩個加載資源的組合值被轉發到decodeImage等,直到調用完成。

讓我們來看看上面列出的痛點,看看 HoneyBee 是如何改進這段代碼的。 維護此功能現在變得更加容易。 HoneyBee recipe 看起來像它所表達的算法。 代碼可讀、可理解且可快速修改。 HoneyBee 的設計確保指令的任何錯誤排序都會導致編譯時錯誤,而不是運行時錯誤。 該功能現在不太容易受到錯誤和人為錯誤的影響。

所有可能的運行時錯誤都已完全處理。 HoneyBee 支持的每個函數簽名(其中有 38 個)都保證得到完全處理。 在我們的示例中,Objective-C 風格的雙參數回調將產生一個非 nil 錯誤,該錯誤將被路由到錯誤處理程序,或者它會產生一個非 nil 值,該值將沿鏈向下傳遞,或者如果兩者都值為 nil HoneyBee 將生成一個錯誤,說明函數回調未履行其合同。

HoneyBee 還處理調用函數回調次數的合同正確性。 如果一個函數調用它的回調失敗,HoneyBee 會產生一個描述性的失敗。 如果函數多次調用其回調,HoneyBee 將抑制輔助調用並記錄警告。 這兩種故障響應(和其他)都可以根據程序員的個人需求進行定制。

希望這種形式的processImageData可以正確地並行化資源下載以提供最佳性能應該已經很明顯了。 HoneyBee 最強大的設計目標之一是配方應該看起來像它所表達的算法。

好多了。 正確的? 但是 HoneyBee 可以提供更多。

請注意:下一個案例研究不適合膽小的人。 考慮以下問題描述:您的移動應用程序使用CoreData來保持其狀態。 您有一個名為 Media 的NSManagedObject模型,它表示上傳到後端服務器的媒體資產。 允許用戶一次選擇幾十個媒體項,並批量上傳到後端系統。 媒體首先通過引用字符串表示,該字符串必須轉換為媒體對象。 幸運的是,您的應用程序已經包含一個輔助方法,它可以做到這一點:

 func export(_ mediaRef: String, completion: @escaping (Media?, Error?) -> Void) { // transcoding stuff completion(Media(context: managedObjectContext), nil) }

將媒體引用轉換為媒體對像後,您必須將媒體項上傳到後端。 再次,你有一個輔助函數準備好做網絡的東西。

 func upload(_ media: Media, completion: @escaping (Error?) -> Void) { // network stuff completion(nil) }

因為允許用戶一次選擇數十個媒體項目,所以用戶體驗設計師已經指定了相當多的關於上傳進度的反饋。 需求已被提煉為以下四個功能:

 /// Called if anything goes wrong in the upload func errorHandler(_ error: Error) { // do the right thing } /// Called once per mediaRef, after either a successful or unsuccessful upload func singleUploadCompletion(_ mediaRef: String) { // update a progress indicator } /// Called once per successful upload func singleUploadSuccess(_ media: Media) { // do celebratory things } /// Called if the entire batch was considered to be uploaded successfully. func totalProcessSuccess() { // declare victory }

但是,由於您的應用程序來源的媒體引用有時會過期,因此業務經理決定如果至少有一半的上傳成功,則向用戶發送“成功”消息。 也就是說,如果少於一半的嘗試上傳失敗,則並發進程應該宣布勝利並調用totalProcessSuccess 。 這是作為開發人員交給您的規範。 但作為一名經驗豐富的程序員,您意識到必須應用更多的要求。

當然,Business 希望批量上傳盡快進行,因此串行上傳是不可能的。 上傳必須並行執行。

但是不要太多。 如果你只是不加選擇地async整個批次,那麼數十個並發上傳將淹沒移動 NIC(網絡接口卡),並且上傳實際上會比串行進行得慢,而不是更快。

移動網絡連接被認為不穩定。 即使是短交易也可能僅由於網絡連接的變化而失敗。 為了真正聲明上傳失敗,我們需要至少重試一次上傳。

重試策略不應包括導出操作,因為它不會受到暫時失敗的影響。

導出過程是計算綁定的,因此必須在主線程之外執行。

因為導出是計算綁定的,所以它的並發實例數量應該比其餘的上傳過程少,以避免處理器崩潰。

上面描述的四個回調函數都會更新 UI,所以都必須在主線程上調用。

Media 是一個NSManagedObject ,它來自一個NSManagedObjectContext並且有自己的線程要求,必須遵守。

這個問題規範是不是有點晦澀難懂? 如果您發現未來潛伏著這樣的問題,請不要感到驚訝。 我在自己的工作中遇到過這樣的人。 我們先嘗試用傳統工具解決這個問題。 係好安全帶,這不會很漂亮。

 /// An enum describing specific problems that the algorithm might encounter. enum UploadingError : Error { case invalidResponse case tooManyFailures } /// A semaphore to prevent flooding the NIC let outerLimit = DispatchSemaphore(value: 4) /// A semaphore to prevent thrashing the processor let exportLimit = DispatchSemaphore(value: 1) /// The number of times to retry the upload if it fails let uploadRetries = 1 /// Dispatch group to keep track of when the entire process is finished let fullProcessDispatchGroup = DispatchGroup() /// How many of the uploads fully completed. var uploadSuccesses = 0 // this notify block is called when the full process has completed. fullProcessDispatchGroup.notify(queue: DispatchQueue.main) { let successRate = Float(uploadSuccesses) / Float(mediaReferences.count) if successRate > 0.5 { totalProcessSuccess() } else { errorHandler(UploadingError.tooManyFailures) } } // start in the background DispatchQueue.global().async { for mediaRef in mediaReferences { // alert the group that we're starting a process fullProcessDispatchGroup.enter() // wait until it's safe to start uploading outerLimit.wait() /// common cleanup operations needed later func finalizeMediaRef() { singleUploadCompletion(mediaRef) fullProcessDispatchGroup.leave() outerLimit.signal() } // wait until it's safe to start exporting exportLimit.wait() export(mediaRef) { (media, error) in // allow another export to begin exportLimit.signal() if let error = error { DispatchQueue.main.async { errorHandler(error) finalizeMediaRef() } } else { guard let media = media else { DispatchQueue.main.async { errorHandler(UploadingError.invalidResponse) finalizeMediaRef() } return } // the export was successful var uploadAttempts = 0 /// define the upload process and its retry behavior func doUpload() { // respect Media's threading requirements managedObjectContext.perform { upload(media) { error in if let error = error { if uploadAttempts < uploadRetries { uploadAttempts += 1 doUpload() // retry } else { DispatchQueue.main.async { // too many upload failures errorHandler(error) finalizeMediaRef() } } } else { DispatchQueue.main.async { uploadSuccesses += 1 singleUploadSuccess(media) finalizeMediaRef() } } } } } // kick off the first upload doUpload() } } } }

哇! 沒有註釋,大約是 75 行。 您是否一直遵循推理? 如果你在新工作的第一周遇到這個怪物,你會有什麼感覺? 您是否準備好維護它或修改它? 你知道它是否包含錯誤嗎? 它是否包含錯誤?

現在,考慮 HoneyBee 替代方案:

 HoneyBee.start(on: DispatchQueue.main) .setErrorHandler(errorHandler) .insert(mediaReferences) .setBlockPerformer(DispatchQueue.global()) .each(limit: 4, acceptableFailure: .ratio(0.5)) { elem in elem.finally { link in link.setBlockPerformer(DispatchQueue.main) .chain(singleUploadCompletion) } .limit(1) { link in link.chain(export) } .setBlockPerformer(managedObjectContext) .retry(1) { link in link.chain(upload) // subject to transient failure } .setBlockPerformer(DispatchQueue.main) .chain(singleUploadSuccess) } .setBlockPerformer(DispatchQueue.main) .drop() .chain(totalProcessSuccess)

這種形式如何打動你? 讓我們一點一點地解決它。 在第一行,我們從主線程開始啟動 HoneyBee 配方。 通過從主線程開始,我們確保所有錯誤都將傳遞給主線程上的 errorHandler(第 2 行)。 第 3 mediaReferences數組插入到進程鏈中。 接下來,我們切換到全局後台隊列,為一些並行性做準備。 在第 5 行,我們開始對每個mediaReferences進行並行迭代。 我們將這種並行性限制為最多 4 個並發操作。 我們還聲明,如果至少有一半的子鏈成功(不要出錯),則認為完整的迭代是成功的。 第 6 行聲明了一個finally鏈接,無論下面的子鏈成功還是失敗,都會調用該鏈接。 在finally鏈接上,我們切換到主線程(第 7 行)並調用singleUploadCompletion (第 8 行)。 在第 10 行,我們圍繞導出操作(第 11 行)設置了最大並行度 1(單次執行)。 第 13 行切換到我們的managedObjectContext實例擁有的私有隊列。 第 14 行聲明了上傳操作的一次重試嘗試(第 15 行)。 第 17 行再次切換到主線程,第 18 行調用singleUploadSuccess 。 到執行第 20 行時,所有並行迭代都已完成。 如果少於一半的迭代失敗,則第 20 行最後一次切換到主隊列(回想一下每次都在後台隊列上運行),第 21 行丟棄入站值(仍然是mediaReferences ),第 22 行調用totalProcessSuccess

HoneyBee 表單更清晰、更乾淨、更易於閱讀,更不用說更易於維護。 如果需要循環將 Media 對象重新集成到像 map 函數這樣的數組中,那麼這個算法的長形式會發生什麼? 在你做出改變之後,你對算法的所有要求仍然得到滿足有多大信心? 在 HoneyBee 形式中,此更改將用 map 替換每個以使用並行 map 功能。 (是的,它也減少了。)

HoneyBee 是一個強大的 Swift 未來庫,它使編寫異步和並發算法更容易、更安全、更有表現力。 在本文中,我們了解了 HoneyBee 如何使您的算法更易於維護、更正確和更快。 HoneyBee 還支持其他關鍵異步範例,例如重試支持、多個錯誤處理程序、資源保護和集合處理(映射、過濾器和歸約的異步形式)。 有關功能的完整列表,請參閱網站。 要了解更多信息或提出問題,請參閱全新的社區論壇。

附錄:確保異步函數的合同正確性

確保功能的合同正確性是計算機科學的基本原則。 以至於幾乎所有現代編譯器都有檢查以確保聲明返回值的函數只返回一次。 返回少於或多於一次被視為錯誤並適當地阻止完整編譯。

但是這種編譯器幫助通常不適用於異步函數。 考慮以下(有趣的)示例:

 func generateIcecream(from int: Int, completion: (String) -> Void) { if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

generateIcecream函數接受一個 Int 並異步返回一個 String。 swift 編譯器很高興地接受上述形式是正確的,即使它包含一些明顯的問題。 給定某些輸入,此函數可能會調用完成零、一或兩次。 使用過異步函數的程序員經常會在自己的工作中回憶起這個問題的例子。 我們可以做什麼? 當然,我們可以將代碼重構得更整潔(帶有範圍情況的開關可以在這里工作)。 但有時功能複雜性很難降低。 如果編譯器能像定期返回函數一樣幫助我們驗證正確性不是更好嗎?

事實證明有一種方法。 請注意以下 Swifty 咒語:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } // else completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

在該函數頂部插入的四行強制編譯器驗證完成回調是否只調用了一次,這意味著該函數不再編譯。 這是怎麼回事? 在第一行,我們聲明但不初始化我們最終希望這個函數產生的結果。 通過讓它未定義,我們確保它必須被分配一次才能使用它,並且通過聲明它讓我們確保它永遠不會被分配兩次。 第二行是一個 defer,它將作為這個函數的最後一個動作執行。 它使用finalResult調用完成塊 - 在它被函數的其餘部分分配之後。 第 3 行創建了一個名為 completion 的新常量,它隱藏了回調參數。 新的完成是 Void 類型,它沒有聲明公共 API。 此行確保在此行之後使用完成將是編譯器錯誤。 第 2 行的 defer 是完成塊的唯一允許使用。 第 4 行刪除了一個編譯器警告,否則會出現關於未使用的新完成常量的警告。

所以我們已經成功地強制 swift 編譯器報告這個異步函數沒有履行它的契約。 讓我們逐步完成步驟以使其正確。 首先,讓我們將所有對回調的直接訪問替換為對finalResult的賦值。

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } // else finalResult = "Pistachio" } else if int < 2 { finalResult = "Vanilla" } }

現在編譯器報告了兩個問題:

 error: AsyncCorrectness.playground:1:8: error: constant 'finalResult' used before being initialized defer { completion(finalResult) } ^ error: AsyncCorrectness.playground:11:3: error: immutable value 'finalResult' may only be initialized once finalResult = "Pistachio"

正如預期的那樣,該函數有一個最終結果被分配零次的路徑, finalResult一個被多次分配的路徑。 我們解決這些問題如下:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } else { finalResult = "Pistachio" } } else if int < 2 { finalResult = "Vanilla" } else { finalResult = "Neapolitan" } }

“開心果”已移至適當的 else 子句,我們意識到我們未能涵蓋一般情況——當然是“那不勒斯”。

剛剛描述的模式可以很容易地調整為返回可選值、可選錯誤或複雜類型,如常見的 Result 枚舉。 通過強制編譯器驗證回調只被調用一次,我們可以斷言異步函數的正確性和完整性。