Samouczek dotyczący wielowątkowości i przetwarzania wieloprocesowego w Pythonie

Opublikowany: 2022-03-11
Uwaga: na popularne żądanie zademonstrowania kilku alternatywnych technik — w tym async/await, dostępnych tylko od czasu pojawienia się Pythona 3.5 — dodałem kilka aktualizacji na końcu artykułu. Cieszyć się!

Dyskusje krytykujące Pythona często mówią o tym, jak trudno jest używać Pythona do pracy wielowątkowej, wskazując palcami na tak zwaną globalną blokadę interpretera (czule nazywaną GIL), która zapobiega jednoczesnemu uruchamianiu wielu wątków kodu Pythona. Z tego powodu moduł wielowątkowości w Pythonie nie zachowuje się tak, jak można by się tego spodziewać, jeśli nie jesteś programistą Pythona i pochodzisz z innych języków, takich jak C++ lub Java. Należy wyjaśnić, że nadal można pisać kod w Pythonie, który działa współbieżnie lub równolegle i znacząco wpływa na wynikową wydajność, o ile brane są pod uwagę pewne rzeczy. Jeśli jeszcze tego nie czytałeś, proponuję zapoznać się z artykułem Eqbal Quran na temat współbieżności i równoległości w Ruby na blogu Toptal Engineering.

W tym samouczku współbieżności Pythona napiszemy mały skrypt Pythona, aby pobrać najpopularniejsze popularne obrazy z Imgur. Zaczniemy od wersji, która pobiera obrazy sekwencyjnie lub pojedynczo. Jako warunek wstępny będziesz musiał zarejestrować aplikację na Imgur. Jeśli nie masz jeszcze konta Imgur, utwórz je najpierw.

Skrypty w tych przykładach wątków zostały przetestowane w Pythonie 3.6.4. Z pewnymi zmianami powinny również działać z Pythonem 2 — urllib jest tym, co zmieniło się najbardziej między tymi dwiema wersjami Pythona.

Pierwsze kroki z wielowątkowością w Pythonie

Zacznijmy od stworzenia modułu Pythona o nazwie download.py . Ten plik będzie zawierał wszystkie funkcje niezbędne do pobrania listy obrazów i ich pobrania. Podzielimy te funkcjonalności na trzy oddzielne funkcje:

  • get_links
  • download_link
  • setup_download_dir

Trzecia funkcja, setup_download_dir , zostanie użyta do utworzenia katalogu docelowego pobierania, jeśli jeszcze nie istnieje.

Interfejs API firmy Imgur wymaga, aby żądania HTTP zawierały nagłówek Authorization z identyfikatorem klienta. Możesz znaleźć ten identyfikator klienta z pulpitu nawigacyjnego aplikacji zarejestrowanej w Imgur, a odpowiedź będzie zakodowana w formacie JSON. Do odkodowania możemy użyć standardowej biblioteki JSON Pythona. Pobieranie obrazu jest jeszcze prostszym zadaniem, ponieważ wszystko, co musisz zrobić, to pobrać obraz według jego adresu URL i zapisać go w pliku.

Tak wygląda skrypt:

 import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir

Następnie będziemy musieli napisać moduł, który będzie wykorzystywał te funkcje do pobierania obrazów jeden po drugim. Nazwiemy to single.py . Będzie zawierać główną funkcję naszej pierwszej, naiwnej wersji narzędzia do pobierania obrazów Imgur. Moduł pobierze identyfikator klienta Imgur w zmiennej środowiskowej IMGUR_CLIENT_ID . setup_download_dir , aby utworzyć katalog docelowy pobierania. Na koniec pobierze listę obrazów za pomocą funkcji get_links , odfiltruje wszystkie adresy URL GIF i albumów, a następnie użyje download_link do pobrania i zapisania każdego z tych obrazów na dysku. Oto jak wygląda single.py :

 import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Na moim laptopie pobranie 91 obrazów zajęło temu skryptowi 19,4 sekundy. Pamiętaj, że te liczby mogą się różnić w zależności od sieci, w której się znajdujesz. 19,4 sekundy nie jest strasznie długie, ale co by było, gdybyśmy chcieli pobrać więcej zdjęć? Być może 900 zdjęć zamiast 90. Przy średniej 0,2 sekundy na zdjęcie, 900 zdjęć zajęłoby około 3 minut. W przypadku 9000 zdjęć zajęłoby to 30 minut. Dobrą wiadomością jest to, że wprowadzając współbieżność lub równoległość możemy to znacznie przyspieszyć.

