Python 多線程和多處理教程
已發表: 2022-03-11批評 Python 的討論經常談論將 Python 用於多線程工作是多麼困難,指責所謂的全局解釋器鎖(被親切地稱為 GIL)會阻止 Python 代碼的多個線程同時運行。 因此,如果您不是 Python 開發人員並且您來自其他語言(例如 C++ 或 Java),那麼 Python 多線程模塊的行為方式並不完全符合您的預期。 必須明確一點,只要考慮到某些因素,仍然可以用 Python 編寫並發或併行運行的代碼,並在結果性能上產生明顯的差異。 如果您還沒有閱讀它,我建議您在 Toptal 工程博客上查看 Eqbal Quran 關於 Ruby 中的並發和並行性的文章。
在這個 Python 並發教程中,我們將編寫一個 Python 小腳本來從 Imgur 下載熱門圖片。 我們將從順序下載圖像的版本開始,或者一次下載一個。 作為先決條件,您必須在 Imgur 上註冊應用程序。 如果您還沒有 Imgur 帳戶,請先創建一個。
這些線程示例中的腳本已經使用 Python 3.6.4 進行了測試。 通過一些更改,它們也應該與 Python 2 一起運行——urllib 是這兩個 Python 版本之間變化最大的部分。
Python 多線程入門
讓我們首先創建一個名為download.py
的 Python 模塊。 該文件將包含獲取圖像列表並下載它們所需的所有功能。 我們將這些功能分成三個獨立的功能:
-
get_links
-
download_link
-
setup_download_dir
第三個函數setup_download_dir
將用於創建下載目標目錄(如果該目錄尚不存在)。
Imgur 的 API 要求 HTTP 請求帶有帶有客戶端 ID 的Authorization
標頭。 您可以從您在 Imgur 上註冊的應用程序的儀表板中找到此客戶端 ID,並且響應將採用 JSON 編碼。 我們可以使用 Python 的標準 JSON 庫對其進行解碼。 下載圖像是一項更簡單的任務,因為您所要做的就是通過其 URL 獲取圖像並將其寫入文件。
這是腳本的樣子:
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
接下來,我們將需要編寫一個模塊,該模塊將使用這些函數逐個下載圖像。 我們將命名為single.py
。 這將包含我們第一個簡單版本的 Imgur 圖像下載器的主要功能。 該模塊將在環境變量IMGUR_CLIENT_ID
中檢索 Imgur 客戶端 ID。 它將調用setup_download_dir
來創建下載目標目錄。 最後,它將使用get_links
函數獲取圖像列表,過濾掉所有 GIF 和專輯 URL,然後使用download_link
下載每個圖像並將其保存到磁盤。 這是single.py
的樣子:
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
在我的筆記本電腦上,這個腳本用了 19.4 秒來下載 91 張圖片。 請注意,這些數字可能會因您所在的網絡而異。 19.4 秒不算長,但如果我們想下載更多圖片怎麼辦? 可能是 900 張圖片,而不是 90 張。每張圖片平均需要 0.2 秒,900 張圖片大約需要 3 分鐘。 對於 9000 張圖片,需要 30 分鐘。 好消息是,通過引入並發性或併行性,我們可以顯著加快速度。
所有後續代碼示例將僅顯示新的和特定於這些示例的導入語句。 為方便起見,所有這些 Python 腳本都可以在這個 GitHub 存儲庫中找到。
Python 中的並發性和並行性:線程示例
線程是獲得 Python 並發性和並行性的最著名的方法之一。 線程是操作系統通常提供的功能。 線程比進程輕,並且共享相同的內存空間。
在這個 Python 線程示例中,我們將編寫一個新模塊來替換single.py
。 該模塊將創建一個由 8 個線程組成的池,包括主線程在內總共有 9 個線程。 我選擇了 8 個工作線程,因為我的計算機有 8 個 CPU 內核,每個內核有一個工作線程,這對於同時運行多少個線程來說似乎是一個不錯的數字。 在實踐中,根據其他因素(例如在同一台機器上運行的其他應用程序和服務)來更仔細地選擇此數字。
這與前一個幾乎相同,只是我們現在有一個新類DownloadWorker
,它是 Python Thread
類的後代。 run 方法已被覆蓋,它運行一個無限循環。 在每次迭代中,它都會調用self.queue.get()
以嘗試從線程安全隊列中獲取 URL。 它一直阻塞,直到隊列中有一個項目供工作人員處理。 一旦工作人員從隊列中接收到一個項目,它就會調用之前腳本中使用的相同的download_link
方法將圖像下載到圖像目錄。 下載完成後,worker 向隊列發出該任務已完成的信號。 這非常重要,因為隊列會跟踪入隊的任務數量。 如果工作人員沒有發出完成任務的信號,對queue.join()
的調用將永遠阻塞主線程。
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
在之前使用的同一台機器上運行這個 Python 線程示例腳本會導致下載時間為 4.1 秒! 這比上一個示例快 4.7 倍。 雖然這要快得多,但值得一提的是,由於 GIL,在整個過程中一次只執行一個線程。 因此,這段代碼是並發的,但不是並行的。 它仍然更快的原因是因為這是一個 IO 綁定任務。 處理器在下載這些圖像時幾乎不費吹灰之力,大部分時間都花在等待網絡上。 這就是為什麼 Python 多線程可以提供很大的速度提升。 只要其中一個準備好做一些工作,處理器就可以在線程之間切換。 在 Python 或任何其他帶有 GIL 的解釋性語言中使用線程模塊實際上會導致性能下降。 如果您的代碼正在執行 CPU 密集型任務,例如解壓縮 gzip 文件,則使用threading
模塊將導致執行時間變慢。 對於 CPU 密集型任務和真正的並行執行,我們可以使用多處理模塊。
雖然事實上的參考 Python 實現 - CPython - 具有 GIL,但並非所有 Python 實現都是如此。 例如,使用 .NET 框架的 Python 實現 IronPython 沒有 GIL,基於 Java 的實現 Jython 也沒有。 您可以在此處找到可用的 Python 實現列表。
Python 示例 2 中的並發性和並行性:生成多個進程
多處理模塊比線程模塊更容易加入,因為我們不需要像 Python 線程示例那樣添加類。 我們需要做的唯一改變是在 main 函數中。
為了使用多個進程,我們創建了一個 multiprocessing Pool
。 使用它提供的 map 方法,我們會將 URL 列表傳遞給池,池中又會產生八個新進程,並使用每個進程並行下載圖像。 這是真正的並行性,但它是有代價的。 腳本的整個內存被複製到生成的每個子進程中。 在這個簡單的示例中,這沒什麼大不了的,但對於非平凡的程序來說,它很容易成為嚴重的開銷。
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
Python 示例 3 中的並發性和並行性:分發給多個 Worker
雖然線程和多處理模塊非常適合在您的個人計算機上運行的腳本,但是如果您希望在不同的機器上完成工作,或者您需要擴展到超過一台機器上的 CPU 可以做的事情,您應該怎麼做處理? 一個很好的用例是 Web 應用程序的長時間運行的後端任務。 如果您有一些長時間運行的任務,您不希望在需要運行其餘應用程序代碼的同一台機器上啟動一堆子進程或線程。 這將降低所有用戶的應用程序性能。 能夠在另一台機器或許多其他機器上運行這些作業會很棒。
這個任務的一個很棒的 Python 庫是 RQ,一個非常簡單但功能強大的庫。 您首先使用庫將函數及其參數排入隊列。 這會醃製函數調用表示,然後將其附加到 Redis 列表中。 將作業入隊是第一步,但還不會做任何事情。 我們還需要至少一名工作人員來監聽該作業隊列。
第一步是在您的計算機上安裝和運行 Redis 服務器,或者訪問正在運行的 Redis 服務器。 之後,只對現有代碼進行了一些小的更改。 我們首先創建一個 RQ Queue 的實例,並從 redis-py 庫中傳遞一個 Redis 服務器的實例。 然後,我們不只是調用我們的download_link
方法,而是調用q.enqueue(download_link, download_dir, link)
。 enqueue 方法將函數作為其第一個參數,然後在實際執行作業時將任何其他參數或關鍵字參數傳遞給該函數。
我們需要做的最後一步是啟動一些工人。 RQ 提供了一個方便的腳本來在默認隊列上運行工作人員。 只需在終端窗口中運行rqworker
,它就會啟動一個監聽默認隊列的工作程序。 請確保您當前的工作目錄與腳本所在的目錄相同。如果您想監聽不同的隊列,您可以運行rqworker queue_name
,它將監聽該命名隊列。 RQ 的偉大之處在於,只要你能連接到 Redis,你就可以在任意多的不同機器上運行任意多的 worker; 因此,隨著應用程序的增長,它很容易擴展。 這是 RQ 版本的來源:
import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
但是,RQ 並不是唯一的 Python 作業隊列解決方案。 RQ 易於使用並且非常好地涵蓋了簡單的用例,但是如果需要更高級的選項,可以使用其他 Python 3 隊列解決方案(例如 Celery)。

