Учебник по многопоточности и многопроцессорности Python
Опубликовано: 2022-03-11Дискуссии, критикующие Python, часто говорят о том, как сложно использовать Python для многопоточной работы, указывая пальцем на то, что известно как глобальная блокировка интерпретатора (ласково называемая GIL), которая предотвращает одновременное выполнение нескольких потоков кода Python. Из-за этого модуль многопоточности Python ведет себя не совсем так, как вы ожидаете, если вы не являетесь разработчиком Python и используете другие языки, такие как C++ или Java. Необходимо четко понимать, что на Python все еще можно писать код, который работает одновременно или параллельно, и резко изменить результирующую производительность, если принять во внимание определенные вещи. Если вы еще не читали ее, я предлагаю вам взглянуть на статью Eqbal Quran о параллелизме и параллелизме в Ruby здесь, в блоге Toptal Engineering.
В этом руководстве по параллелизму Python мы напишем небольшой скрипт Python для загрузки самых популярных изображений с Imgur. Мы начнем с версии, которая загружает изображения последовательно или по одному. В качестве предварительного условия вам нужно будет зарегистрировать приложение на Imgur. Если у вас еще нет учетной записи Imgur, сначала создайте ее.
Скрипты в этих примерах потоков были протестированы с Python 3.6.4. С некоторыми изменениями они также должны работать с Python 2 — urllib — это то, что больше всего изменилось между этими двумя версиями Python.
Начало работы с многопоточностью Python
Начнем с создания модуля Python с именем download.py
. Этот файл будет содержать все функции, необходимые для получения списка изображений и их загрузки. Мы разделим эти функции на три отдельные функции:
-
get_links
-
download_link
-
setup_download_dir
Третья функция, setup_download_dir
, будет использоваться для создания каталога назначения загрузки, если он еще не существует.
API Imgur требует, чтобы HTTP-запросы содержали заголовок Authorization
с идентификатором клиента. Вы можете найти этот идентификатор клиента на панели управления приложения, которое вы зарегистрировали на Imgur, и ответ будет закодирован в формате JSON. Мы можем использовать стандартную библиотеку Python JSON для его декодирования. Загрузка изображения — еще более простая задача, так как все, что вам нужно сделать, это получить изображение по его URL-адресу и записать его в файл.
Вот как выглядит скрипт:
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
Далее нам нужно написать модуль, который будет использовать эти функции для загрузки изображений по одному. Мы назовем его single.py
. Он будет содержать основную функцию нашей первой, наивной версии загрузчика изображений Imgur. Модуль получит идентификатор клиента Imgur в переменной среды IMGUR_CLIENT_ID
. Он вызовет setup_download_dir
для создания каталога назначения загрузки. Наконец, он получит список изображений с помощью функции get_links
, отфильтрует все URL-адреса GIF и альбомов, а затем использует download_link
для загрузки и сохранения каждого из этих изображений на диск. Вот как выглядит single.py
:
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
На моем ноутбуке этот скрипт загружал 91 изображение за 19,4 секунды. Обратите внимание, что эти цифры могут различаться в зависимости от сети, в которой вы находитесь. 19,4 секунды — это не так уж много, но что, если мы хотим загрузить больше изображений? Возможно, 900 изображений вместо 90. В среднем 0,2 секунды на изображение, 900 изображений займут примерно 3 минуты. Для 9000 снимков потребуется 30 минут. Хорошая новость заключается в том, что, введя параллелизм или параллелизм, мы можем значительно ускорить это.
Во всех последующих примерах кода будут отображаться только операторы импорта, которые являются новыми и специфичными для этих примеров. Для удобства все эти скрипты Python можно найти в этом репозитории GitHub.
Параллелизм и параллелизм в Python: пример многопоточности
Многопоточность — один из самых известных подходов к достижению параллелизма и параллелизма в Python. Многопоточность — это функция, обычно предоставляемая операционной системой. Потоки легче процессов и используют то же пространство памяти.
В этом примере с потоками Python мы напишем новый модуль для замены single.py
. Этот модуль создаст пул из восьми потоков, всего девять потоков, включая основной поток. Я выбрал восемь рабочих потоков, потому что мой компьютер имеет восемь ядер ЦП, и один рабочий поток на ядро кажется хорошим числом для одновременного запуска нескольких потоков. На практике это число выбирается гораздо более тщательно на основе других факторов, таких как другие приложения и службы, работающие на том же компьютере.
Это почти то же самое, что и предыдущее, за исключением того, что теперь у нас есть новый класс DownloadWorker
, который является потомком класса Python Thread
. Метод run был переопределен, что запускает бесконечный цикл. На каждой итерации он вызывает self.queue.get()
, чтобы попытаться получить URL из потокобезопасной очереди. Он блокируется до тех пор, пока в очереди не появится элемент для обработки работником. Как только рабочий процесс получает элемент из очереди, он вызывает тот же метод download_link
, который использовался в предыдущем скрипте для загрузки изображения в каталог изображений. После завершения загрузки рабочий сигнализирует очереди о том, что эта задача выполнена. Это очень важно, потому что Queue отслеживает, сколько задач было поставлено в очередь. Вызов queue.join()
заблокирует основной поток навсегда, если рабочие не просигнализируют о завершении задачи.
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Выполнение этого примера скрипта с потоковой передачей Python на той же машине, которая использовалась ранее, приводит к времени загрузки 4,1 секунды! Это в 4,7 раза быстрее, чем в предыдущем примере. Хотя это намного быстрее, стоит упомянуть, что из-за GIL одновременно выполнялся только один поток на протяжении всего этого процесса. Следовательно, этот код является параллельным, но не параллельным. Причина, по которой это все еще быстрее, заключается в том, что это задача, связанная с вводом-выводом. Процессор почти не потеет при загрузке этих изображений, и большую часть времени тратится на ожидание сети. Вот почему многопоточность Python может обеспечить значительное увеличение скорости. Процессор может переключаться между потоками всякий раз, когда один из них готов выполнить какую-либо работу. Использование модуля threading в Python или любом другом интерпретируемом языке с GIL может фактически привести к снижению производительности. Если ваш код выполняет задачу, связанную с ЦП, например распаковку файлов gzip, использование модуля threading
приведет к более медленному времени выполнения. Для задач, связанных с ЦП, и действительно параллельного выполнения мы можем использовать модуль многопроцессорности.
Хотя де-факто эталонная реализация Python — CPython — имеет GIL, это верно не для всех реализаций Python. Например, IronPython, реализация Python, использующая платформу .NET, не имеет GIL, равно как и Jython, реализация на основе Java. Вы можете найти список рабочих реализаций Python здесь.
Параллелизм и параллелизм в Python, пример 2: создание нескольких процессов
Модуль многопроцессорности добавить проще, чем модуль потоковой обработки, поскольку нам не нужно добавлять класс, как в примере с потоковой передачей Python. Единственные изменения, которые нам нужно внести, находятся в функции main.
Чтобы использовать несколько процессов, мы создаем многопроцессорный Pool
. С помощью метода карты, который он предоставляет, мы передадим список URL-адресов в пул, который, в свою очередь, создаст восемь новых процессов и будет использовать каждый из них для параллельной загрузки изображений. Это настоящий параллелизм, но за него приходится платить. Вся память скрипта копируется в каждый порожденный подпроцесс. В этом простом примере это не имеет большого значения, но для нетривиальных программ это может стать серьезной проблемой.
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
Параллелизм и параллелизм в Python, пример 3: распространение среди нескольких рабочих
Хотя модули многопоточности и многопроцессорности отлично подходят для сценариев, которые выполняются на вашем персональном компьютере, что вам следует делать, если вы хотите, чтобы работа выполнялась на другой машине, или вам нужно масштабировать до большего, чем может ЦП на одной машине? ручка? Отличным примером использования этого являются длительные серверные задачи для веб-приложений. Если у вас есть какие-то длительные задачи, вы не хотите запускать кучу подпроцессов или потоков на одном компьютере, на котором должен выполняться остальной код вашего приложения. Это ухудшит производительность вашего приложения для всех ваших пользователей. Было бы здорово иметь возможность запускать эти задания на другой машине или на многих других машинах.
Отличной библиотекой Python для этой задачи является RQ, очень простая, но мощная библиотека. Сначала вы ставите в очередь функцию и ее аргументы, используя библиотеку. Это собирает представление вызова функции, которое затем добавляется в список Redis. Постановка задания в очередь — это первый шаг, но он еще ничего не сделает. Нам также нужен хотя бы один рабочий процесс для прослушивания этой очереди заданий.
Первый шаг — установить и запустить сервер Redis на вашем компьютере или получить доступ к работающему серверу Redis. После этого в существующий код вносится лишь несколько небольших изменений. Сначала мы создаем экземпляр RQ Queue и передаем ему экземпляр сервера Redis из библиотеки redis-py. Затем, вместо того, чтобы просто вызывать наш метод download_link
, мы вызываем q.enqueue(download_link, download_dir, link)
. Метод enqueue принимает функцию в качестве своего первого аргумента, затем любые другие аргументы или аргументы ключевого слова передаются этой функции, когда задание фактически выполняется.