Wszystkie kolejne przykłady kodu pokażą tylko instrukcje importu, które są nowe i specyficzne dla tych przykładów. Dla wygody wszystkie te skrypty Pythona można znaleźć w tym repozytorium GitHub.

Współbieżność i równoległość w Pythonie: przykład wątków

Wątek jest jednym z najbardziej znanych podejść do osiągnięcia współbieżności i równoległości Pythona. Wątek to funkcja zwykle udostępniana przez system operacyjny. Wątki są lżejsze niż procesy i dzielą tę samą przestrzeń pamięci.

Wielowątkowy model pamięci w Pythonie

W tym przykładzie wątkowania w Pythonie napiszemy nowy moduł, który zastąpi single.py . Ten moduł utworzy pulę ośmiu wątków, co daje łącznie dziewięć wątków, w tym główny wątek. Wybrałem osiem wątków roboczych, ponieważ mój komputer ma osiem rdzeni procesora, a jeden wątek roboczy na rdzeń wydaje się dobrą liczbą, jeśli chodzi o liczbę wątków uruchomionych jednocześnie. W praktyce liczba ta jest dobierana znacznie ostrożniej na podstawie innych czynników, takich jak inne aplikacje i usługi działające na tej samej maszynie.

Jest to prawie to samo, co poprzednie, z wyjątkiem tego, że mamy teraz nową klasę, DownloadWorker , która jest potomkiem klasy Python Thread . Zastąpiono metodę run, która uruchamia nieskończoną pętlę. W każdej iteracji wywołuje self.queue.get() , aby spróbować pobrać adres URL z kolejki bezpiecznej dla wątków. Blokuje się, dopóki w kolejce nie znajdzie się element do przetworzenia przez pracownika. Gdy pracownik otrzyma element z kolejki, wywołuje tę samą metodę download_link , która została użyta w poprzednim skrypcie do pobrania obrazu do katalogu images. Po zakończeniu pobierania pracownik sygnalizuje kolejce, że zadanie zostało wykonane. Jest to bardzo ważne, ponieważ Kolejka śledzi, ile zadań zostało umieszczonych w kolejce. Wywołanie queue.join() zablokowałoby główny wątek na zawsze, gdyby pracownicy nie sygnalizowali, że zakończyli zadanie.

 import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Uruchomienie tego przykładowego skryptu z wątkami w Pythonie na tym samym komputerze, którego używaliśmy wcześniej, powoduje, że czas pobierania wynosi 4,1 sekundy! To 4,7 razy szybciej niż w poprzednim przykładzie. Chociaż jest to znacznie szybsze, warto wspomnieć, że ze względu na GIL w tym samym czasie wykonywany był tylko jeden wątek. Dlatego ten kod jest współbieżny, ale nie równoległy. Powodem, dla którego nadal jest szybszy, jest to, że jest to zadanie związane z IO. Procesor prawie się nie spoci podczas pobierania tych obrazów, a większość czasu spędza czekając na sieć. Dlatego wielowątkowość w Pythonie może zapewnić duży wzrost prędkości. Procesor może przełączać się między wątkami, gdy jeden z nich jest gotowy do wykonania jakiejś pracy. Używanie modułu wątków w Pythonie lub dowolnym innym interpretowanym języku z GIL może w rzeczywistości skutkować zmniejszoną wydajnością. Jeśli Twój kod wykonuje zadanie związane z procesorem, takie jak dekompresowanie plików gzip, użycie modułu threading spowoduje wolniejszy czas wykonywania. Do zadań związanych z procesorem i prawdziwie równoległego wykonywania możemy użyć modułu wieloprocesorowego.