Python 多線程與多處理
如果你的代碼是 IO 綁定的,那麼 Python 中的多處理和多線程都可以為你工作。 多處理比線程更容易加入,但內存開銷更高。 如果您的代碼受 CPU 限制,那麼多處理很可能是更好的選擇——尤其是在目標機器具有多個內核或 CPU 的情況下。 對於 Web 應用程序,當您需要跨多台機器擴展工作時,RQ 會更適合您。
更新
Python concurrent.futures
自 Python 3.2 以來未在原始文章中涉及的新內容是concurrent.futures
包。 這個包提供了另一種在 Python 中使用並發和並行性的方法。
在原始文章中,我提到 Python 的多處理模塊比線程模塊更容易放入現有代碼中。 這是因為 Python 3 線程模塊需要對Thread
類進行子類化,並且還需要為線程創建一個Queue
來監控工作。
使用 concurrent.futures.ThreadPoolExecutor 使 Python 線程示例代碼幾乎與多處理模塊相同。
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
現在我們已經使用 Python ThreadPoolExecutor
下載了所有這些圖像,我們可以使用它們來測試 CPU 密集型任務。 我們可以在單線程、單進程腳本中創建所有圖像的縮略圖版本,然後測試基於多處理的解決方案。
我們將使用 Pillow 庫來處理圖像的大小調整。
這是我們的初始腳本。
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
此腳本遍歷images
文件夾中的路徑,並為每個路徑運行 create_thumbnail 函數。 此函數使用 Pillow 打開圖像,創建縮略圖,並保存新的、較小的圖像,其名稱與原始圖像相同,但名稱後附加了_thumbnail
。
在總共 3600 萬張圖像上運行此腳本需要 2.32 秒。 讓我們看看我們是否可以使用 ProcessPoolExecutor 加快速度。
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
create_thumbnail
方法與上一個腳本相同。 主要區別在於ProcessPoolExecutor
的創建。 執行器的 map 方法用於並行創建縮略圖。 默認情況下, ProcessPoolExecutor
為每個 CPU 創建一個子進程。 在相同的 160 張圖像上運行此腳本需要 1.05 秒——快 2.2 倍!
異步/等待(僅限 Python 3.5+)
在原始文章的評論中,請求最多的項目之一是使用 Python 3 的 asyncio 模塊的示例。 與其他示例相比,有一些新的 Python 語法對大多數人來說可能是新的,也有一些新概念。 不幸的是,Python 的內置urllib
模塊不是異步的,導致了額外的複雜性。 我們將需要使用異步 HTTP 庫來獲得 asyncio 的全部好處。 為此,我們將使用 aiohttp。
讓我們直接進入代碼,接下來會有更詳細的解釋。
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
這裡有很多東西要解壓。 讓我們從程序的主入口點開始。 我們對 asyncio 模塊做的第一件事是獲取事件循環。 事件循環處理所有異步代碼。 然後,循環運行直到完成並傳遞main
函數。 main 的定義中有一段新語法: async def
。 您還會注意到await
和with async
。
在 PEP492 中引入了 async/await 語法。 async def
語法將函數標記為協程。 在內部,協程基於 Python 生成器,但並不完全相同。 協程返回一個協程對象,類似於生成器返回一個生成器對象。 一旦你有了一個協程,你就可以通過await
表達式獲得它的結果。 當協程調用await
時,協程的執行將暫停,直到可等待完成。 這種暫停允許在協程暫停“等待”某些結果時完成其他工作。 通常,此結果將是某種 I/O,例如數據庫請求,或者在我們的例子中是 HTTP 請求。
download_link
函數必須進行相當大的更改。 以前,我們主要依靠urllib
來為我們完成讀取圖像的工作。 現在,為了讓我們的方法能夠在異步編程範式下正常工作,我們引入了一個while
循環,它一次讀取圖像塊並在等待 I/O 完成時暫停執行。 這允許事件循環循環下載不同的圖像,因為每個圖像在下載期間都有可用的新數據。
應該有一個——最好只有一個——明顯的方法
雖然 Python 的禪宗告訴我們應該有一種明顯的方法來做某事,但 Python 中有許多方法可以將並發性引入我們的程序。 選擇的最佳方法將取決於您的特定用例。 與線程或多處理相比,異步範式更適合高並發工作負載(如網絡服務器),但它需要您的代碼(和依賴項)異步才能充分受益。
希望本文中的 Python 線程示例(以及更新)將為您指明正確的方向,以便您了解在需要在程序中引入並發性時在哪裡查看 Python 標準庫。