Pythonマルチスレッドおよびマルチプロセッシングチュートリアル
公開: 2022-03-11Pythonを批判する議論では、Pythonコードの複数のスレッドが同時に実行されるのを防ぐグローバルインタープリターロック(愛情を込めてGILと呼ばれる)と呼ばれるものに指を向けて、マルチスレッド作業にPythonを使用することがいかに難しいかについてよく話します。 このため、Pythonマルチスレッドモジュールは、Python開発者ではなく、C ++やJavaなどの他の言語を使用している場合に、期待どおりに動作しません。 特定の事項を考慮に入れる限り、Pythonで同時にまたは並行して実行され、結果のパフォーマンスに大きな違いをもたらすコードを記述できることを明確にする必要があります。 まだ読んでいない場合は、ToptalEngineeringBlogのRubyでの並行性と並列性に関するEqbalQuranの記事を参照することをお勧めします。
このPython同時実行チュートリアルでは、Imgurから人気のある画像をダウンロードするための小さなPythonスクリプトを作成します。 画像を順番に、または一度に1つずつダウンロードするバージョンから始めます。 前提条件として、Imgurにアプリケーションを登録する必要があります。 まだImgurアカウントをお持ちでない場合は、最初にアカウントを作成してください。
これらのスレッド例のスクリプトは、Python3.6.4でテストされています。 いくつかの変更を加えると、Python 2でも実行できるはずです。urllibは、これら2つのバージョンのPython間で最も変更されたものです。
Pythonマルチスレッディング入門
まず、 download.py
という名前のPythonモジュールを作成しましょう。 このファイルには、画像のリストを取得してダウンロードするために必要なすべての機能が含まれています。 これらの機能を3つの別々の機能に分割します。
-
get_links
-
download_link
-
setup_download_dir
3番目の関数setup_download_dir
は、ダウンロード先ディレクトリがまだ存在しない場合は、それを作成するために使用されます。
ImgurのAPIは、HTTPリクエストがクライアントIDを持つAuthorization
ヘッダーを持つことを要求します。 このクライアントIDは、Imgurに登録したアプリケーションのダッシュボードから見つけることができ、応答はJSONエンコードされます。 Pythonの標準JSONライブラリを使用してデコードできます。 画像のダウンロードはさらに簡単な作業です。URLで画像を取得してファイルに書き込むだけです。
スクリプトは次のようになります。
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
次に、これらの関数を使用して画像を1つずつダウンロードするモジュールを作成する必要があります。 このsingle.py
に名前を付けます。 これには、Imgurイメージダウンローダーの最初のナイーブバージョンの主な機能が含まれます。 モジュールは、環境変数IMGUR_CLIENT_ID
でImgurクライアントIDを取得します。 setup_download_dir
を呼び出して、ダウンロード先ディレクトリを作成します。 最後に、 get_links
関数を使用して画像のリストを取得し、すべてのGIFとアルバムのURLを除外してから、 download_link
を使用してこれらの各画像をダウンロードしてディスクに保存します。 single.py
は次のようになります。
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
私のラップトップでは、このスクリプトは91枚の画像をダウンロードするのに19.4秒かかりました。 これらの数値は、使用しているネットワークによって異なる場合があることに注意してください。 19.4秒はそれほど長くはありませんが、さらに写真をダウンロードしたい場合はどうでしょうか。 おそらく90枚ではなく900枚の画像です。1枚の画像あたり平均0.2秒の場合、900枚の画像には約3分かかります。 9000枚の写真の場合、30分かかります。 良いニュースは、並行性または並列性を導入することで、これを劇的にスピードアップできることです。
以降のすべてのコード例では、それらの例に固有の新しいインポートステートメントのみが表示されます。 便宜上、これらのPythonスクリプトはすべてこのGitHubリポジトリにあります。
Pythonでの並行性と並列性:スレッド化の例
スレッド化は、Pythonの並行性と並列性を実現するための最もよく知られたアプローチの1つです。 スレッド化は、通常、オペレーティングシステムによって提供される機能です。 スレッドはプロセスよりも軽量で、同じメモリスペースを共有します。
このPythonスレッドの例では、 single.py
を置き換える新しいモジュールを記述します。 このモジュールは8つのスレッドのプールを作成し、メインスレッドを含めて合計9つのスレッドを作成します。 私のコンピューターには8つのCPUコアがあり、コアごとに1つのワーカースレッドが一度に実行するスレッドの数として適切であると思われるため、8つのワーカースレッドを選択しました。 実際には、この数は、同じマシンで実行されている他のアプリケーションやサービスなど、他の要因に基づいてはるかに慎重に選択されます。
これは、Python Thread
クラスの子孫である新しいクラスDownloadWorker
があることを除いて、前のクラスとほぼ同じです。 runメソッドがオーバーライドされ、無限ループが実行されます。 反復するたびに、 self.queue.get()
を呼び出して、スレッドセーフキューからURLをフェッチしようとします。 ワーカーが処理するアイテムがキューに入るまでブロックします。 ワーカーがキューからアイテムを受け取ると、前のスクリプトで使用したのと同じdownload_link
メソッドを呼び出して、画像をimagesディレクトリにダウンロードします。 ダウンロードが完了すると、ワーカーはそのタスクが完了したことをキューに通知します。 キューはキューに入れられたタスクの数を追跡するため、これは非常に重要です。 ワーカーがタスクを完了したことを通知しなかった場合、 queue.join()
の呼び出しはメインスレッドを永久にブロックします。
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
以前に使用したのと同じマシンでこのPythonスレッドサンプルスクリプトを実行すると、ダウンロード時間は4.1秒になります。 これは前の例より4.7倍高速です。 これははるかに高速ですが、GILが原因で、このプロセス全体で一度に1つのスレッドしか実行されていなかったことに言及する価値があります。 したがって、このコードは並行していますが、並行ではありません。 それでも高速である理由は、これがIOバウンドタスクであるためです。 これらの画像をダウンロードしている間、プロセッサはほとんど汗をかきません。そして、時間の大部分はネットワークを待つことに費やされます。 これが、Pythonマルチスレッドが大幅な速度向上を提供できる理由です。 プロセッサは、スレッドの1つが何らかの作業を行う準備ができているときはいつでも、スレッドを切り替えることができます。 PythonまたはGILで他のインタープリター言語でスレッドモジュールを使用すると、実際にはパフォーマンスが低下する可能性があります。 コードがgzipファイルの解凍などのCPUバウンドタスクを実行している場合、 threading
モジュールを使用すると実行時間が遅くなります。 CPUバウンドタスクと真の並列実行には、マルチプロセッシングモジュールを使用できます。
事実上の参照Python実装(CPython)にはGILがありますが、これはすべてのPython実装に当てはまるわけではありません。 たとえば、.NET Frameworkを使用するPython実装であるIronPythonにはGILがなく、Javaベースの実装であるJythonもありません。 動作するPython実装のリストはここにあります。
Pythonでの並行性と並列性例2:複数のプロセスを生成する
マルチプロセッシングモジュールは、Pythonスレッドの例のようにクラスを追加する必要がないため、スレッドモジュールよりも簡単にドロップインできます。 行う必要がある唯一の変更は、main関数にあります。
複数のプロセスを使用するには、マルチプロセッシングPool
を作成します。 それが提供するmapメソッドを使用して、URLのリストをプールに渡します。これにより、8つの新しいプロセスが生成され、それぞれを使用して画像が並行してダウンロードされます。 これは真の並列処理ですが、コストがかかります。 スクリプトのメモリ全体が、生成される各サブプロセスにコピーされます。 この単純な例では、それは大したことではありませんが、重要なプログラムにとっては簡単に深刻なオーバーヘッドになる可能性があります。
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
Pythonでの並行性と並列性例3:複数のワーカーへの配布
スレッド化およびマルチプロセッシングモジュールは、パーソナルコンピューターで実行されているスクリプトに最適ですが、別のマシンで作業を実行する場合、または1台のマシンのCPUよりも多くにスケールアップする必要がある場合はどうすればよいですか。取り持つ? これの優れたユースケースは、Webアプリケーションの長時間実行されるバックエンドタスクです。 実行時間の長いタスクがある場合は、残りのアプリケーションコードを実行する必要がある、同じマシン上の一連のサブプロセスまたはスレッドを起動したくないでしょう。 これにより、すべてのユーザーのアプリケーションのパフォーマンスが低下します。 これらのジョブを別のマシンまたは他の多くのマシンで実行できるのは素晴らしいことです。
このタスクに最適なPythonライブラリは、非常にシンプルでありながら強力なライブラリであるRQです。 まず、ライブラリを使用して関数とその引数をキューに入れます。 これにより、関数呼び出し表現が選択され、Redisリストに追加されます。 ジョブのエンキューは最初のステップですが、まだ何もしません。 また、そのジョブキューをリッスンするために少なくとも1人のワーカーが必要です。
最初のステップは、コンピューターにRedisサーバーをインストールして実行するか、実行中のRedisサーバーにアクセスできるようにすることです。 その後、既存のコードに加えられた小さな変更はほんのわずかです。 まず、RQキューのインスタンスを作成し、redis-pyライブラリからRedisサーバーのインスタンスを渡します。 次に、 download_link
メソッドを呼び出す代わりに、 q.enqueue(download_link, download_dir, link)
を呼び出します。 enqueueメソッドは最初の引数として関数を取り、ジョブが実際に実行されるときに、他の引数またはキーワード引数がその関数に渡されます。
私たちがしなければならない最後のステップは、何人かの労働者を立ち上げることです。 RQは、デフォルトのキューでワーカーを実行するための便利なスクリプトを提供します。 ターミナルウィンドウでrqworker
を実行するだけで、デフォルトのキューでリッスンしているワーカーが起動します。 現在の作業ディレクトリがスクリプトが存在する場所と同じであることを確認してください。別のキューをリッスンする場合は、 rqworker queue_name
を実行すると、その名前のキューがリッスンされます。 RQの優れている点は、Redisに接続できる限り、好きなだけ多くの異なるマシンで好きなだけ多くのワーカーを実行できることです。 したがって、アプリケーションの成長に合わせてスケールアップするのは非常に簡単です。 RQバージョンのソースは次のとおりです。

