Расширенный параллелизм в Swift с HoneyBee

Опубликовано: 2022-03-11

Проектировать, тестировать и поддерживать параллельные алгоритмы в Swift сложно, и правильная проработка деталей имеет решающее значение для успеха вашего приложения. Параллельный алгоритм (также называемый параллельным программированием) — это алгоритм, предназначенный для выполнения нескольких (возможно, многих) операций одновременно, чтобы использовать больше аппаратных ресурсов и сократить общее время выполнения.

На платформах Apple традиционным способом написания параллельных алгоритмов является NSOperation. Дизайн NSOperation предлагает программисту разделить параллельный алгоритм на отдельные длительные асинхронные задачи. Каждая задача будет определена в своем собственном подклассе NSOperation, и экземпляры этих классов будут объединены через объективный API для создания частичного порядка задач во время выполнения. Этот метод разработки параллельных алгоритмов был самым современным на платформах Apple в течение семи лет.

В 2014 году Apple представила Grand Central Dispatch (GCD) как значительный шаг вперед в выражении параллельных операций. GCD, наряду с новыми языковыми функциональными блоками, которые сопровождали и поддерживали его, предоставили способ компактного описания обработчика асинхронного ответа сразу после инициирования асинхронного запроса. Программистов больше не поощряли распространять определение одновременных задач по нескольким файлам в многочисленных подклассах NSOperation. Теперь весь параллельный алгоритм вполне можно написать в одном методе. Это увеличение выразительности и безопасности типов было значительным концептуальным сдвигом вперед. Алгоритм, типичный для такого способа записи, может выглядеть следующим образом:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { loadWebResource("dataprofile.txt") { (dataResource, error) in guard let dataResource = dataResource else { completion(nil, error) return } loadWebResource("imagedata.dat") { (imageResource, error) in guard let imageResource = imageResource else { completion(nil, error) return } decodeImage(dataResource, imageResource) { (imageTmp, error) in guard let imageTmp = imageTmp else { completion(nil, error) return } dewarpAndCleanupImage(imageTmp) { imageResult in guard let imageResult = imageResult else { completion(nil, error) return } completion(imageResult, nil) } } } } }

Давайте немного разберем этот алгоритм. Функция processImageData — это асинхронная функция, которая выполняет четыре собственных асинхронных вызова для завершения своей работы. Четыре асинхронных вызова вложены друг в друга таким образом, что это наиболее естественно для блочной асинхронной обработки. Каждый из блоков результатов имеет необязательный параметр Error, и все, кроме одного, содержат дополнительный необязательный параметр, обозначающий результат операции aysnc.

Форма приведенного выше блока кода, вероятно, знакома большинству разработчиков Swift. Но что плохого в этом подходе? Следующий список болевых точек, вероятно, будет вам знаком.

  • Эта «пирамида гибели» вложенных блоков кода может быстро стать громоздкой. Что произойдет, если мы добавим еще две асинхронные операции? Четыре? А условные операции? Как насчет поведения при повторных попытках или защиты от ограничений ресурсов? Реальный код никогда не бывает таким чистым и простым, как примеры в сообщениях блога. Эффект «пирамиды гибели» может легко привести к коду, который трудно читать, трудно поддерживать и который подвержен ошибкам.
  • Попытка обработки ошибок в приведенном выше примере, хотя и Swifty, на самом деле неполная. Программист предположил, что блоки асинхронного обратного вызова в стиле Objective-C с двумя параметрами всегда будут предоставлять один из двух параметров; они никогда не будут равны нулю одновременно. Это не безопасное предположение. Параллельные алгоритмы известны тем, что их сложно писать и отлаживать, а необоснованные предположения являются одной из причин. Полная и правильная обработка ошибок является неизбежной необходимостью для любого параллельного алгоритма, предназначенного для работы в реальном мире.
  • Продолжая эту мысль, возможно, программист, написавший вызываемые асинхронные функции, не был таким принципиальным, как вы. Что делать, если есть условия, при которых вызываемые функции не могут выполнить обратный вызов? Или перезванивать больше одного раза? Что происходит с правильностью processImageData в этих обстоятельствах? Профи не рискуют. Критически важные функции должны быть корректными, даже если они основаны на функциях, написанных третьими лицами.
  • Возможно, наиболее убедительным является то, что рассматриваемый асинхронный алгоритм построен неоптимально. Первые две асинхронные операции — это загрузка удаленных ресурсов. Несмотря на то, что они не имеют взаимозависимости, описанный выше алгоритм выполняет загрузку последовательно, а не параллельно. Причины этого очевидны; синтаксис вложенных блоков поощряет такую ​​расточительность. Конкурентные рынки не терпят ненужного отставания. Если ваше приложение не выполняет свои асинхронные операции как можно быстрее, это сделает другое приложение.

