Tutorial de multihilo y multiprocesamiento de Python
Publicado: 2022-03-11Las discusiones que critican a Python a menudo hablan de lo difícil que es usar Python para el trabajo de subprocesos múltiples, señalando con el dedo lo que se conoce como el bloqueo global del intérprete (denominado cariñosamente como GIL) que evita que varios subprocesos del código de Python se ejecuten simultáneamente. Debido a esto, el módulo de subprocesos múltiples de Python no se comporta de la manera que esperaría si no es un desarrollador de Python y proviene de otros lenguajes como C ++ o Java. Debe quedar claro que aún se puede escribir código en Python que se ejecute simultáneamente o en paralelo y marque una gran diferencia en el rendimiento resultante, siempre que se tengan en cuenta ciertas cosas. Si aún no lo ha leído, le sugiero que eche un vistazo al artículo de Eqbal Quran sobre concurrencia y paralelismo en Ruby aquí en el blog de ingeniería de Toptal.
En este tutorial de simultaneidad de Python, escribiremos un pequeño script de Python para descargar las imágenes más populares de Imgur. Comenzaremos con una versión que descarga las imágenes de forma secuencial, o de una en una. Como requisito previo, deberá registrar una aplicación en Imgur. Si aún no tiene una cuenta de Imgur, cree una primero.
Los scripts de estos ejemplos de subprocesos se han probado con Python 3.6.4. Con algunos cambios, también deberían ejecutarse con Python 2: urllib es lo que más ha cambiado entre estas dos versiones de Python.
Introducción a los subprocesos múltiples de Python
Comencemos por crear un módulo de Python, llamado download.py
. Este archivo contendrá todas las funciones necesarias para obtener la lista de imágenes y descargarlas. Dividiremos estas funcionalidades en tres funciones separadas:
-
get_links
-
download_link
-
setup_download_dir
La tercera función, setup_download_dir
, se usará para crear un directorio de destino de descarga si aún no existe.
La API de Imgur requiere que las solicitudes HTTP lleven el encabezado de Authorization
con la ID del cliente. Puede encontrar este ID de cliente en el panel de control de la aplicación que ha registrado en Imgur, y la respuesta estará codificada en JSON. Podemos usar la biblioteca JSON estándar de Python para decodificarlo. Descargar la imagen es una tarea aún más simple, ya que todo lo que tiene que hacer es buscar la imagen por su URL y escribirla en un archivo.
Así es como se ve el guión:
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
A continuación, necesitaremos escribir un módulo que utilice estas funciones para descargar las imágenes, una por una. Llamaremos a este single.py
. Esto contendrá la función principal de nuestra primera versión ingenua del descargador de imágenes Imgur. El módulo recuperará el ID de cliente de Imgur en la variable de entorno IMGUR_CLIENT_ID
. setup_download_dir
para crear el directorio de destino de descarga. Finalmente, obtendrá una lista de imágenes usando la función get_links
, filtrará todas las URL de GIF y álbumes y luego usará download_link
para descargar y guardar cada una de esas imágenes en el disco. Así es como luce single.py
:
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
En mi computadora portátil, esta secuencia de comandos tardó 19,4 segundos en descargar 91 imágenes. Tenga en cuenta que estos números pueden variar según la red en la que se encuentre. 19,4 segundos no es mucho tiempo, pero ¿y si quisiéramos descargar más imágenes? Quizás 900 imágenes, en lugar de 90. Con un promedio de 0,2 segundos por imagen, 900 imágenes tardarían aproximadamente 3 minutos. Para 9000 fotos tomaría 30 minutos. La buena noticia es que al introducir la concurrencia o el paralelismo, podemos acelerar esto drásticamente.
Todos los ejemplos de código posteriores solo mostrarán declaraciones de importación que son nuevas y específicas de esos ejemplos. Para mayor comodidad, todos estos scripts de Python se pueden encontrar en este repositorio de GitHub.
Concurrencia y paralelismo en Python: ejemplo de subprocesamiento
Threading es uno de los enfoques más conocidos para lograr la concurrencia y el paralelismo de Python. El subprocesamiento es una función que suele proporcionar el sistema operativo. Los subprocesos son más ligeros que los procesos y comparten el mismo espacio de memoria.
En este ejemplo de subprocesamiento de Python, escribiremos un nuevo módulo para reemplazar single.py
. Este módulo creará un grupo de ocho subprocesos, haciendo un total de nueve subprocesos, incluido el subproceso principal. Elegí ocho subprocesos de trabajo porque mi computadora tiene ocho núcleos de CPU y un subproceso de trabajo por núcleo parecía un buen número para cuántos subprocesos ejecutar a la vez. En la práctica, este número se elige con mucho más cuidado en función de otros factores, como otras aplicaciones y servicios que se ejecutan en la misma máquina.
Esto es casi igual que el anterior, con la excepción de que ahora tenemos una nueva clase, DownloadWorker
, que es descendiente de la clase Python Thread
. Se anuló el método de ejecución, que ejecuta un bucle infinito. En cada iteración, llama a self.queue.get()
para intentar obtener una URL desde una cola segura para subprocesos. Se bloquea hasta que haya un elemento en la cola para que el trabajador lo procese. Una vez que el trabajador recibe un elemento de la cola, llama al mismo método download_link
que se usó en el script anterior para descargar la imagen en el directorio de imágenes. Una vez finalizada la descarga, el trabajador señala a la cola que esa tarea ha finalizado. Esto es muy importante, porque Queue realiza un seguimiento de cuántas tareas se pusieron en cola. La llamada a queue.join()
bloquearía el hilo principal para siempre si los trabajadores no indicaran que completaron una tarea.
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Ejecutar este script de ejemplo de subprocesos de Python en la misma máquina utilizada anteriormente da como resultado un tiempo de descarga de 4,1 segundos. Eso es 4,7 veces más rápido que el ejemplo anterior. Si bien esto es mucho más rápido, vale la pena mencionar que solo se estaba ejecutando un subproceso a la vez durante este proceso debido a la GIL. Por lo tanto, este código es concurrente pero no paralelo. La razón por la que aún es más rápido es porque se trata de una tarea vinculada a IO. El procesador apenas suda mientras descarga estas imágenes, y la mayor parte del tiempo se pasa esperando a la red. Esta es la razón por la que los subprocesos múltiples de Python pueden proporcionar un gran aumento de velocidad. El procesador puede cambiar entre los subprocesos siempre que uno de ellos esté listo para realizar algún trabajo. El uso del módulo de subprocesos en Python o cualquier otro lenguaje interpretado con un GIL puede resultar en un rendimiento reducido. Si su código está realizando una tarea vinculada a la CPU, como descomprimir archivos gzip, el uso del módulo de threading
dará como resultado un tiempo de ejecución más lento. Para tareas vinculadas a la CPU y ejecución verdaderamente paralela, podemos usar el módulo de multiprocesamiento.
Si bien la implementación de Python de referencia de facto , CPython, tiene un GIL, esto no es cierto para todas las implementaciones de Python. Por ejemplo, IronPython, una implementación de Python que usa el marco .NET, no tiene un GIL, y tampoco Jython, la implementación basada en Java. Puede encontrar una lista de implementaciones de Python en funcionamiento aquí.
Concurrencia y paralelismo en Python Ejemplo 2: generación de múltiples procesos
El módulo de multiprocesamiento es más fácil de colocar que el módulo de subprocesos, ya que no necesitamos agregar una clase como el ejemplo de subprocesos de Python. Los únicos cambios que necesitamos hacer están en la función principal.
Para usar múltiples procesos, creamos un Pool
multiprocesamiento. Con el método de mapa que proporciona, pasaremos la lista de URL al grupo, que a su vez generará ocho nuevos procesos y usará cada uno para descargar las imágenes en paralelo. Este es un verdadero paralelismo, pero tiene un costo. Toda la memoria del script se copia en cada subproceso que se genera. En este ejemplo simple, no es un gran problema, pero puede convertirse fácilmente en una sobrecarga importante para programas no triviales.
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
Concurrencia y paralelismo en Python Ejemplo 3: Distribuir a varios trabajadores
Si bien los módulos de subprocesamiento y multiprocesamiento son excelentes para los scripts que se ejecutan en su computadora personal, ¿qué debe hacer si desea que el trabajo se realice en una máquina diferente o necesita escalar a más de lo que la CPU en una máquina puede? ¿encargarse de? Un gran caso de uso para esto son las tareas de back-end de larga ejecución para aplicaciones web. Si tiene algunas tareas de ejecución prolongada, no desea activar un montón de subprocesos o subprocesos en la misma máquina que deben ejecutar el resto del código de su aplicación. Esto degradará el rendimiento de su aplicación para todos sus usuarios. Lo que sería genial es poder ejecutar estos trabajos en otra máquina o en muchas otras máquinas.
Una gran biblioteca de Python para esta tarea es RQ, una biblioteca muy simple pero poderosa. Primero pone en cola una función y sus argumentos usando la biblioteca. Esto selecciona la representación de la llamada de función, que luego se agrega a una lista de Redis. Poner en cola el trabajo es el primer paso, pero no hará nada todavía. También necesitamos al menos un trabajador para escuchar en esa cola de trabajo.