import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
ただし、PythonジョブキューソリューションはRQだけではありません。 RQは使いやすく、単純なユースケースを非常にうまくカバーしますが、より高度なオプションが必要な場合は、他のPython 3キューソリューション(Celeryなど)を使用できます。
Pythonマルチスレッドとマルチプロセッシング
コードがIOバウンドの場合、Pythonのマルチプロセッシングとマルチスレッドの両方が機能します。 マルチプロセッシングは、スレッド化よりも簡単にドロップインできますが、メモリのオーバーヘッドが高くなります。 コードがCPUにバインドされている場合、特にターゲットマシンに複数のコアまたはCPUがある場合は、マルチプロセッシングの方が適している可能性があります。 Webアプリケーションの場合、および複数のマシン間で作業をスケーリングする必要がある場合は、RQの方が適しています。
アップデート
concurrent.futures
元の記事で触れられていなかったPython3.2以降の新しいものは、 concurrent.futures
パッケージです。 このパッケージは、Pythonで並行性と並列性を使用するさらに別の方法を提供します。
元の記事で、Pythonのマルチプロセッシングモジュールは、スレッドモジュールよりも既存のコードにドロップする方が簡単であると述べました。 これは、Python 3スレッドモジュールでThread
クラスをサブクラス化し、スレッドが作業を監視するためのQueue
を作成する必要があるためです。
コンカレント.futures.ThreadPoolExecutorを使用すると、Pythonスレッドのサンプルコードがマルチプロセッシングモジュールとほぼ同じになります。
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
これらすべてのイメージがThreadPoolExecutor
でダウンロードされたので、それらを使用してCPUバウンドタスクをテストできます。 シングルスレッド、シングルプロセススクリプトの両方ですべての画像のサムネイルバージョンを作成してから、マルチプロセッシングベースのソリューションをテストできます。
Pillowライブラリを使用して、画像のサイズ変更を処理します。
これが最初のスクリプトです。
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
このスクリプトは、 images
フォルダー内のパスを繰り返し処理し、パスごとにcreate_thumbnail関数を実行します。 この関数は、Pillowを使用して画像を開き、サムネイルを作成し、元の画像と同じ名前で、名前に_thumbnail
が追加された新しい小さい画像を保存します。
このスクリプトを160枚の画像で実行すると、合計3,600万枚に2.32秒かかります。 ProcessPoolExecutorを使用してこれを高速化できるかどうかを見てみましょう。
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
create_thumbnail
メソッドは、最後のスクリプトと同じです。 主な違いは、 ProcessPoolExecutor
の作成です。 エグゼキュータのmapメソッドを使用して、サムネイルを並行して作成します。 デフォルトでは、 ProcessPoolExecutor
はCPUごとに1つのサブプロセスを作成します。 同じ160枚の画像でこのスクリプトを実行すると、1.05秒かかりました。これは2.2倍の速さです。
Async / Await(Python 3.5以降のみ)
元の記事のコメントで最も要求された項目の1つは、Python3のasyncioモジュールを使用した例です。 他の例と比較すると、ほとんどの人にとって新しい可能性のある新しいPython構文と、いくつかの新しい概念があります。 残念ながら、Pythonの組み込みurllib
モジュールが非同期ではないために、さらに複雑なレイヤーが発生します。 asyncioの利点を最大限に活用するには、非同期HTTPライブラリを使用する必要があります。 このために、aiohttpを使用します。
コードに飛び込んでみましょう。さらに詳細な説明が続きます。
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
ここで開梱するものはかなりあります。 プログラムのメインエントリポイントから始めましょう。 asyncioモジュールで最初に行う新しいことは、イベントループを取得することです。 イベントループは、すべての非同期コードを処理します。 次に、ループが完了するまで実行され、 main
関数が渡されます。 mainの定義に新しい構文があります: async def
。 また、 await
とwith async
を使用していることに気付くでしょう。
async/await構文はPEP492で導入されました。 async def
構文は、関数をコルーチンとしてマークします。 内部的には、コルーチンはPythonジェネレーターに基づいていますが、まったく同じではありません。 コルーチンは、ジェネレーターがジェネレーターオブジェクトを返すのと同じようにコルーチンオブジェクトを返します。 コルーチンを取得したら、 await
式を使用してその結果を取得します。 コルーチンがawait
を呼び出すと、待機が完了するまでコルーチンの実行が一時停止されます。 この一時停止により、コルーチンが一時停止されている間に他の作業を完了して、何らかの結果を「待つ」ことができます。 一般に、この結果は、データベース要求やこの場合はHTTP要求のようなある種のI/Oになります。
download_link
関数はかなり大幅に変更する必要がありました。 以前は、画像を読み取る作業の矢面に立つためにurllib
に依存していました。 ここで、メソッドが非同期プログラミングパラダイムで適切に機能できるようにするために、一度に画像のチャンクを読み取り、I/Oが完了するのを待っている間実行を一時停止するwhile
ループを導入しました。 これにより、ダウンロード中に新しいデータが利用できるようになるため、イベントループでさまざまな画像のダウンロードをループできます。
それを行うための明白な方法は1つ、できれば1つだけである必要があります
Pythonの禅は、何かを行うための明白な方法が1つあるべきだと言っていますが、Pythonには、プログラムに並行性を導入する方法がたくさんあります。 選択する最良の方法は、特定のユースケースによって異なります。 非同期パラダイムは、スレッド化やマルチプロセッシングと比較して、同時実行性の高いワークロード(Webサーバーなど)に合わせて拡張できますが、十分なメリットを得るには、コード(および依存関係)を非同期にする必要があります。
うまくいけば、この記事のPythonスレッドの例(および更新)が正しい方向を示し、プログラムに並行性を導入する必要がある場合にPython標準ライブラリのどこを見ればよいかがわかります。