Последний шаг, который нам нужно сделать, это запустить несколько воркеров. RQ предоставляет удобный скрипт для запуска воркеров в очереди по умолчанию. Просто запустите rqworker
в окне терминала, и он запустит рабочий процесс, прослушивающий очередь по умолчанию. Убедитесь, что ваш текущий рабочий каталог совпадает с каталогом, в котором находятся скрипты. Если вы хотите прослушивать другую очередь, вы можете запустить rqworker queue_name
и он будет прослушивать эту именованную очередь. Самое замечательное в RQ заключается в том, что пока вы можете подключиться к Redis, вы можете запускать столько воркеров, сколько захотите, на любом количестве разных машин; поэтому его очень легко масштабировать по мере роста вашего приложения. Вот исходник версии RQ:
import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
Однако RQ — не единственное решение для очереди заданий Python. RQ прост в использовании и очень хорошо охватывает простые варианты использования, но если требуются более сложные параметры, можно использовать другие решения для очередей Python 3 (например, Celery).
Многопоточность Python против многопроцессорности
Если ваш код связан с вводом-выводом, вам подойдут как многопроцессорность, так и многопоточность в Python. Многопроцессорность легче просто добавить, чем многопоточность, но она требует больших затрат памяти. Если ваш код привязан к ЦП, многопроцессорность, скорее всего, будет лучшим выбором, особенно если целевая машина имеет несколько ядер или ЦП. Для веб-приложений и когда вам нужно масштабировать работу на нескольких машинах, RQ будет лучше для вас.
Обновлять
Python concurrent.futures
Что-то новое после Python 3.2, которое не было затронуто в исходной статье, — это пакет concurrent.futures
. Этот пакет предоставляет еще один способ использования параллелизма и параллелизма в Python.
В исходной статье я упомянул, что модуль многопроцессорности Python будет легче вставить в существующий код, чем модуль многопоточности. Это было связано с тем, что модуль многопоточности Python 3 требовал подкласса класса Thread
, а также создания Queue
для потоков, чтобы отслеживать работу.
Использование concurrent.futures.ThreadPoolExecutor делает код примера многопоточной обработки Python почти идентичным многопроцессорному модулю.
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
Теперь, когда у нас есть все эти изображения, загруженные с помощью нашего Python ThreadPoolExecutor
, мы можем использовать их для тестирования задачи, связанной с процессором. Мы можем создавать эскизы всех изображений как в однопоточном скрипте с одним процессом, так и тестировать многопроцессорное решение.
Мы собираемся использовать библиотеку Pillow для изменения размера изображений.
Вот наш первоначальный скрипт.
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Этот сценарий перебирает пути в папке images
и для каждого пути запускает функцию create_thumbnail. Эта функция использует Pillow для открытия изображения, создания эскиза и сохранения нового изображения меньшего размера с тем же именем, что и у оригинала, но с добавлением _thumbnail
к имени.
Запуск этого скрипта на 160 изображениях общим числом 36 миллионов занимает 2,32 секунды. Давайте посмотрим, сможем ли мы ускорить это с помощью ProcessPoolExecutor.
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Метод create_thumbnail
идентичен последнему скрипту. Основное отличие заключается в создании ProcessPoolExecutor
. Метод карты исполнителя используется для параллельного создания эскизов. По умолчанию ProcessPoolExecutor
создает один подпроцесс на каждый ЦП. Запуск этого скрипта на тех же 160 изображениях занял 1,05 секунды — в 2,2 раза быстрее!
Async/Await (только Python 3.5+)
Одним из наиболее часто запрашиваемых элементов в комментариях к исходной статье был пример использования модуля asyncio Python 3. По сравнению с другими примерами здесь есть новый синтаксис Python, который может быть новым для большинства людей, а также некоторые новые концепции. Досадный дополнительный уровень сложности вызван тем, что встроенный в Python модуль urllib
не является асинхронным. Нам нужно будет использовать асинхронную HTTP-библиотеку, чтобы получить все преимущества asyncio. Для этого мы будем использовать aiohttp.
Давайте сразу перейдем к коду, и последует более подробное объяснение.
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
Здесь есть что распаковать. Начнем с основной точки входа в программу. Первое, что мы делаем с модулем asyncio, — получаем цикл обработки событий. Цикл событий обрабатывает весь асинхронный код. Затем цикл выполняется до завершения и передачи main
функции. В определении main есть часть нового синтаксиса: async def
. Вы также заметите, что await
и with async
.
Синтаксис async/await был представлен в PEP492. Синтаксис async def
помечает функцию как сопрограмму. Внутри сопрограммы основаны на генераторах Python, но это не совсем одно и то же. Сопрограммы возвращают объект сопрограммы подобно тому, как генераторы возвращают объект генератора. Когда у вас есть сопрограмма, вы получаете ее результаты с помощью выражения await
. Когда сопрограмма вызывает await
, выполнение сопрограммы приостанавливается до тех пор, пока ожидаемое не завершится. Эта приостановка позволяет завершить другую работу, пока сопрограмма приостанавливается, «ожидая» некоторого результата. В общем, этот результат будет своего рода вводом-выводом, например, запросом к базе данных или, в нашем случае, HTTP-запросом.
Функцию download_link
пришлось довольно существенно изменить. Раньше мы полагались на urllib
, чтобы сделать основную часть работы по чтению изображения за нас. Теперь, чтобы наш метод мог правильно работать с парадигмой асинхронного программирования, мы ввели цикл while
, который считывает фрагменты изображения за раз и приостанавливает выполнение, ожидая завершения ввода-вывода. Это позволяет циклу обработки событий зацикливать загрузку различных изображений, поскольку во время загрузки для каждого из них доступны новые данные.
Должен быть один — желательно только один — очевидный способ сделать это
В то время как дзен Python говорит нам, что должен быть один очевидный способ сделать что-то, в Python есть много способов ввести параллелизм в наши программы. Лучший метод для выбора будет зависеть от вашего конкретного случая использования. Асинхронная парадигма лучше масштабируется для рабочих нагрузок с высокой степенью параллелизма (например, веб-сервера) по сравнению с многопоточной или многопроцессорной обработкой, но она требует, чтобы ваш код (и зависимости) был асинхронным, чтобы получить все преимущества.
Надеемся, что примеры многопоточности Python в этой статье — и обновление — укажут вам правильное направление, чтобы у вас было представление о том, где искать в стандартной библиотеке Python, если вам нужно ввести параллелизм в свои программы.