Chociaż de facto referencyjna implementacja Pythona — CPython — ma GIL, nie dotyczy to wszystkich implementacji Pythona. Na przykład IronPython, implementacja Pythona korzystająca z platformy .NET, nie ma GIL, podobnie jak Jython, implementacja oparta na Javie. Listę działających implementacji Pythona znajdziesz tutaj.

Powiązane: Najlepsze praktyki i wskazówki Pythona autorstwa Toptal Developers

Współbieżność i równoległość w Pythonie Przykład 2: tworzenie wielu procesów

Moduł wieloprocesorowy jest łatwiejszy do wrzucenia niż moduł wątkowości, ponieważ nie musimy dodawać klasy, takiej jak przykład wątkowości w Pythonie. Jedyne zmiany, jakie musimy wprowadzić, to funkcja główna.

Samouczek dotyczący wieloprocesowego przetwarzania w Pythonie: Moduły

Aby korzystać z wielu procesów, tworzymy Pool wieloprocesową. Za pomocą metody map, którą udostępnia, przekażemy listę adresów URL do puli, która z kolei uruchomi osiem nowych procesów i użyje każdego z nich do równoległego pobierania obrazów. To prawdziwy paralelizm, ale ma swoją cenę. Cała pamięć skryptu jest kopiowana do każdego tworzonego podprocesu. W tym prostym przykładzie nie jest to wielka sprawa, ale może łatwo stać się poważnym obciążeniem dla nietrywialnych programów.

 import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Współbieżność i równoległość w Pythonie Przykład 3: Dystrybucja do wielu pracowników

Chociaż moduły wielowątkowe i wieloprocesorowe świetnie nadają się do skryptów uruchamianych na komputerze osobistym, co należy zrobić, jeśli chcesz, aby praca była wykonywana na innej maszynie lub jeśli potrzebujesz skalować w górę do więcej niż procesor na jednej maszynie? uchwyt? Świetnym przypadkiem użycia tego są długotrwałe zadania zaplecza dla aplikacji internetowych. Jeśli masz jakieś długotrwałe zadania, nie chcesz uruchamiać kilku podprocesów lub wątków na tej samej maszynie, która musi uruchamiać resztę kodu aplikacji. Spowoduje to obniżenie wydajności Twojej aplikacji dla wszystkich użytkowników. Byłoby wspaniale móc wykonywać te zadania na innej maszynie lub wielu innych maszynach.

Świetną biblioteką Pythona do tego zadania jest RQ, bardzo prosta, ale potężna biblioteka. Najpierw umieszczasz w kolejce funkcję i jej argumenty za pomocą biblioteki. To marynuje reprezentację wywołania funkcji, która jest następnie dołączana do listy Redis. Kolejkowanie pracy to pierwszy krok, ale jeszcze nic nie zrobi. Potrzebujemy również co najmniej jednego pracownika do nasłuchiwania w tej kolejce zadań.

Model biblioteki kolejki RQ Python

Pierwszym krokiem jest zainstalowanie i uruchomienie serwera Redis na komputerze lub uzyskanie dostępu do działającego serwera Redis. Następnie w istniejącym kodzie wprowadzono tylko kilka drobnych zmian. Najpierw tworzymy instancję kolejki RQ i przekazujemy jej instancję serwera Redis z biblioteki redis-py. Następnie, zamiast po prostu wywoływać naszą metodę download_link , wywołujemy q.enqueue(download_link, download_dir, link) . Metoda enqueue przyjmuje funkcję jako swój pierwszy argument, a następnie wszelkie inne argumenty lub argumenty słów kluczowych są przekazywane do tej funkcji, gdy zadanie jest faktycznie wykonywane.

