Python 多线程和多处理教程

已发表: 2022-03-11
注意:应大众要求,我演示了一些替代技术——包括 async/await,仅在 Python 3.5 出现后才可用——我在文章末尾添加了一些更新。 享受!

批评 Python 的讨论经常谈论将 Python 用于多线程工作是多么困难,指责所谓的全局解释器锁(被亲切地称为 GIL)会阻止 Python 代码的多个线程同时运行。 因此,如果您不是 Python 开发人员并且您来自其他语言(例如 C++ 或 Java),那么 Python 多线程模块的行为方式并不完全符合您的预期。 必须明确的是,只要考虑到某些因素,仍然可以用 Python 编写并发或并行运行的代码,并在结果性能上产生明显的差异。 如果您还没有阅读它,我建议您在 Toptal 工程博客上查看 Eqbal Quran 关于 Ruby 中的并发和并行性的文章。

在这个 Python 并发教程中,我们将编写一个 Python 小脚本来从 Imgur 下载热门图片。 我们将从顺序下载图像的版本开始,或者一次下载一个。 作为先决条件,您必须在 Imgur 上注册应用程序。 如果您还没有 Imgur 帐户,请先创建一个。

这些线程示例中的脚本已经使用 Python 3.6.4 进行了测试。 通过一些更改,它们也应该与 Python 2 一起运行——urllib 是这两个 Python 版本之间变化最大的部分。

Python 多线程入门

让我们首先创建一个名为download.py的 Python 模块。 该文件将包含获取图像列表并下载它们所需的所有功能。 我们将这些功能分成三个独立的功能:

  • get_links
  • download_link
  • setup_download_dir

第三个函数setup_download_dir将用于创建下载目标目录(如果该目录尚不存在)。

Imgur 的 API 要求 HTTP 请求带有带有客户端 ID 的Authorization标头。 您可以从您在 Imgur 上注册的应用程序的仪表板中找到此客户端 ID,并且响应将采用 JSON 编码。 我们可以使用 Python 的标准 JSON 库对其进行解码。 下载图像是一项更简单的任务,因为您所要做的就是通过其 URL 获取图像并将其写入文件。

这是脚本的样子:

 import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir

接下来,我们将需要编写一个模块,该模块将使用这些函数逐个下载图像。 我们将命名为single.py 。 这将包含我们第一个简单版本的 Imgur 图像下载器的主要功能。 该模块将在环境变量IMGUR_CLIENT_ID中检索 Imgur 客户端 ID。 它将调用setup_download_dir来创建下载目标目录。 最后,它将使用get_links函数获取图像列表,过滤掉所有 GIF 和专辑 URL,然后使用download_link下载每个图像并将其保存到磁盘。 这是single.py的样子:

 import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

在我的笔记本电脑上,这个脚本用了 19.4 秒来下载 91 张图片。 请注意,这些数字可能会因您所在的网络而异。 19.4 秒不算长,但如果我们想下载更多图片怎么办? 可能是 900 张图片,而不是 90 张。每张图片平均需要 0.2 秒,900 张图片大约需要 3 分钟。 对于 9000 张图片,需要 30 分钟。 好消息是,通过引入并发性或并行性,我们可以显着加快速度。

所有后续代码示例将仅显示新的和特定于这些示例的导入语句。 为方便起见,所有这些 Python 脚本都可以在这个 GitHub 存储库中找到。

Python 中的并发性和并行性:线程示例

线程是获得 Python 并发性和并行性的最著名的方法之一。 线程是操作系统通常提供的功能。 线程比进程轻,并且共享相同的内存空间。

Python多线程内存模型

在这个 Python 线程示例中,我们将编写一个新模块来替换single.py 。 该模块将创建一个由 8 个线程组成的池,包括主线程在内总共有 9 个线程。 我选择了 8 个工作线程,因为我的计算机有 8 个 CPU 内核,每个内核有一个工作线程,这对于同时运行多少个线程来说似乎是一个不错的数字。 在实践中,根据其他因素(例如在同一台机器上运行的其他应用程序和服务)来更仔细地选择此数字。

这与前一个几乎相同,只是我们现在有一个新类DownloadWorker ,它是 Python Thread类的后代。 run 方法已被覆盖,它运行一个无限循环。 在每次迭代中,它都会调用self.queue.get()以尝试从线程安全队列中获取 URL。 它一直阻塞,直到队列中有一个项目供工作人员处理。 一旦工作人员从队列中接收到一个项目,它就会调用之前脚本中使用的相同的download_link方法将图像下载到图像目录。 下载完成后,worker 向队列发出该任务已完成的信号。 这非常重要,因为队列会跟踪入队的任务数量。 如果工作人员没有发出完成任务的信号,对queue.join()的调用将永远阻塞主线程。

 import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

