Tutorial zu Python-Multithreading und Multiprocessing

Veröffentlicht: 2022-03-11
Hinweis: Auf vielfachen Wunsch, dass ich einige alternative Techniken demonstriere – einschließlich async/await, die erst seit der Einführung von Python 3.5 verfügbar sind – habe ich am Ende des Artikels einige Aktualisierungen hinzugefügt. Genießen!

Diskussionen, in denen Python kritisiert wird, sprechen oft darüber, wie schwierig es ist, Python für Multithread-Arbeiten zu verwenden, und zeigen mit dem Finger auf die sogenannte globale Interpreter-Sperre (liebevoll als GIL bezeichnet), die verhindert, dass mehrere Python-Code-Threads gleichzeitig ausgeführt werden. Aus diesem Grund verhält sich das Python-Multithreading-Modul nicht ganz so, wie Sie es erwarten würden, wenn Sie kein Python-Entwickler sind und aus anderen Sprachen wie C++ oder Java kommen. Es muss klargestellt werden, dass man immer noch Code in Python schreiben kann, der gleichzeitig oder parallel läuft und einen großen Unterschied in der resultierenden Leistung macht, solange bestimmte Dinge berücksichtigt werden. Wenn Sie es noch nicht gelesen haben, schlage ich vor, dass Sie sich den Artikel von Eqbal Quran über Nebenläufigkeit und Parallelität in Ruby hier im Toptal Engineering Blog ansehen.

In diesem Python-Parallelitäts-Tutorial schreiben wir ein kleines Python-Skript, um die beliebtesten Bilder von Imgur herunterzuladen. Wir beginnen mit einer Version, die Bilder nacheinander oder einzeln herunterlädt. Als Voraussetzung müssen Sie eine Anwendung auf Imgur registrieren. Wenn Sie noch kein Imgur-Konto haben, erstellen Sie bitte zuerst eines.

Die Skripts in diesen Threading-Beispielen wurden mit Python 3.6.4 getestet. Mit einigen Änderungen sollten sie auch mit Python 2 laufen – urllib hat sich zwischen diesen beiden Python-Versionen am meisten geändert.

Erste Schritte mit Python-Multithreading

Beginnen wir mit der Erstellung eines Python-Moduls mit dem Namen download.py . Diese Datei enthält alle Funktionen, die erforderlich sind, um die Liste der Bilder abzurufen und herunterzuladen. Wir werden diese Funktionalitäten in drei separate Funktionen aufteilen:

  • get_links
  • download_link
  • setup_download_dir

Die dritte Funktion, setup_download_dir , wird verwendet, um ein Download-Zielverzeichnis zu erstellen, falls es noch nicht existiert.

Die API von Imgur erfordert, dass HTTP-Anforderungen den Authorization -Header mit der Client-ID enthalten. Sie finden diese Client-ID im Dashboard der Anwendung, die Sie bei Imgur registriert haben, und die Antwort ist JSON-codiert. Wir können die standardmäßige JSON-Bibliothek von Python verwenden, um sie zu decodieren. Das Herunterladen des Bildes ist eine noch einfachere Aufgabe, da Sie das Bild nur über seine URL abrufen und in eine Datei schreiben müssen.

So sieht das Skript aus:

 import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir

Als nächstes müssen wir ein Modul schreiben, das diese Funktionen verwendet, um die Bilder einzeln herunterzuladen. Wir nennen diese single.py . Dies wird die Hauptfunktion unserer ersten, naiven Version des Imgur-Bilddownloaders enthalten. Das Modul ruft die Imgur-Client-ID in der Umgebungsvariable IMGUR_CLIENT_ID . Es setup_download_dir auf, um das Download-Zielverzeichnis zu erstellen. Schließlich ruft es mit der Funktion get_links eine Liste von Bildern ab, filtert alle GIF- und Album-URLs heraus und verwendet dann download_link , um jedes dieser Bilder herunterzuladen und auf der Festplatte zu speichern. So sieht single.py aus:

 import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Auf meinem Laptop brauchte dieses Skript 19,4 Sekunden, um 91 Bilder herunterzuladen. Bitte beachten Sie, dass diese Nummern je nach Netzwerk, in dem Sie sich befinden, variieren können. 19,4 Sekunden sind nicht besonders lang, aber was wäre, wenn wir mehr Bilder herunterladen wollten? Vielleicht 900 Bilder statt 90. Bei durchschnittlich 0,2 Sekunden pro Bild würden 900 Bilder etwa 3 Minuten dauern. Für 9000 Bilder würde es 30 Minuten dauern. Die gute Nachricht ist, dass wir dies durch die Einführung von Nebenläufigkeit oder Parallelität erheblich beschleunigen können.