Ostatnim krokiem, który musimy zrobić, jest uruchomienie kilku pracowników. RQ udostępnia przydatny skrypt do uruchamiania pracowników w domyślnej kolejce. Po prostu uruchom rqworker w oknie terminala, a uruchomi on nasłuchiwanie pracownika w domyślnej kolejce. Upewnij się, że twój bieżący katalog roboczy jest taki sam jak ten, w którym znajdują się skrypty. Jeśli chcesz nasłuchiwać innej kolejki, możesz uruchomić rqworker queue_name i będzie nasłuchiwał tej nazwanej kolejki. Wspaniałą rzeczą w RQ jest to, że dopóki możesz połączyć się z Redis, możesz obsługiwać dowolną liczbę pracowników na tylu różnych maszynach, ile chcesz; dlatego bardzo łatwo jest skalować w miarę rozwoju aplikacji. Oto źródło wersji RQ:

 import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()

Jednak RQ nie jest jedynym rozwiązaniem kolejki zadań w Pythonie. RQ jest łatwy w użyciu i bardzo dobrze obsługuje proste przypadki użycia, ale jeśli wymagane są bardziej zaawansowane opcje, można użyć innych rozwiązań kolejki Pythona 3 (takich jak Celery).

Wielowątkowość w Pythonie a wieloprocesorowość

Jeśli twój kod jest powiązany z IO, zarówno wieloprocesorowość, jak i wielowątkowość w Pythonie będą dla ciebie działać. Proces wieloprocesowy jest łatwiejszy do zrzucenia niż wątkowość, ale wiąże się z wyższym obciążeniem pamięci. Jeśli twój kod jest powiązany z procesorem, najprawdopodobniej lepszym wyborem będzie przetwarzanie wieloprocesorowe — zwłaszcza jeśli maszyna docelowa ma wiele rdzeni lub procesorów. W przypadku aplikacji internetowych i gdy musisz skalować pracę na wielu maszynach, RQ będzie dla Ciebie lepszy.

Powiązane: Stań się bardziej zaawansowany: unikaj 10 najczęstszych błędów popełnianych przez programistów Pythona

Aktualizacja

Python concurrent.futures

Coś nowego od czasu Pythona 3.2, o którym nie wspomniano w oryginalnym artykule, to pakiet concurrent.futures . Ten pakiet zapewnia jeszcze inny sposób wykorzystania współbieżności i równoległości w Pythonie.

W oryginalnym artykule wspomniałem, że moduł wieloprocesorowy Pythona byłby łatwiejszy do wrzucenia do istniejącego kodu niż moduł wątkowości. Wynikało to z faktu, że moduł wątkowości Pythona 3 wymagał podklasy klasy Thread , a także utworzenia Queue dla wątków do monitorowania pracy.

Użycie concurrent.futures.ThreadPoolExecutor sprawia, że ​​przykładowy kod wątkowości Pythona jest prawie identyczny z modułem wieloprocesorowym.

 import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()

Teraz, gdy mamy już wszystkie te obrazy pobrane za pomocą naszego Pythona ThreadPoolExecutor , możemy ich użyć do przetestowania zadania związanego z procesorem. Możemy tworzyć miniaturowe wersje wszystkich obrazów zarówno w jednowątkowym, jednoprocesowym skrypcie, a następnie testować rozwiązanie oparte na wieloprocesorowości.

Użyjemy biblioteki Pillow do obsługi zmiany rozmiaru obrazów.

Oto nasz początkowy skrypt.

 import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Ten skrypt iteruje po ścieżkach w folderze images i dla każdej ścieżki uruchamia funkcję create_thumbnail. Ta funkcja używa poduszki do otwarcia obrazu, utworzenia miniatury i zapisania nowego, mniejszego obrazu o tej samej nazwie co oryginał, ale z dołączoną do nazwy nazwą _thumbnail .

Uruchomienie tego skryptu na 160 obrazach o łącznej wartości 36 milionów zajmuje 2,32 sekundy. Zobaczmy, czy możemy przyspieszyć to za pomocą ProcessPoolExecutor.

 import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Metoda create_thumbnail jest identyczna jak w ostatnim skrypcie. Główną różnicą jest utworzenie ProcessPoolExecutor . Do równoległego tworzenia miniatur wykorzystywana jest metoda map executora. Domyślnie ProcessPoolExecutor tworzy jeden podproces na procesor. Uruchomienie tego skryptu na tych samych 160 obrazach zajęło 1,05 sekundy — 2,2 razy szybciej!