在之前使用的同一台机器上运行这个 Python 线程示例脚本会导致下载时间为 4.1 秒! 这比上一个示例快 4.7 倍。 虽然这要快得多,但值得一提的是,由于 GIL,在整个过程中一次只执行一个线程。 因此,这段代码是并发的,但不是并行的。 它仍然更快的原因是因为这是一个 IO 绑定任务。 处理器在下载这些图像时几乎不费吹灰之力,大部分时间都花在等待网络上。 这就是为什么 Python 多线程可以提供很大的速度提升。 只要其中一个准备好做一些工作,处理器就可以在线程之间切换。 在 Python 或任何其他带有 GIL 的解释性语言中使用线程模块实际上会导致性能下降。 如果您的代码正在执行 CPU 密集型任务,例如解压缩 gzip 文件,则使用threading模块将导致执行时间变慢。 对于 CPU 密集型任务和真正的并行执行,我们可以使用多处理模块。

虽然事实上的参考 Python 实现 - CPython - 具有 GIL,但并非所有 Python 实现都是如此。 例如,使用 .NET 框架的 Python 实现 IronPython 没有 GIL,基于 Java 的实现 Jython 也没有。 您可以在此处找到可用的 Python 实现列表。

相关: Toptal 开发人员的 Python 最佳实践和技巧

Python 示例 2 中的并发性和并行性:生成多个进程

多处理模块比线程模块更容易加入,因为我们不需要像 Python 线程示例那样添加类。 我们需要做的唯一改变是在 main 函数中。

Python 多处理教程:模块

为了使用多个进程,我们创建了一个 multiprocessing Pool 。 使用它提供的 map 方法,我们会将 URL 列表传递给池,池中又会产生八个新进程,并使用每个进程并行下载图像。 这是真正的并行性,但它是有代价的。 脚本的整个内存被复制到生成的每个子进程中。 在这个简单的示例中,这没什么大不了的,但对于非平凡的程序来说,它很容易成为严重的开销。

 import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Python 示例 3 中的并发性和并行性:分发给多个 Worker

虽然线程和多处理模块非常适合在您的个人计算机上运行的脚本,但是如果您希望在不同的机器上完成工作,或者您需要扩展到超过一台机器上的 CPU 可以做的事情,您应该怎么做处理? 一个很好的用例是 Web 应用程序的长时间运行的后端任务。 如果您有一些长时间运行的任务,您不希望在需要运行其余应用程序代码的同一台机器上启动一堆子进程或线程。 这将降低所有用户的应用程序性能。 能够在另一台机器或许多其他机器上运行这些作业会很棒。

这个任务的一个很棒的 Python 库是 RQ,一个非常简单但功能强大的库。 您首先使用库将函数及其参数排入队列。 这会腌制函数调用表示,然后将其附加到 Redis 列表中。 将作业入队是第一步,但还不会做任何事情。 我们还需要至少一名工作人员来监听该作业队列。

RQ Python 队列库的模型

第一步是在您的计算机上安装和运行 Redis 服务器,或者访问正在运行的 Redis 服务器。 之后,只对现有代码进行了一些小的更改。 我们首先创建一个 RQ Queue 的实例,并从 redis-py 库中传递一个 Redis 服务器的实例。 然后,我们不只是调用我们的download_link方法,而是调用q.enqueue(download_link, download_dir, link) 。 enqueue 方法将函数作为其第一个参数,然后在实际执行作业时将任何其他参数或关键字参数传递给该函数。

我们需要做的最后一步是启动一些工人。 RQ 提供了一个方便的脚本来在默认队列上运行工作人员。 只需在终端窗口中运行rqworker ,它就会启动一个监听默认队列的工作程序。 请确保您当前的工作目录与脚本所在的目录相同。如果您想监听不同的队列,您可以运行rqworker queue_name ,它将监听该命名队列。 RQ 的伟大之处在于,只要你能连接到 Redis,你就可以在任意多的不同机器上运行任意多的 worker; 因此,随着应用程序的增长,它很容易扩展。 这是 RQ 版本的来源:

 import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()

但是,RQ 并不是唯一的 Python 作业队列解决方案。 RQ 易于使用并且非常好地涵盖了简单的用例,但是如果需要更高级的选项,可以使用其他 Python 3 队列解决方案(例如 Celery)。