Alle nachfolgenden Codebeispiele zeigen nur import-Anweisungen, die neu und spezifisch für diese Beispiele sind. Der Einfachheit halber sind alle diese Python-Skripte in diesem GitHub-Repository zu finden.

Nebenläufigkeit und Parallelität in Python: Threading-Beispiel

Threading ist einer der bekanntesten Ansätze, um Parallelität und Parallelität in Python zu erreichen. Threading ist eine Funktion, die normalerweise vom Betriebssystem bereitgestellt wird. Threads sind leichter als Prozesse und teilen sich den gleichen Speicherplatz.

Python-Multithreading-Speichermodell

In diesem Python-Threading-Beispiel schreiben wir ein neues Modul, um single.py zu ersetzen. Dieses Modul erstellt einen Pool von acht Threads, was insgesamt neun Threads einschließlich des Hauptthreads ergibt. Ich habe mich für acht Worker-Threads entschieden, da mein Computer über acht CPU-Kerne verfügt und ein Worker-Thread pro Kern eine gute Zahl für die gleichzeitige Ausführung von Threads zu sein schien. In der Praxis wird diese Anzahl viel sorgfältiger auf der Grundlage anderer Faktoren ausgewählt, z. B. andere Anwendungen und Dienste, die auf demselben Computer ausgeführt werden.

Dies ist fast dasselbe wie die vorherige, mit der Ausnahme, dass wir jetzt eine neue Klasse haben, DownloadWorker , die ein Nachkomme der Python- Thread -Klasse ist. Die run-Methode wurde überschrieben, wodurch eine Endlosschleife ausgeführt wird. Bei jeder Iteration ruft es self.queue.get() auf, um zu versuchen, eine URL aus einer Thread-sicheren Warteschlange abzurufen. Es blockiert, bis sich ein Element in der Warteschlange befindet, das der Worker verarbeiten kann. Sobald der Worker ein Element aus der Warteschlange erhält, ruft er dieselbe download_link -Methode auf, die im vorherigen Skript verwendet wurde, um das Bild in das Bilderverzeichnis herunterzuladen. Nachdem der Download abgeschlossen ist, signalisiert der Worker der Warteschlange, dass diese Aufgabe erledigt ist. Dies ist sehr wichtig, da die Warteschlange verfolgt, wie viele Aufgaben in die Warteschlange gestellt wurden. Der Aufruf von queue.join() würde den Haupt-Thread für immer blockieren, wenn die Worker nicht signalisieren, dass sie eine Aufgabe erledigt haben.

 import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Das Ausführen dieses Python-Threading-Beispielskripts auf demselben Computer, der zuvor verwendet wurde, führt zu einer Downloadzeit von 4,1 Sekunden! Das ist 4,7-mal schneller als im vorherigen Beispiel. Obwohl dies viel schneller ist, sollte erwähnt werden, dass während dieses Prozesses aufgrund der GIL nur jeweils ein Thread ausgeführt wurde. Daher ist dieser Code gleichzeitig, aber nicht parallel. Der Grund, warum es immer noch schneller ist, liegt darin, dass dies eine IO-gebundene Aufgabe ist. Der Prozessor kommt beim Herunterladen dieser Bilder kaum ins Schwitzen und die meiste Zeit wird mit Warten auf das Netz verbracht. Aus diesem Grund kann Python-Multithreading eine große Geschwindigkeitssteigerung bieten. Der Prozessor kann zwischen den Threads wechseln, wann immer einer von ihnen bereit ist, etwas zu tun. Die Verwendung des Threading-Moduls in Python oder einer anderen interpretierten Sprache mit einer GIL kann tatsächlich zu einer reduzierten Leistung führen. Wenn Ihr Code eine CPU-gebundene Aufgabe ausführt, z. B. das Dekomprimieren von gzip-Dateien, führt die Verwendung des threading -Moduls zu einer langsameren Ausführungszeit. Für CPU-gebundene Aufgaben und wirklich parallele Ausführung können wir das Multiprocessing-Modul verwenden.