El primer paso es instalar y ejecutar un servidor Redis en su computadora, o tener acceso a un servidor Redis en ejecución. Después de eso, solo se realizan algunos pequeños cambios en el código existente. Primero creamos una instancia de RQ Queue y le pasamos una instancia de un servidor Redis de la biblioteca redis-py. Luego, en lugar de simplemente llamar a nuestro método download_link
, llamamos a q.enqueue(download_link, download_dir, link)
. El método enqueue toma una función como su primer argumento, luego cualquier otro argumento o argumento de palabra clave se pasa a esa función cuando el trabajo se ejecuta realmente.
Un último paso que tenemos que hacer es poner en marcha algunos trabajadores. RQ proporciona un script útil para ejecutar trabajadores en la cola predeterminada. Simplemente ejecute rqworker
en una ventana de terminal e iniciará un trabajador escuchando en la cola predeterminada. Asegúrese de que su directorio de trabajo actual sea el mismo en el que residen los scripts. Si desea escuchar una cola diferente, puede ejecutar rqworker queue_name
y escuchará esa cola nombrada. Lo mejor de RQ es que, siempre que pueda conectarse a Redis, puede ejecutar tantos trabajadores como desee en tantas máquinas diferentes como desee; por lo tanto, es muy fácil de escalar a medida que crece su aplicación. Aquí está la fuente de la versión RQ:
import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
Sin embargo, RQ no es la única solución de cola de trabajos de Python. RQ es fácil de usar y cubre extremadamente bien los casos de uso simples, pero si se requieren opciones más avanzadas, se pueden usar otras soluciones de cola de Python 3 (como Celery).
Multiproceso de Python frente a multiprocesamiento
Si su código está vinculado a IO, tanto el multiprocesamiento como el multiproceso en Python funcionarán para usted. El multiprocesamiento es más fácil de usar que el subprocesamiento, pero tiene una sobrecarga de memoria más alta. Si su código está vinculado a la CPU, lo más probable es que el multiprocesamiento sea la mejor opción, especialmente si la máquina de destino tiene múltiples núcleos o CPU. Para aplicaciones web, y cuando necesite escalar el trabajo en varias máquinas, RQ será mejor para usted.
Actualizar
Python concurrent.futures
Algo nuevo desde Python 3.2 que no se mencionó en el artículo original es el paquete concurrent.futures
. Este paquete proporciona otra forma de usar la concurrencia y el paralelismo con Python.
En el artículo original, mencioné que el módulo de multiprocesamiento de Python sería más fácil de colocar en el código existente que el módulo de subprocesamiento. Esto se debió a que el módulo de subprocesos de Python 3 requería subclasificar la clase Thread
y también crear una Queue
para que los subprocesos monitorearan el trabajo.
El uso de concurrent.futures.ThreadPoolExecutor hace que el código de ejemplo de subprocesamiento de Python sea casi idéntico al módulo de multiprocesamiento.
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
Ahora que tenemos todas estas imágenes descargadas con nuestro ThreadPoolExecutor
de Python, podemos usarlas para probar una tarea vinculada a la CPU. Podemos crear versiones en miniatura de todas las imágenes en un script de proceso único y de un solo subproceso y luego probar una solución basada en multiprocesamiento.
Vamos a usar la biblioteca de Pillow para manejar el cambio de tamaño de las imágenes.
Aquí está nuestro script inicial.
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Este script itera sobre las rutas en la carpeta de images
y para cada ruta ejecuta la función create_thumbnail. Esta función usa Pillow para abrir la imagen, crear una miniatura y guardar la nueva imagen más pequeña con el mismo nombre que la original pero con _thumbnail
adjunto al nombre.
Ejecutar este script en 160 imágenes por un total de 36 millones lleva 2,32 segundos. Veamos si podemos acelerar esto usando un ProcessPoolExecutor.
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
El método create_thumbnail
es idéntico al último script. La principal diferencia es la creación de un ProcessPoolExecutor
. El método del mapa del ejecutor se utiliza para crear las miniaturas en paralelo. De forma predeterminada, ProcessPoolExecutor
crea un subproceso por CPU. Ejecutar este script en las mismas 160 imágenes tomó 1,05 segundos, ¡2,2 veces más rápido!
Async/Await (solo Python 3.5+)
Uno de los elementos más solicitados en los comentarios del artículo original fue un ejemplo usando el módulo asyncio de Python 3. En comparación con los otros ejemplos, hay una nueva sintaxis de Python que puede ser nueva para la mayoría de las personas y también algunos conceptos nuevos. Una desafortunada capa adicional de complejidad es causada por el módulo urllib
incorporado de Python que no es asíncrono. Necesitaremos usar una biblioteca HTTP asíncrona para obtener todos los beneficios de asyncio. Para esto, usaremos aiohttp.
Pasemos directamente al código y seguiremos con una explicación más detallada.
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
Hay bastante que desempacar aquí. Comencemos con el punto de entrada principal del programa. Lo primero que hacemos con el módulo asyncio es obtener el bucle de eventos. El bucle de eventos maneja todo el código asíncrono. Luego, el bucle se ejecuta hasta que se completa y pasa la función main
. Hay una nueva sintaxis en la definición de main: async def
. También notará await
y with async
.
La sintaxis async/await se introdujo en PEP492. La sintaxis async def
marca una función como una rutina. Internamente, las rutinas se basan en generadores de Python, pero no son exactamente lo mismo. Las corrutinas devuelven un objeto de corrutina similar a cómo los generadores devuelven un objeto generador. Una vez que tiene una corrutina, obtiene sus resultados con la expresión await
. Cuando una corrutina llama await
, la ejecución de la corrutina se suspende hasta que finaliza el awaitable. Esta suspensión permite que se completen otros trabajos mientras la rutina está suspendida “en espera” de algún resultado. En general, este resultado será algún tipo de E/S como una solicitud de base de datos o, en nuestro caso, una solicitud HTTP.
La función download_link
tuvo que cambiarse bastante. Anteriormente, dependíamos de urllib
para hacer la mayor parte del trabajo de leer la imagen por nosotros. Ahora, para permitir que nuestro método funcione correctamente con el paradigma de programación asincrónica, hemos introducido un bucle while
que lee fragmentos de la imagen a la vez y suspende la ejecución mientras espera que se complete la E/S. Esto permite que el bucle de eventos se descargue a través de la descarga de las diferentes imágenes, ya que cada una tiene nuevos datos disponibles durante la descarga.
Debería haber una, preferiblemente solo una, manera obvia de hacerlo
Si bien el zen de Python nos dice que debería haber una forma obvia de hacer algo, Python tiene muchas formas de introducir la concurrencia en nuestros programas. El mejor método a elegir dependerá de su caso de uso específico. El paradigma asincrónico se adapta mejor a cargas de trabajo de alta simultaneidad (como un servidor web) en comparación con subprocesos o multiprocesamiento, pero requiere que su código (y dependencias) sean asincrónicos para poder beneficiarse por completo.
Con suerte, los ejemplos de subprocesos de Python en este artículo, y la actualización, lo orientarán en la dirección correcta para que tenga una idea de dónde buscar en la biblioteca estándar de Python si necesita introducir concurrencia en sus programas.