Python 多线程与多处理

如果你的代码是 IO 绑定的,那么 Python 中的多处理和多线程都可以为你工作。 多处理比线程更容易加入,但内存开销更高。 如果您的代码受 CPU 限制,那么多处理很可能是更好的选择——尤其是在目标机器具有多个内核或 CPU 的情况下。 对于 Web 应用程序,当您需要跨多台机器扩展工作时,RQ 会更适合您。

相关:变得更高级:避免 Python 程序员犯的 10 个最常见错误

更新

Python concurrent.futures

自 Python 3.2 以来未在原始文章中涉及的新内容是concurrent.futures包。 这个包提供了另一种在 Python 中使用并发和并行性的方法。

在原始文章中,我提到 Python 的多处理模块比线程模块更容易放入现有代码中。 这是因为 Python 3 线程模块需要对Thread类进行子类化,并且还需要为线程创建一个Queue来监控工作。

使用 concurrent.futures.ThreadPoolExecutor 使 Python 线程示例代码几乎与多处理模块相同。

 import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()

现在我们已经使用 Python ThreadPoolExecutor下载了所有这些图像,我们可以使用它们来测试 CPU 密集型任务。 我们可以在单线程、单进程脚本中创建所有图像的缩略图版本,然后测试基于多处理的解决方案。

我们将使用 Pillow 库来处理图像的大小调整。

这是我们的初始脚本。

 import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

此脚本遍历images文件夹中的路径,并为每个路径运行 create_thumbnail 函数。 此函数使用 Pillow 打开图像,创建缩略图,并保存新的、较小的图像,其名称与原始图像相同,但名称后附加了_thumbnail

在总共 3600 万张图像上运行此脚本需要 2.32 秒。 让我们看看我们是否可以使用 ProcessPoolExecutor 加快速度。

 import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

create_thumbnail方法与上一个脚本相同。 主要区别在于ProcessPoolExecutor的创建。 执行器的 map 方法用于并行创建缩略图。 默认情况下, ProcessPoolExecutor为每个 CPU 创建一个子进程。 在相同的 160 张图像上运行此脚本需要 1.05 秒——快 2.2 倍!

异步/等待(仅限 Python 3.5+)

在原始文章的评论中,请求最多的项目之一是使用 Python 3 的 asyncio 模块的示例。 与其他示例相比,有一些新的 Python 语法对大多数人来说可能是新的,也有一些新概念。 不幸的是,Python 的内置urllib模块不是异步的,导致了额外的复杂性。 我们将需要使用异步 HTTP 库来获得 asyncio 的全部好处。 为此,我们将使用 aiohttp。

让我们直接进入代码,接下来会有更详细的解释。

 import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)

这里有很多东西要解压。 让我们从程序的主入口点开始。 我们对 asyncio 模块做的第一件事是获取事件循环。 事件循环处理所有异步代码。 然后,循环运行直到完成并传递main函数。 main 的定义中有一段新语法: async def 。 您还会注意到awaitwith async

在 PEP492 中引入了 async/await 语法。 async def语法将函数标记为协程。 在内部,协程基于 Python 生成器,但并不完全相同。 协程返回一个协程对象,类似于生成器返回一个生成器对象。 一旦你有了一个协程,你就可以通过await表达式获得它的结果。 当协程调用await时,协程的执行将暂停,直到可等待完成。 这种暂停允许在协程暂停“等待”某些结果时完成其他工作。 通常,此结果将是某种 I/O,例如数据库请求,或者在我们的例子中是 HTTP 请求。

download_link函数必须进行相当大的更改。 以前,我们主要依靠urllib来为我们完成读取图像的工作。 现在,为了让我们的方法能够在异步编程范式下正常工作,我们引入了一个while循环,该循环一次读取图像块并在等待 I/O 完成时暂停执行。 这允许事件循环循环下载不同的图像,因为每个图像在下载期间都有可用的新数据。

应该有一个——最好只有一个——明显的方法

虽然 Python 的禅宗告诉我们应该有一种明显的方法来做某事,但 Python 中有许多方法可以将并发性引入我们的程序。 选择的最佳方法将取决于您的特定用例。 与线程或多处理相比,异步范式更适合高并发工作负载(如网络服务器),但它要求您的代码(和依赖项)是异步的才能充分受益。

希望本文中的 Python 线程示例(以及更新)将为您指明正确的方向,以便您了解在需要在程序中引入并发性时在哪里查看 Python 标准库。