Während die De-facto -Python-Referenzimplementierung – CPython – eine GIL hat, gilt dies nicht für alle Python-Implementierungen. Beispielsweise hat IronPython, eine Python-Implementierung, die das .NET-Framework verwendet, keine GIL, und Jython, die Java-basierte Implementierung, auch nicht. Eine Liste funktionierender Python-Implementierungen finden Sie hier.

Siehe auch : Best Practices und Tipps für Python von Toptal-Entwicklern

Nebenläufigkeit und Parallelität in Python Beispiel 2: Erzeugen mehrerer Prozesse

Das Multiprocessing-Modul lässt sich einfacher einfügen als das Threading-Modul, da wir keine Klasse wie im Python-Threading-Beispiel hinzufügen müssen. Die einzigen Änderungen, die wir vornehmen müssen, sind in der Hauptfunktion.

Python-Multiprocessing-Tutorial: Module

Um mehrere Prozesse zu verwenden, erstellen wir einen Multiprocessing- Pool . Mit der bereitgestellten Map-Methode übergeben wir die Liste der URLs an den Pool, der wiederum acht neue Prozesse hervorbringt und jeden davon verwendet, um die Bilder parallel herunterzuladen. Dies ist echte Parallelität, aber sie ist mit Kosten verbunden. Der gesamte Speicher des Skripts wird in jeden erzeugten Unterprozess kopiert. In diesem einfachen Beispiel ist es keine große Sache, aber es kann leicht zu einem ernsthaften Overhead für nicht-triviale Programme werden.

 import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Parallelität und Parallelität in Python Beispiel 3: Verteilung an mehrere Worker

Die Threading- und Multiprocessing-Module eignen sich zwar hervorragend für Skripts, die auf Ihrem PC ausgeführt werden, aber was sollten Sie tun, wenn Sie möchten, dass die Arbeit auf einem anderen Computer ausgeführt wird, oder Sie auf mehr skalieren müssen, als die CPU auf einem Computer kann handhaben? Ein großartiger Anwendungsfall dafür sind lang andauernde Back-End-Aufgaben für Webanwendungen. Wenn Sie einige lang andauernde Aufgaben haben, möchten Sie nicht eine Reihe von Unterprozessen oder Threads auf demselben Computer hochfahren, die den Rest Ihres Anwendungscodes ausführen müssen. Dadurch wird die Leistung Ihrer Anwendung für alle Ihre Benutzer beeinträchtigt. Was großartig wäre, wäre, diese Jobs auf einem anderen Computer oder vielen anderen Computern ausführen zu können.

Eine großartige Python-Bibliothek für diese Aufgabe ist RQ, eine sehr einfache, aber leistungsstarke Bibliothek. Zuerst fügen Sie eine Funktion und ihre Argumente mithilfe der Bibliothek in die Warteschlange ein. Dadurch wird die Darstellung des Funktionsaufrufs ausgewählt, die dann an eine Redis-Liste angehängt wird. Das Einreihen des Jobs in die Warteschlange ist der erste Schritt, wird aber noch nichts bewirken. Außerdem benötigen wir mindestens einen Worker, der diese Jobwarteschlange abhört.

Modell der RQ-Python-Warteschlangenbibliothek

Der erste Schritt besteht darin, einen Redis-Server auf Ihrem Computer zu installieren und auszuführen oder Zugriff auf einen laufenden Redis-Server zu haben. Danach werden nur noch wenige kleine Änderungen am bestehenden Code vorgenommen. Wir erstellen zuerst eine Instanz einer RQ-Warteschlange und übergeben ihr eine Instanz eines Redis-Servers aus der redis-py-Bibliothek. Anstatt nur unsere Methode download_link aufzurufen, rufen wir dann q.enqueue(download_link, download_dir, link) . Die Enqueue-Methode nimmt eine Funktion als erstes Argument, dann werden alle anderen Argumente oder Schlüsselwortargumente an diese Funktion weitergegeben, wenn der Job tatsächlich ausgeführt wird.