Как мы можем сделать лучше? HoneyBee — это библиотека будущего/обещания, которая делает параллельное программирование Swift простым, выразительным и безопасным. Давайте перепишем приведенный выше асинхронный алгоритм с помощью HoneyBee и изучим результат:

 func processImageData(completion: (result: Image?, error: Error?) -> Void) { HoneyBee.start() .setErrorHandler { completion(nil, $0) } .branch { stem in stem.chain(loadWebResource =<< "dataprofile.txt") + stem.chain(loadWebResource =<< "imagedata.dat") } .chain(decodeImage) .chain(dewarpAndCleanupImage) .chain { completion($0, nil) } }

Первая строка, которую запускает эта реализация, — это новый рецепт HoneyBee. Вторая строка устанавливает обработчик ошибок по умолчанию. Обработка ошибок не является обязательной в рецептах HoneyBee. Если что-то может пойти не так, алгоритм должен это обработать. Третья строка открывает ветвь, которая позволяет выполнять параллельное выполнение. Две цепочки loadWebResource будут выполняться параллельно, и их результаты будут объединены (строка 5). Объединенные значения двух загруженных ресурсов пересылаются в decodeImage и так далее по цепочке, пока не будет вызвано завершение.

Давайте пройдемся по приведенному выше списку болевых точек и посмотрим, как HoneyBee улучшила этот код. Поддерживать эту функцию теперь значительно проще. Рецепт HoneyBee выглядит как алгоритм, который он выражает. Код читаем, понятен и быстро модифицируется. Конструкция HoneyBee гарантирует, что любой неправильный порядок инструкций приведет к ошибке времени компиляции, а не к ошибке времени выполнения. Функция теперь гораздо менее подвержена ошибкам и человеческим ошибкам.

Все возможные ошибки времени выполнения были полностью обработаны. Каждая сигнатура функции, которую поддерживает HoneyBee (их 38), гарантированно будет полностью обработана. В нашем примере двухпараметрический обратный вызов в стиле Objective-C либо выдаст ненулевую ошибку, которая будет направлена ​​обработчику ошибок, либо выдаст ненулевое значение, которое будет продвигаться вниз по цепочке, либо, если оба значения равны нулю HoneyBee сгенерирует ошибку, объясняющую, что функция обратного вызова не выполняет свой контракт.

HoneyBee также обрабатывает контрактную корректность для количества вызовов обратных вызовов функций. Если функция не может вызвать свой обратный вызов, HoneyBee выдает описательный сбой. Если функция вызывает свой обратный вызов более одного раза, HoneyBee подавляет вспомогательные вызовы и регистрирует предупреждения. Обе эти реакции на ошибку (и другие) могут быть настроены в соответствии с индивидуальными потребностями программиста.

Надеюсь, уже должно быть очевидно, что эта форма processImageData правильно распараллеливает загрузку ресурсов для обеспечения оптимальной производительности. Одной из главных целей дизайна HoneyBee является то, что рецепт должен выглядеть как алгоритм, который он выражает.

Намного лучше. Верно? Но HoneyBee может предложить гораздо больше.