Async/Oczekiwanie (tylko Python 3.5+)

Jednym z najbardziej pożądanych elementów w komentarzach do oryginalnego artykułu był przykład użycia modułu asyncio Pythona 3. W porównaniu z innymi przykładami istnieje pewna nowa składnia Pythona, która może być nowa dla większości ludzi, a także kilka nowych koncepcji. Niefortunna dodatkowa warstwa złożoności jest spowodowana tym, że wbudowany moduł urllib nie jest asynchroniczny. Będziemy musieli użyć asynchronicznej biblioteki HTTP, aby uzyskać pełne korzyści z asyncio. W tym celu użyjemy aiohttp.

Przejdźmy od razu do kodu, a nastąpi bardziej szczegółowe wyjaśnienie.

 import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)

Tutaj jest sporo do rozpakowania. Zacznijmy od głównego punktu wejścia do programu. Pierwszą nową rzeczą, jaką robimy z modułem asyncio, jest uzyskanie pętli zdarzeń. Pętla zdarzeń obsługuje cały kod asynchroniczny. Następnie pętla jest uruchamiana aż do zakończenia i przekazana funkcji main . W definicji main jest kawałek nowej składni: async def . Zauważysz także await i with async .

Składnia async/await została wprowadzona w PEP492. Składnia async def oznacza funkcję jako współprogram. Wewnętrznie współprogramy są oparte na generatorach Pythona, ale nie są dokładnie tym samym. Współprogramy zwracają obiekt współprogramu, podobnie jak generatory zwracają obiekt generatora. Gdy masz współprogram, otrzymujesz jego wyniki za pomocą wyrażenia await . Gdy współprogram wywołują wywołania await , wykonanie współprogramu jest zawieszone do czasu zakończenia awaitable. To zawieszenie umożliwia wykonanie innych prac, podczas gdy współprogram jest zawieszony „w oczekiwaniu” na jakiś wynik. Ogólnie rzecz biorąc, wynikiem tego będzie rodzaj operacji wejścia/wyjścia, jak żądanie bazy danych lub w naszym przypadku żądanie HTTP.

Funkcja download_link musiała zostać dość znacząco zmieniona. Wcześniej polegaliśmy na urllib , aby wykonać dla nas ciężar pracy polegający na odczytaniu obrazu. Teraz, aby nasza metoda działała poprawnie z paradygmatem programowania asynchronicznego, wprowadziliśmy pętlę while , która odczytuje fragmenty obrazu na raz i wstrzymuje wykonywanie podczas oczekiwania na zakończenie operacji we/wy. Pozwala to pętli zdarzeń na zapętlenie pobierania różnych obrazów, ponieważ każdy z nich ma nowe dane dostępne podczas pobierania.

Powinien być jeden — najlepiej tylko jeden — oczywisty sposób na zrobienie tego

Chociaż zen Pythona mówi nam, że powinien istnieć jeden oczywisty sposób na zrobienie czegoś, w Pythonie jest wiele sposobów na wprowadzenie współbieżności do naszych programów. Najlepsza metoda do wyboru będzie zależeć od konkretnego przypadku użycia. Paradygmat asynchroniczny lepiej skaluje się do obciążeń o wysokiej współbieżności (takich jak serwer sieci Web) w porównaniu do wątków lub przetwarzania wieloprocesowego, ale wymaga, aby kod (i zależności) był asynchroniczny, aby w pełni skorzystać.

Mamy nadzieję, że przedstawione w tym artykule przykłady wątków Pythona — i aktualizacja — wskażą ci właściwy kierunek, dzięki czemu będziesz miał pojęcie, gdzie szukać w standardowej bibliotece Pythona, jeśli chcesz wprowadzić współbieżność do swoich programów.