Ein letzter Schritt, den wir tun müssen, ist, einige Arbeiter einzustellen. RQ bietet ein praktisches Skript zum Ausführen von Workern in der Standardwarteschlange. Führen Sie einfach rqworker in einem Terminalfenster aus und es startet einen Worker, der die Standardwarteschlange abhört. Bitte stellen Sie sicher, dass Ihr aktuelles Arbeitsverzeichnis das gleiche ist, in dem sich die Skripte befinden. Wenn Sie eine andere Warteschlange abhören möchten, können rqworker queue_name und es wird auf diese benannte Warteschlange lauschen. Das Tolle an RQ ist, dass Sie, solange Sie eine Verbindung zu Redis herstellen können, beliebig viele Worker auf beliebig vielen verschiedenen Computern ausführen können. Daher ist eine Skalierung sehr einfach, wenn Ihre Anwendung wächst. Hier ist die Quelle für die RQ-Version:

 import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()

RQ ist jedoch nicht die einzige Lösung für Python-Jobwarteschlangen. RQ ist einfach zu verwenden und deckt einfache Anwendungsfälle sehr gut ab, aber wenn erweiterte Optionen erforderlich sind, können andere Python 3-Warteschlangenlösungen (wie Celery) verwendet werden.

Python-Multithreading vs. Multiprocessing

Wenn Ihr Code IO-gebunden ist, funktionieren sowohl Multiprocessing als auch Multithreading in Python für Sie. Multiprocessing lässt sich einfacher einbinden als Threading, hat aber einen höheren Speicher-Overhead. Wenn Ihr Code CPU-gebunden ist, ist Multiprocessing höchstwahrscheinlich die bessere Wahl – insbesondere, wenn der Zielcomputer mehrere Kerne oder CPUs hat. Für Webanwendungen und wenn Sie die Arbeit auf mehrere Computer skalieren müssen, ist RQ besser für Sie.

Verwandte Themen: Fortgeschrittener werden: Vermeiden Sie die 10 häufigsten Fehler, die Python-Programmierer machen

Aktualisieren

Python concurrent.futures

Etwas Neues seit Python 3.2, das im ursprünglichen Artikel nicht angesprochen wurde, ist das Paket concurrent.futures . Dieses Paket bietet eine weitere Möglichkeit, Nebenläufigkeit und Parallelität mit Python zu verwenden.

Im ursprünglichen Artikel habe ich erwähnt, dass das Multiprocessing-Modul von Python einfacher in bestehenden Code eingefügt werden kann als das Threading-Modul. Dies lag daran, dass das Python 3-Threading-Modul eine Unterklasse der Thread -Klasse erforderte und auch eine Queue für die Threads erstellte, die auf Arbeit überwacht werden sollten.

Die Verwendung eines concurrent.futures.ThreadPoolExecutor macht den Python-Threading-Beispielcode fast identisch mit dem Multiprocessing-Modul.

 import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()

Jetzt, da wir alle diese Bilder mit unserem Python ThreadPoolExecutor heruntergeladen haben, können wir sie verwenden, um eine CPU-gebundene Aufgabe zu testen. Wir können Thumbnail-Versionen aller Bilder in einem Single-Threaded-Single-Process-Skript erstellen und dann eine Multiprocessing-basierte Lösung testen.

Wir werden die Pillow-Bibliothek verwenden, um die Größenänderung der Bilder zu handhaben.

Hier ist unser erstes Skript.

 import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Dieses Skript durchläuft die Pfade im Ordner images und führt für jeden Pfad die Funktion create_thumbnail aus. Diese Funktion verwendet Pillow, um das Bild zu öffnen, ein Miniaturbild zu erstellen und das neue, kleinere Bild unter demselben Namen wie das Original, aber mit angehängtem _thumbnail an den Namen zu speichern.

Das Ausführen dieses Skripts für 160 Bilder mit insgesamt 36 Millionen dauert 2,32 Sekunden. Mal sehen, ob wir dies mit einem ProcessPoolExecutor beschleunigen können.

 import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Die Methode create_thumbnail ist identisch mit dem letzten Skript. Der Hauptunterschied besteht in der Erstellung eines ProcessPoolExecutor . Die Map-Methode des Executors wird verwendet, um die Thumbnails parallel zu erstellen. Standardmäßig erstellt der ProcessPoolExecutor einen Unterprozess pro CPU. Das Ausführen dieses Skripts auf denselben 160 Bildern dauerte 1,05 Sekunden – 2,2-mal schneller!

