Tutorial de multithreading e multiprocessamento do Python
Publicados: 2022-03-11As discussões que criticam o Python geralmente falam sobre como é difícil usar o Python para trabalho multithread, apontando o dedo para o que é conhecido como o bloqueio global do interpretador (carinhosamente chamado de GIL) que impede que vários threads do código Python sejam executados simultaneamente. Devido a isso, o módulo multithreading Python não se comporta da maneira que você esperaria se você não for um desenvolvedor Python e estiver vindo de outras linguagens, como C++ ou Java. Deve ficar claro que ainda é possível escrever código em Python que seja executado simultaneamente ou em paralelo e faça uma grande diferença no desempenho resultante, desde que certas coisas sejam levadas em consideração. Se você ainda não leu, sugiro que dê uma olhada no artigo do Eqbal Quran sobre simultaneidade e paralelismo em Ruby aqui no Toptal Engineering Blog.
Neste tutorial de simultaneidade do Python, escreveremos um pequeno script Python para baixar as principais imagens populares do Imgur. Começaremos com uma versão que baixa imagens sequencialmente, ou uma de cada vez. Como pré-requisito, você terá que registrar um aplicativo no Imgur. Se você ainda não possui uma conta Imgur, crie uma primeiro.
Os scripts nesses exemplos de encadeamento foram testados com o Python 3.6.4. Com algumas mudanças, eles também devem ser executados com o Python 2 - urllib é o que mais mudou entre essas duas versões do Python.
Introdução ao Python Multithreading
Vamos começar criando um módulo Python, chamado download.py
. Este arquivo conterá todas as funções necessárias para buscar a lista de imagens e baixá-las. Vamos dividir essas funcionalidades em três funções separadas:
-
get_links
-
download_link
-
setup_download_dir
A terceira função, setup_download_dir
, será usada para criar um diretório de destino de download se ele ainda não existir.
A API do Imgur requer que as solicitações HTTP tenham o cabeçalho Authorization
com o ID do cliente. Você pode encontrar esse ID de cliente no painel do aplicativo que você registrou no Imgur e a resposta será codificada em JSON. Podemos usar a biblioteca JSON padrão do Python para decodificá-lo. Baixar a imagem é uma tarefa ainda mais simples, pois tudo o que você precisa fazer é buscar a imagem pelo seu URL e gravá-la em um arquivo.
É assim que o script se parece:
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
Em seguida, precisaremos escrever um módulo que usará essas funções para baixar as imagens, uma a uma. Vamos nomear este single.py
. Isso conterá a função principal da nossa primeira versão ingênua do downloader de imagens Imgur. O módulo irá recuperar o ID do cliente Imgur na variável de ambiente IMGUR_CLIENT_ID
. Ele invocará o setup_download_dir
para criar o diretório de destino do download. Finalmente, ele buscará uma lista de imagens usando a função get_links
, filtrará todos os GIFs e URLs de álbuns e, em seguida, usará download_link
para baixar e salvar cada uma dessas imagens no disco. Aqui está a aparência do single.py
:
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
No meu laptop, esse script levou 19,4 segundos para baixar 91 imagens. Observe que esses números podem variar de acordo com a rede em que você está. 19,4 segundos não é muito longo, mas e se quisermos baixar mais fotos? Talvez 900 imagens, em vez de 90. Com uma média de 0,2 segundos por imagem, 900 imagens levariam aproximadamente 3 minutos. Para 9000 fotos levaria 30 minutos. A boa notícia é que, ao introduzir a simultaneidade ou o paralelismo, podemos acelerar isso drasticamente.
Todos os exemplos de código subsequentes mostrarão apenas instruções de importação que são novas e específicas para esses exemplos. Por conveniência, todos esses scripts Python podem ser encontrados neste repositório do GitHub.
Simultaneidade e Paralelismo em Python: Exemplo de Threading
Threading é uma das abordagens mais conhecidas para alcançar a simultaneidade e o paralelismo do Python. Threading é um recurso normalmente fornecido pelo sistema operacional. Threads são mais leves que processos e compartilham o mesmo espaço de memória.
Neste exemplo de threading Python, escreveremos um novo módulo para substituir single.py
. Este módulo criará um pool de oito threads, perfazendo um total de nove threads, incluindo o thread principal. Escolhi oito threads de trabalho porque meu computador tem oito núcleos de CPU e um thread de trabalho por núcleo parecia um bom número para quantos threads seriam executados de uma só vez. Na prática, esse número é escolhido com muito mais cuidado com base em outros fatores, como outros aplicativos e serviços executados na mesma máquina.
Isso é quase o mesmo que o anterior, com a exceção de que agora temos uma nova classe, DownloadWorker
, que é descendente da classe Thread
do Python. O método run foi substituído, que executa um loop infinito. Em cada iteração, ele chama self.queue.get()
para tentar buscar uma URL de uma fila thread-safe. Ele bloqueia até que haja um item na fila para o trabalhador processar. Depois que o trabalhador recebe um item da fila, ele chama o mesmo método download_link
usado no script anterior para baixar a imagem no diretório de imagens. Após a conclusão do download, o trabalhador sinaliza à fila que essa tarefa foi concluída. Isso é muito importante, porque a Fila registra quantas tarefas foram enfileiradas. A chamada para queue.join()
bloquearia o encadeamento principal para sempre se os trabalhadores não sinalizassem que concluíram uma tarefa.
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
A execução deste script de exemplo de threading Python na mesma máquina usada anteriormente resulta em um tempo de download de 4,1 segundos! Isso é 4,7 vezes mais rápido que o exemplo anterior. Embora isso seja muito mais rápido, vale a pena mencionar que apenas um thread estava sendo executado por vez durante todo esse processo devido ao GIL. Portanto, esse código é concorrente, mas não paralelo. A razão pela qual ainda é mais rápido é porque esta é uma tarefa vinculada a E/S. O processador mal está suando ao baixar essas imagens, e a maior parte do tempo é gasto esperando pela rede. É por isso que o multithreading do Python pode fornecer um grande aumento de velocidade. O processador pode alternar entre as threads sempre que uma delas estiver pronta para fazer algum trabalho. Usar o módulo de encadeamento em Python ou qualquer outra linguagem interpretada com um GIL pode resultar em desempenho reduzido. Se o seu código estiver executando uma tarefa vinculada à CPU, como descompactar arquivos gzip, o uso do módulo de threading
resultará em um tempo de execução mais lento. Para tarefas vinculadas à CPU e execução verdadeiramente paralela, podemos usar o módulo de multiprocessamento.
Embora a implementação do Python de referência de fato - CPython - tenha um GIL, isso não é verdade para todas as implementações do Python. Por exemplo, IronPython, uma implementação do Python que usa a estrutura .NET, não possui um GIL, nem Jython, a implementação baseada em Java. Você pode encontrar uma lista de implementações Python em funcionamento aqui.
Simultaneidade e paralelismo em Python Exemplo 2: gerando vários processos
O módulo de multiprocessamento é mais fácil de incluir do que o módulo de encadeamento, pois não precisamos adicionar uma classe como o exemplo de encadeamento do Python. As únicas alterações que precisamos fazer estão na função main.
Para usar vários processos, criamos um Pool
multiprocessamento. Com o método map que ele fornece, passaremos a lista de URLs para o pool, que por sua vez gerará oito novos processos e usará cada um para baixar as imagens em paralelo. Este é o verdadeiro paralelismo, mas vem com um custo. Toda a memória do script é copiada em cada subprocesso gerado. Neste exemplo simples, não é grande coisa, mas pode facilmente se tornar uma sobrecarga séria para programas não triviais.
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
Simultaneidade e paralelismo em Python Exemplo 3: Distribuindo para vários workers
Embora os módulos de threading e multiprocessamento sejam ótimos para scripts que estão sendo executados em seu computador pessoal, o que você deve fazer se quiser que o trabalho seja feito em uma máquina diferente ou se precisar escalar para mais do que a CPU em uma máquina pode lidar? Um ótimo caso de uso para isso são as tarefas de back-end de longa duração para aplicativos da Web. Se você tiver algumas tarefas de execução longa, não deseja ativar vários subprocessos ou threads na mesma máquina que precisam executar o restante do código do aplicativo. Isso degradará o desempenho do seu aplicativo para todos os seus usuários. O que seria ótimo é poder executar esses trabalhos em outra máquina, ou em muitas outras máquinas.
Uma ótima biblioteca Python para esta tarefa é a RQ, uma biblioteca muito simples e poderosa. Você primeiro enfileira uma função e seus argumentos usando a biblioteca. Isso reduz a representação da chamada de função, que é anexada a uma lista Redis. Enfileirar o trabalho é o primeiro passo, mas ainda não fará nada. Também precisamos de pelo menos um trabalhador para ouvir nessa fila de trabalho.
A primeira etapa é instalar e executar um servidor Redis em seu computador ou ter acesso a um servidor Redis em execução. Depois disso, há apenas algumas pequenas alterações feitas no código existente. Primeiro, criamos uma instância de uma fila RQ e passamos a ela uma instância de um servidor Redis da biblioteca redis-py. Então, em vez de apenas chamar nosso método download_link
, chamamos q.enqueue(download_link, download_dir, link)
. O método enqueue recebe uma função como seu primeiro argumento e, em seguida, quaisquer outros argumentos ou argumentos de palavra-chave são passados para essa função quando o trabalho é realmente executado.