Имейте в виду: следующее тематическое исследование не для слабонервных. Рассмотрим следующее описание проблемы. Ваше мобильное приложение использует CoreData для сохранения своего состояния. У вас есть модель NSManagedObject с именем Media, которая представляет мультимедийный ресурс, загруженный на ваш внутренний сервер. Пользователю должно быть разрешено выбирать десятки элементов мультимедиа одновременно и загружать их пакетом в серверную систему. Медиафайлы сначала представляются через ссылочную строку, которую необходимо преобразовать в медиа-объект. К счастью, ваше приложение уже содержит вспомогательный метод, который делает именно это:

 func export(_ mediaRef: String, completion: @escaping (Media?, Error?) -> Void) { // transcoding stuff completion(Media(context: managedObjectContext), nil) }

После преобразования ссылки мультимедиа в объект мультимедиа вы должны загрузить элемент мультимедиа на сервер. Опять же, у вас есть вспомогательная функция, готовая делать сетевые вещи.

 func upload(_ media: Media, completion: @escaping (Error?) -> Void) { // network stuff completion(nil) }

Поскольку пользователю разрешено выбирать десятки элементов мультимедиа одновременно, UX-дизайнер предусмотрел довольно надежную обратную связь о ходе загрузки. Требования были разделены на следующие четыре функции:

 /// Called if anything goes wrong in the upload func errorHandler(_ error: Error) { // do the right thing } /// Called once per mediaRef, after either a successful or unsuccessful upload func singleUploadCompletion(_ mediaRef: String) { // update a progress indicator } /// Called once per successful upload func singleUploadSuccess(_ media: Media) { // do celebratory things } /// Called if the entire batch was considered to be uploaded successfully. func totalProcessSuccess() { // declare victory }

Однако, поскольку ваше приложение использует медиа-ссылки, срок действия которых иногда истек, бизнес-менеджеры решили отправить пользователю сообщение об успешном выполнении, если хотя бы половина загрузок прошла успешно. Другими словами, параллельный процесс должен объявить победу и вызвать totalProcessSuccess если менее половины попыток загрузки завершатся неудачно. Это спецификация, переданная вам как разработчику. Но, как опытный программист, вы понимаете, что существует больше требований, которые необходимо применять.

Конечно, бизнес хочет, чтобы пакетная загрузка происходила как можно быстрее, поэтому о последовательной загрузке не может быть и речи. Загрузки должны выполняться параллельно.

Но не слишком много. Если вы просто без разбора async весь пакет, десятки одновременных загрузок переполнят мобильную сетевую карту (сетевую интерфейсную карту), и загрузки будут фактически выполняться медленнее, чем последовательные, а не быстрее.

Соединения с мобильными сетями не считаются стабильными. Даже короткие транзакции могут завершиться неудачей только из-за изменений в сетевом подключении. Чтобы действительно объявить, что загрузка не удалась, нам нужно будет повторить загрузку хотя бы один раз.

Политика повторных попыток не должна включать операцию экспорта, так как она не подвержена временным сбоям.

Процесс экспорта зависит от вычислений и поэтому должен выполняться вне основного потока.

Поскольку экспорт привязан к вычислительным ресурсам, у него должно быть меньшее количество одновременных экземпляров, чем у остального процесса загрузки, чтобы избежать перегрузки процессора.

Все четыре функции обратного вызова, описанные выше, обновляют пользовательский интерфейс, поэтому все они должны вызываться в основном потоке.

Media — это NSManagedObject , который исходит из NSManagedObjectContext и имеет свои собственные требования к многопоточности, которые необходимо соблюдать.

Эта спецификация проблемы кажется немного неясной? Не удивляйтесь, если вы обнаружите, что подобные проблемы скрываются в вашем будущем. Я сталкивался с подобным в своей работе. Давайте сначала попробуем решить эту проблему традиционными инструментами. Пристегнитесь, это будет некрасиво.

 /// An enum describing specific problems that the algorithm might encounter. enum UploadingError : Error { case invalidResponse case tooManyFailures } /// A semaphore to prevent flooding the NIC let outerLimit = DispatchSemaphore(value: 4) /// A semaphore to prevent thrashing the processor let exportLimit = DispatchSemaphore(value: 1) /// The number of times to retry the upload if it fails let uploadRetries = 1 /// Dispatch group to keep track of when the entire process is finished let fullProcessDispatchGroup = DispatchGroup() /// How many of the uploads fully completed. var uploadSuccesses = 0 // this notify block is called when the full process has completed. fullProcessDispatchGroup.notify(queue: DispatchQueue.main) { let successRate = Float(uploadSuccesses) / Float(mediaReferences.count) if successRate > 0.5 { totalProcessSuccess() } else { errorHandler(UploadingError.tooManyFailures) } } // start in the background DispatchQueue.global().async { for mediaRef in mediaReferences { // alert the group that we're starting a process fullProcessDispatchGroup.enter() // wait until it's safe to start uploading outerLimit.wait() /// common cleanup operations needed later func finalizeMediaRef() { singleUploadCompletion(mediaRef) fullProcessDispatchGroup.leave() outerLimit.signal() } // wait until it's safe to start exporting exportLimit.wait() export(mediaRef) { (media, error) in // allow another export to begin exportLimit.signal() if let error = error { DispatchQueue.main.async { errorHandler(error) finalizeMediaRef() } } else { guard let media = media else { DispatchQueue.main.async { errorHandler(UploadingError.invalidResponse) finalizeMediaRef() } return } // the export was successful var uploadAttempts = 0 /// define the upload process and its retry behavior func doUpload() { // respect Media's threading requirements managedObjectContext.perform { upload(media) { error in if let error = error { if uploadAttempts < uploadRetries { uploadAttempts += 1 doUpload() // retry } else { DispatchQueue.main.async { // too many upload failures errorHandler(error) finalizeMediaRef() } } } else { DispatchQueue.main.async { uploadSuccesses += 1 singleUploadSuccess(media) finalizeMediaRef() } } } } } // kick off the first upload doUpload() } } } }

Вау! Без комментариев это около 75 строк. Вы следовали рассуждениям до конца? Как бы вы себя чувствовали, если бы столкнулись с этим монстром в первую же неделю на новой работе? Готовы ли вы сохранить его или изменить? Вы бы знали, если бы он содержал ошибки? Содержит ли он ошибки?

Теперь рассмотрим альтернативу HoneyBee:

 HoneyBee.start(on: DispatchQueue.main) .setErrorHandler(errorHandler) .insert(mediaReferences) .setBlockPerformer(DispatchQueue.global()) .each(limit: 4, acceptableFailure: .ratio(0.5)) { elem in elem.finally { link in link.setBlockPerformer(DispatchQueue.main) .chain(singleUploadCompletion) } .limit(1) { link in link.chain(export) } .setBlockPerformer(managedObjectContext) .retry(1) { link in link.chain(upload) // subject to transient failure } .setBlockPerformer(DispatchQueue.main) .chain(singleUploadSuccess) } .setBlockPerformer(DispatchQueue.main) .drop() .chain(totalProcessSuccess)

Чем вам запомнилась эта форма? Давайте проработаем его по частям. В первой строке мы запускаем рецепт HoneyBee, начиная с основного потока. Начиная с основного потока, мы гарантируем, что все ошибки будут переданы в errorHandler (строка 2) в основном потоке. Строка 3 вставляет массив mediaReferences в цепочку процессов. Затем мы переключаемся на глобальную фоновую очередь, готовясь к некоторому параллелизму. В строке 5 мы начинаем параллельную итерацию по каждому из mediaReferences . Мы ограничиваем этот параллелизм максимум 4 одновременными операциями. Мы также заявляем, что полная итерация будет считаться успешной, если хотя бы половина подцепей будет успешной (не будет ошибки). Строка 6 объявляет ссылку finally , которая будет вызываться независимо от того, будет ли следующая подцепочка успешной или неудачной. По ссылке finally мы переключаемся на основной поток (строка 7) и вызываем singleUploadCompletion (строка 8). В строке 10 мы устанавливаем максимальное распараллеливание 1 (однократное выполнение) вокруг операции экспорта (строка 11). Строка 13 переключается на частную очередь, принадлежащую нашему экземпляру managedObjectContext . Строка 14 объявляет одну повторную попытку операции загрузки (строка 15). Строка 17 снова переключается на основной поток, а 18 вызывает singleUploadSuccess . К моменту выполнения строки 20 все параллельные итерации будут завершены. Если менее половины итераций завершились неудачей, то строка 20 в последний раз переключается на основную очередь (вспомним, что каждая из них выполнялась в фоновой очереди), 21 отбрасывает входящее значение (по-прежнему mediaReferences ), а 22 вызывает totalProcessSuccess .

Форма HoneyBee понятнее, чище и легче читается, не говоря уже о том, что ее легче поддерживать. Что случилось бы с длинной формой этого алгоритма, если бы цикл потребовался для реинтеграции объектов Media в массив, как функция карты? После того, как вы внесете изменения, насколько вы будете уверены, что все требования алгоритма по-прежнему выполняются? В форме HoneyBee это изменение будет заключаться в замене каждого на карту, чтобы использовать функцию параллельной карты. (Да, он тоже уменьшился.)

HoneyBee — это мощная библиотека Futures для Swift, которая делает написание асинхронных и параллельных алгоритмов проще, безопаснее и выразительнее. В этой статье мы увидели, как HoneyBee может сделать ваши алгоритмы более простыми в обслуживании, более точными и быстрыми. HoneyBee также поддерживает другие ключевые асинхронные парадигмы, такие как поддержка повторных попыток, множественные обработчики ошибок, защита ресурсов и обработка коллекций (асинхронные формы сопоставления, фильтрации и сокращения). Полный список функций см. на веб-сайте. Чтобы узнать больше или задать вопросы, посетите новые форумы сообщества.

Приложение: Обеспечение договорной правильности асинхронных функций

Обеспечение договорной корректности функций является фундаментальным принципом информатики. Настолько, что практически все современные компиляторы имеют проверки, гарантирующие, что функция, объявляющая о возвращении значения, возвращается ровно один раз. Возврат менее или более одного раза рассматривается как ошибка и соответствующим образом предотвращает полную компиляцию.

Но эта помощь компилятора обычно не распространяется на асинхронные функции. Рассмотрим следующий (игровой) пример:

 func generateIcecream(from int: Int, completion: (String) -> Void) { if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

Функция generateIcecream принимает Int и асинхронно возвращает String. Компилятор Swift с радостью принимает приведенную выше форму как правильную, даже несмотря на то, что она содержит некоторые очевидные проблемы. При определенных входных данных эта функция может вызывать завершение ноль, один или два раза. Программисты, которые работали с асинхронными функциями, часто вспоминают примеры этой проблемы в своей работе. Что мы можем сделать? Конечно, мы могли бы сделать рефакторинг кода более аккуратным (здесь сработал бы переключатель с регистрами диапазонов). Но иногда функциональную сложность трудно уменьшить. Не было бы лучше, если бы компилятор мог помочь нам в проверке правильности, как это происходит с регулярно возвращаемыми функциями?

Оказывается есть способ. Соблюдайте следующее заклинание Свифти:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } // else completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

Четыре строки, вставленные в начало этой функции, заставляют компилятор проверять, что обратный вызов завершения вызывается ровно один раз, что означает, что эта функция больше не компилируется. В чем дело? В первой строке мы объявляем, но не инициализируем результат, который мы в конечном итоге хотим получить от этой функции. Оставив его неопределенным, мы гарантируем, что он должен быть назначен один раз, прежде чем его можно будет использовать, а объявив его, мы гарантируем, что он никогда не может быть назначен дважды. Вторая строка — это отсрочка, которая будет выполняться как последнее действие этой функции. Он вызывает блок завершения с finalResult - после того, как он был назначен остальной частью функции. Строка 3 создает новую константу с именем завершения, которая скрывает параметр обратного вызова. Новое завершение имеет тип Void, который не объявляет общедоступный API. Эта строка гарантирует, что любое использование завершения после этой строки будет ошибкой компилятора. Отсрочка в строке 2 — единственное разрешенное использование блока завершения. Строка 4 удаляет предупреждение компилятора о неиспользовании новой константы завершения, которое в противном случае присутствовало бы.

Итак, мы успешно заставили компилятор swift сообщить, что эта асинхронная функция не выполняет свой контракт. Давайте пройдемся по шагам, чтобы сделать это правильно. Во-первых, давайте заменим весь прямой доступ к обратному вызову присваиванием finalResult .

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } // else finalResult = "Pistachio" } else if int < 2 { finalResult = "Vanilla" } }

Теперь компилятор сообщает о двух проблемах:

 error: AsyncCorrectness.playground:1:8: error: constant 'finalResult' used before being initialized defer { completion(finalResult) } ^ error: AsyncCorrectness.playground:11:3: error: immutable value 'finalResult' may only be initialized once finalResult = "Pistachio"

Как и ожидалось, у функции есть путь, по которому finalResult присваивается ноль раз, а также путь, по которому он присваивается более одного раза. Мы решаем эти проблемы следующим образом:

 func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult = "Strawberry" } else { finalResult = "Pistachio" } } else if int < 2 { finalResult = "Vanilla" } else { finalResult = "Neapolitan" } }

«Фисташковый» был перемещен в соответствующее предложение else, и мы понимаем, что не смогли охватить общий случай — который, конечно же, является «неаполитанским».

Только что описанные шаблоны можно легко настроить для возврата необязательных значений, необязательных ошибок или сложных типов, таких как обычное перечисление Result. Заставляя компилятор проверять, что обратные вызовы вызываются ровно один раз, мы можем утверждать правильность и полноту асинхронных функций.