Async/Await (nur Python 3.5+)

Eines der am häufigsten nachgefragten Elemente in den Kommentaren zum Originalartikel war ein Beispiel für die Verwendung des asyncio-Moduls von Python 3. Im Vergleich zu den anderen Beispielen gibt es einige neue Python-Syntaxen, die für die meisten Menschen vielleicht neu sind, und auch einige neue Konzepte. Eine unglückliche zusätzliche Ebene der Komplexität wird dadurch verursacht, dass Pythons eingebautes urllib -Modul nicht asynchron ist. Wir müssen eine asynchrone HTTP-Bibliothek verwenden, um die vollen Vorteile von asyncio nutzen zu können. Dazu verwenden wir aiohttp.

Lassen Sie uns direkt in den Code springen und eine detailliertere Erklärung wird folgen.

 import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)

Hier gibt es einiges auszupacken. Beginnen wir mit dem Haupteinstiegspunkt des Programms. Die erste neue Sache, die wir mit dem asyncio-Modul machen, ist, die Ereignisschleife zu erhalten. Die Ereignisschleife verarbeitet den gesamten asynchronen Code. Dann wird die Schleife bis zum Ende durchlaufen und die main übergeben. Es gibt eine neue Syntax in der Definition von main: async def . Sie werden auch await und with async bemerken.

Die async/await-Syntax wurde in PEP492 eingeführt. Die async def Syntax markiert eine Funktion als Coroutine. Intern basieren Coroutinen auf Python-Generatoren, sind aber nicht genau dasselbe. Koroutinen geben ein Koroutinenobjekt zurück, ähnlich wie Generatoren ein Generatorobjekt zurückgeben. Sobald Sie eine Coroutine haben, erhalten Sie ihre Ergebnisse mit dem await Ausdruck. Wenn eine Coroutine await aufruft, wird die Ausführung der Coroutine ausgesetzt, bis das Await abgeschlossen ist. Diese Unterbrechung ermöglicht, dass andere Arbeiten abgeschlossen werden, während die Coroutine ausgesetzt wird und auf ein Ergebnis „wartet“. Im Allgemeinen ist dieses Ergebnis eine Art I/O wie eine Datenbankanfrage oder in unserem Fall eine HTTP-Anfrage.

Die Funktion download_link musste ziemlich stark geändert werden. Früher haben wir uns auf urllib verlassen, um die Hauptarbeit des Lesens des Bildes für uns zu erledigen. Damit unsere Methode nun ordnungsgemäß mit dem asynchronen Programmierparadigma funktioniert, haben wir eine while -Schleife eingeführt, die Teile des Bildes auf einmal liest und die Ausführung anhält, während sie auf den Abschluss der E/A wartet. Dadurch kann die Ereignisschleife das Herunterladen der verschiedenen Bilder durchlaufen, da für jedes während des Herunterladens neue Daten verfügbar sind.

Es sollte einen – am besten nur einen – offensichtlichen Weg geben, es zu tun

Während das Zen von Python uns sagt, dass es einen offensichtlichen Weg geben sollte, etwas zu tun, gibt es in Python viele Möglichkeiten, Nebenläufigkeit in unsere Programme einzuführen. Die beste Methode zur Auswahl hängt von Ihrem spezifischen Anwendungsfall ab. Das asynchrone Paradigma lässt sich im Vergleich zu Threading oder Multiprocessing besser auf Workloads mit hoher Parallelität (wie einen Webserver) skalieren, aber es erfordert, dass Ihr Code (und Abhängigkeiten) asynchron sind, um voll davon zu profitieren.

Hoffentlich zeigen Ihnen die Python-Threading-Beispiele in diesem Artikel – und Update – die richtige Richtung, damit Sie eine Vorstellung davon haben, wo Sie in der Python-Standardbibliothek nachsehen müssen, wenn Sie Nebenläufigkeit in Ihre Programme einführen müssen.