Um último passo que precisamos fazer é iniciar alguns trabalhadores. O RQ fornece um script útil para executar workers na fila padrão. Basta executar rqworker
em uma janela de terminal e ele iniciará um trabalhador ouvindo na fila padrão. Certifique-se de que seu diretório de trabalho atual seja o mesmo onde os scripts residem. Se você quiser ouvir uma fila diferente, você pode executar rqworker queue_name
e ele escutará essa fila nomeada. O melhor do RQ é que, contanto que você possa se conectar ao Redis, você pode executar quantos trabalhadores quiser em quantas máquinas diferentes desejar; portanto, é muito fácil escalar conforme seu aplicativo cresce. Aqui está a fonte para a versão RQ:
import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
No entanto, o RQ não é a única solução de fila de tarefas do Python. O RQ é fácil de usar e cobre casos de uso simples extremamente bem, mas se forem necessárias opções mais avançadas, outras soluções de fila do Python 3 (como Celery) podem ser usadas.
Python Multithreading x Multiprocessamento
Se o seu código estiver vinculado a IO, tanto o multiprocessamento quanto o multithreading em Python funcionarão para você. O multiprocessamento é mais fácil de usar do que o threading, mas tem uma sobrecarga de memória maior. Se o seu código estiver vinculado à CPU, o multiprocessamento provavelmente será a melhor escolha, especialmente se a máquina de destino tiver vários núcleos ou CPUs. Para aplicativos da Web e quando você precisar dimensionar o trabalho em várias máquinas, o RQ será melhor para você.
Atualizar
Python concurrent.futures
Algo novo desde o Python 3.2 que não foi abordado no artigo original é o pacote concurrent.futures
. Este pacote fornece mais uma maneira de usar simultaneidade e paralelismo com Python.
No artigo original, mencionei que o módulo de multiprocessamento do Python seria mais fácil de inserir no código existente do que o módulo de encadeamento. Isso ocorreu porque o módulo de encadeamento do Python 3 exigia a subclassificação da classe Thread
e também a criação de uma Queue
para os encadeamentos monitorarem o trabalho.
O uso de um concurrent.futures.ThreadPoolExecutor torna o código de exemplo de threading do Python quase idêntico ao módulo de multiprocessamento.
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
Agora que temos todas essas imagens baixadas com nosso ThreadPoolExecutor
do Python, podemos usá-las para testar uma tarefa vinculada à CPU. Podemos criar versões em miniatura de todas as imagens em um script de processo único de thread único e, em seguida, testar uma solução baseada em multiprocessamento.
Vamos usar a biblioteca Pillow para lidar com o redimensionamento das imagens.
Aqui está o nosso script inicial.
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
Este script itera sobre os caminhos na pasta de images
e para cada caminho ele executa a função create_thumbnail. Esta função usa Pillow para abrir a imagem, criar uma miniatura e salvar a nova imagem menor com o mesmo nome da original, mas com _thumbnail
anexado ao nome.
A execução deste script em 160 imagens totalizando 36 milhões leva 2,32 segundos. Vamos ver se podemos acelerar isso usando um ProcessPoolExecutor.
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
O método create_thumbnail
é idêntico ao último script. A principal diferença é a criação de um ProcessPoolExecutor
. O método map do executor é usado para criar as miniaturas em paralelo. Por padrão, o ProcessPoolExecutor
cria um subprocesso por CPU. A execução deste script nas mesmas 160 imagens levou 1,05 segundos — 2,2 vezes mais rápido!
Async/Await (somente Python 3.5+)
Um dos itens mais solicitados nos comentários do artigo original foi um exemplo usando o módulo assíncrono do Python 3. Comparado com os outros exemplos, há uma nova sintaxe do Python que pode ser nova para a maioria das pessoas e também alguns novos conceitos. Uma infeliz camada adicional de complexidade é causada pelo módulo urllib
integrado do Python não ser assíncrono. Precisaremos usar uma biblioteca HTTP assíncrona para obter todos os benefícios do assíncrono. Para isso, usaremos aiohttp.
Vamos pular direto para o código e uma explicação mais detalhada seguirá.
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
Há um pouco para descompactar aqui. Vamos começar com o ponto de entrada principal do programa. A primeira novidade que fazemos com o módulo assíncrono é obter o loop de eventos. O loop de eventos manipula todo o código assíncrono. Em seguida, o loop é executado até completar e passar a função main
. Há uma nova sintaxe na definição de main: async def
. Você também notará await
e with async
.
A sintaxe async/await foi introduzida no PEP492. A sintaxe async def
marca uma função como uma corrotina. Internamente, as corrotinas são baseadas em geradores Python, mas não são exatamente a mesma coisa. Corrotinas retornam um objeto corrotina semelhante a como os geradores retornam um objeto gerador. Uma vez que você tenha uma corrotina, você obtém seus resultados com a expressão await
. Quando uma corrotina chama await
, a execução da corrotina é suspensa até que o awaitable seja concluído. Essa suspensão permite que outros trabalhos sejam concluídos enquanto a corrotina está suspensa “aguardando” algum resultado. Em geral, esse resultado será algum tipo de E/S como uma solicitação de banco de dados ou, no nosso caso, uma solicitação HTTP.
A função download_link
teve que ser alterada de forma bastante significativa. Anteriormente, dependíamos do urllib
para fazer o trabalho de leitura da imagem para nós. Agora, para permitir que nosso método funcione corretamente com o paradigma de programação assíncrona, introduzimos um loop while
que lê partes da imagem por vez e suspende a execução enquanto aguarda a conclusão da E/S. Isso permite que o loop de eventos percorra o download das diferentes imagens, pois cada uma tem novos dados disponíveis durante o download.
Deve haver uma - de preferência apenas uma - maneira óbvia de fazer isso
Embora o zen do Python nos diga que deve haver uma maneira óbvia de fazer algo, há muitas maneiras no Python de introduzir a simultaneidade em nossos programas. O melhor método a ser escolhido dependerá do seu caso de uso específico. O paradigma assíncrono é melhor dimensionado para cargas de trabalho de alta simultaneidade (como um servidor da Web) em comparação com threading ou multiprocessamento, mas exige que seu código (e dependências) seja assíncrono para se beneficiar totalmente.
Espero que os exemplos de encadeamento do Python neste artigo – e atualização – o apontem na direção certa para que você tenha uma ideia de onde procurar na biblioteca padrão do Python se precisar introduzir simultaneidade em seus programas.