Python Multithreading และ Multiprocessing Tutorial

เผยแพร่แล้ว: 2022-03-11
หมายเหตุ: ตามคำขอที่เป็นที่นิยม ฉันได้สาธิตเทคนิคทางเลือกบางอย่าง ซึ่งรวมถึง async/await ใช้ได้เฉพาะตั้งแต่การถือกำเนิดของ Python 3.5 เท่านั้น ฉันได้เพิ่มการอัปเดตบางส่วนไว้ที่ส่วนท้ายของบทความ สนุก!

การสนทนาที่วิพากษ์วิจารณ์ Python มักพูดถึงความยากลำบากในการใช้ Python สำหรับงานแบบมัลติเธรด โดยชี้ไปที่สิ่งที่เรียกว่าล็อกล่ามส่วนกลาง (GIL) ที่ป้องกันไม่ให้โค้ด Python หลายเธรดทำงานพร้อมกัน ด้วยเหตุนี้ โมดูลมัลติเธรดของ Python จึงทำงานได้ไม่ดีตามที่คุณคาดหวัง หากคุณไม่ใช่นักพัฒนา Python และคุณมาจากภาษาอื่น เช่น C++ หรือ Java จะต้องทำให้ชัดเจนว่าเรายังคงสามารถเขียนโค้ดใน Python ที่ทำงานพร้อมกันหรือแบบคู่ขนานและสร้างความแตกต่างอย่างมากในผลลัพธ์ที่ได้ ตราบใดที่มีการพิจารณาบางสิ่ง หากคุณยังไม่ได้อ่าน เราขอแนะนำให้คุณดูบทความของ Eqbal Quran เกี่ยวกับการทำงานพร้อมกันและความเท่าเทียมกันใน Ruby ที่นี่ในบล็อก Toptal Engineering

ในบทช่วยสอนเกี่ยวกับการทำงานพร้อมกันของ Python นี้ เราจะเขียนสคริปต์ Python ขนาดเล็กเพื่อดาวน์โหลดรูปภาพยอดนิยมจาก Imgur เราจะเริ่มต้นด้วยเวอร์ชันที่ดาวน์โหลดภาพตามลำดับหรือทีละภาพ คุณจะต้องลงทะเบียนแอปพลิเคชันบน Imgur เป็นข้อกำหนดเบื้องต้น หากคุณยังไม่มีบัญชี Imgur โปรดสร้างบัญชีก่อน

สคริปต์ในตัวอย่างเธรดเหล่านี้ได้รับการทดสอบด้วย Python 3.6.4 ด้วยการเปลี่ยนแปลงบางอย่าง พวกเขาควรรันด้วย Python 2—urllib คือสิ่งที่เปลี่ยนแปลงมากที่สุดระหว่าง Python สองเวอร์ชันนี้

เริ่มต้นใช้งาน Python Multithreading

ให้เราเริ่มต้นด้วยการสร้างโมดูล Python ชื่อ download.py ไฟล์นี้จะมีฟังก์ชันทั้งหมดที่จำเป็นในการดึงรายการรูปภาพและดาวน์โหลด เราจะแบ่งฟังก์ชันเหล่านี้ออกเป็นสามฟังก์ชันแยกกัน:

  • get_links
  • download_link
  • setup_download_dir

ฟังก์ชันที่สาม setup_download_dir จะใช้เพื่อสร้างไดเร็กทอรีปลายทางสำหรับการดาวน์โหลด หากยังไม่มีอยู่

API ของ Imgur ต้องใช้คำขอ HTTP เพื่อให้มีส่วนหัว Authorization ที่มีรหัสไคลเอ็นต์ คุณสามารถค้นหา ID ไคลเอ็นต์นี้ได้จากแดชบอร์ดของแอปพลิเคชันที่คุณได้ลงทะเบียนไว้บน Imgur และการตอบสนองจะถูกเข้ารหัส JSON เราสามารถใช้ไลบรารี JSON มาตรฐานของ Python เพื่อถอดรหัสได้ การดาวน์โหลดรูปภาพเป็นงานที่ง่ายกว่า เนื่องจากสิ่งที่คุณต้องทำคือดึงรูปภาพโดยใช้ URL ของรูปภาพและเขียนลงในไฟล์

นี่คือลักษณะของสคริปต์:

 import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir

ต่อไป เราจะต้องเขียนโมดูลที่จะใช้ฟังก์ชั่นเหล่านี้เพื่อดาวน์โหลดภาพทีละภาพ เราจะตั้งชื่อ single.py นี้ ซึ่งจะประกอบด้วยฟังก์ชันหลักของโปรแกรมดาวน์โหลดรูปภาพ Imgur เวอร์ชันแรกของเราที่ไร้เดียงสา โมดูลจะดึง ID ไคลเอ็นต์ Imgur ในตัวแปรสภาพแวดล้อม IMGUR_CLIENT_ID มันจะเรียกใช้ setup_download_dir เพื่อสร้างไดเร็กทอรีปลายทางดาวน์โหลด สุดท้าย มันจะดึงรายการรูปภาพโดยใช้ฟังก์ชัน get_links กรอง URL ของ GIF และอัลบั้มทั้งหมด จากนั้นใช้ download_link เพื่อดาวน์โหลดและบันทึกแต่ละภาพเหล่านั้นลงในดิสก์ นี่คือสิ่งที่ single.py ดูเหมือน:

 import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

บนแล็ปท็อปของฉัน สคริปต์นี้ใช้เวลา 19.4 วินาทีในการดาวน์โหลด 91 ภาพ โปรดทราบว่าตัวเลขเหล่านี้อาจแตกต่างกันไปตามเครือข่ายที่คุณอยู่ 19.4 วินาทีนั้นไม่นานนัก แต่ถ้าเราต้องการดาวน์โหลดรูปภาพเพิ่มเติมล่ะ บางที 900 ภาพแทนที่จะเป็น 90 โดยเฉลี่ย 0.2 วินาทีต่อภาพ 900 ภาพจะใช้เวลาประมาณ 3 นาที สำหรับ 9000 ภาพจะใช้เวลา 30 นาที ข่าวดีก็คือว่าด้วยการแนะนำการทำงานพร้อมกันหรือการขนานกัน เราสามารถเร่งความเร็วได้อย่างมาก

ตัวอย่างโค้ดที่ตามมาทั้งหมดจะแสดงเฉพาะคำสั่งนำเข้าที่ใหม่และเฉพาะเจาะจงสำหรับตัวอย่างเหล่านั้น เพื่อความสะดวก สามารถดูสคริปต์ Python เหล่านี้ได้ในที่เก็บ GitHub นี้

การทำงานพร้อมกันและความเท่าเทียมกันใน Python: ตัวอย่างเธรด

การทำเกลียวเป็นหนึ่งในวิธีการที่รู้จักกันดีที่สุดในการบรรลุการทำงานพร้อมกันและความเท่าเทียมกันของ Python การทำเกลียวเป็นคุณลักษณะที่ระบบปฏิบัติการมักจะให้มา เธรดมีน้ำหนักเบากว่ากระบวนการ และใช้พื้นที่หน่วยความจำร่วมกัน

Python multithreading memory model

ในตัวอย่างเธรด Python นี้ เราจะเขียนโมดูลใหม่เพื่อแทนที่ single.py โมดูลนี้จะสร้างพูลแปดเธรด รวมเป็นเก้าเธรดรวมถึงเธรดหลัก ฉันเลือกเธรดผู้ปฏิบัติงานแปดเธรดเนื่องจากคอมพิวเตอร์ของฉันมีคอร์ CPU แปดคอร์และเธรดผู้ปฏิบัติงานหนึ่งคอร์ต่อคอร์ดูเหมือนจะเป็นตัวเลขที่ดีสำหรับจำนวนเธรดที่จะรันพร้อมกัน ในทางปฏิบัติ ตัวเลขนี้ถูกเลือกอย่างระมัดระวังมากขึ้นโดยพิจารณาจากปัจจัยอื่นๆ เช่น แอปพลิเคชันและบริการอื่นๆ ที่ทำงานบนเครื่องเดียวกัน

ซึ่งเกือบจะเหมือนกับคลาสก่อนหน้านี้ ยกเว้นว่าตอนนี้เรามีคลาสใหม่ DownloadWorker ซึ่งเป็นคลาสที่สืบทอดมาจากคลาส Python Thread วิธีการรันถูกแทนที่ ซึ่งรันการวนซ้ำแบบอนันต์ ในการทำซ้ำทุกครั้ง จะเรียก self.queue.get() เพื่อลองดึง URL จากคิวที่ปลอดภัยสำหรับเธรด บล็อกจนกว่าจะมีรายการในคิวให้ผู้ปฏิบัติงานดำเนินการ เมื่อผู้ปฏิบัติงานได้รับรายการจากคิว จากนั้นจะเรียกเมธอด download_link เดียวกับที่ใช้ในสคริปต์ก่อนหน้าเพื่อดาวน์โหลดภาพไปยังไดเร็กทอรีภาพ หลังจากการดาวน์โหลดเสร็จสิ้น ผู้ปฏิบัติงานจะส่งสัญญาณไปยังคิวว่างานนั้นเสร็จสิ้นแล้ว สิ่งนี้สำคัญมาก เนื่องจากคิวจะติดตามจำนวนงานที่ถูกจัดคิว การเรียกใช้ queue.join() จะบล็อกเธรดหลักอย่างถาวรหากผู้ปฏิบัติงานไม่ได้ส่งสัญญาณว่าพวกเขาทำงานเสร็จ

 import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

การรันสคริปต์ตัวอย่างเธรด Python นี้บนเครื่องเดียวกันที่ใช้ผลลัพธ์ก่อนหน้านี้ในการดาวน์โหลด 4.1 วินาที! ซึ่งเร็วกว่าตัวอย่างก่อนหน้านี้ 4.7 เท่า แม้ว่าการดำเนินการนี้จะเร็วกว่ามาก แต่ก็ควรค่าแก่การกล่าวถึงว่ามีการดำเนินการเพียงเธรดเดียวในแต่ละครั้งตลอดกระบวนการนี้เนื่องจาก GIL ดังนั้นรหัสนี้จึงเกิดขึ้นพร้อมกันแต่ไม่ขนานกัน เหตุผลที่มันยังเร็วกว่านั้นเป็นเพราะนี่เป็นงานที่ผูกกับ IO โปรเซสเซอร์แทบจะไม่ต้องเหนื่อยเลยในขณะที่ดาวน์โหลดภาพเหล่านี้ และส่วนใหญ่ก็ใช้ไปกับการรอเครือข่าย นี่คือเหตุผลที่ Python multithreading สามารถเพิ่มความเร็วได้มาก โปรเซสเซอร์สามารถสลับไปมาระหว่างเธรดเมื่อใดก็ตามที่เธรดใดพร้อมที่จะทำงาน การใช้โมดูลเธรดใน Python หรือภาษาอื่นๆ ที่ตีความด้วย GIL อาจส่งผลให้ประสิทธิภาพลดลงได้ หากโค้ดของคุณทำงานที่ผูกไว้กับ CPU เช่น การคลายการบีบอัดไฟล์ gzip การใช้โมดูล threading จะทำให้เวลาดำเนินการช้าลง สำหรับงานที่เกี่ยวข้องกับ CPU และการดำเนินการแบบขนานอย่างแท้จริง เราสามารถใช้โมดูลการประมวลผลหลายตัวได้

แม้ว่าการใช้งาน Python อ้างอิง ตามพฤตินัย —CPython– มี GIL สิ่งนี้ไม่เป็นความจริงสำหรับการนำ Python ไปใช้ทั้งหมด ตัวอย่างเช่น IronPython ซึ่งเป็นการนำ Python ไปใช้งานโดยใช้ .NET framework ไม่มี GIL และ Jython ก็เช่นกัน ซึ่งเป็นการนำ Java ไปใช้ คุณสามารถดูรายการการใช้งาน Python ที่ใช้งานได้ที่นี่

ที่เกี่ยวข้อง: Python Best Practices and Tips โดย Toptal Developers

การทำงานพร้อมกันและความเท่าเทียมกันใน Python ตัวอย่างที่ 2: การวางไข่หลายกระบวนการ

โมดูลมัลติโพรเซสซิงจะดร็อปอินได้ง่ายกว่าโมดูลเธรด เนื่องจากเราไม่จำเป็นต้องเพิ่มคลาส เช่น ตัวอย่างการทำเธรดของ Python การเปลี่ยนแปลงเพียงอย่างเดียวที่เราต้องทำคือในหน้าที่หลัก

บทช่วยสอนการประมวลผลหลายตัวของ Python: โมดูล

ในการใช้หลายโปรเซส เราสร้าง multiprocessing Pool ด้วยวิธีแผนที่ที่มีให้ เราจะส่งรายการ URL ไปยังพูล ซึ่งจะทำให้เกิดกระบวนการใหม่แปดขั้นตอน และใช้แต่ละขั้นตอนเพื่อดาวน์โหลดภาพแบบขนานกัน นี่คือความขนานที่แท้จริง แต่มาพร้อมกับต้นทุน หน่วยความจำทั้งหมดของสคริปต์จะถูกคัดลอกไปยังกระบวนการย่อยที่สร้างขึ้น ในตัวอย่างง่ายๆ นี้ ไม่ใช่เรื่องใหญ่ แต่อาจกลายเป็นค่าใช้จ่ายที่ร้ายแรงสำหรับโปรแกรมที่ไม่สำคัญได้ง่ายๆ

 import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

การทำงานพร้อมกันและความเท่าเทียมกันใน Python ตัวอย่างที่ 3: การแจกจ่ายไปยังผู้ปฏิบัติงานหลายคน

แม้ว่าโมดูลเธรดและมัลติโพรเซสซิงจะดีสำหรับสคริปต์ที่ทำงานบนคอมพิวเตอร์ส่วนบุคคลของคุณ คุณควรทำอย่างไรหากต้องการให้งานเสร็จสิ้นในเครื่องอื่น หรือคุณต้องการขยายขนาดให้มากกว่า CPU ในเครื่องเดียว ด้ามจับ? กรณีการใช้งานที่ยอดเยี่ยมสำหรับสิ่งนี้คืองานแบ็คเอนด์ที่ใช้เวลานานสำหรับเว็บแอปพลิเคชัน หากคุณมีงานที่ต้องใช้เวลานาน คุณไม่ต้องการที่จะรวมกระบวนการย่อยหรือเธรดจำนวนมากในเครื่องเดียวกันที่จำเป็นต้องเรียกใช้โค้ดแอปพลิเคชันที่เหลือของคุณ การดำเนินการนี้จะลดประสิทธิภาพของแอปพลิเคชันสำหรับผู้ใช้ทั้งหมดของคุณ สิ่งที่จะดีมากคือสามารถเรียกใช้งานเหล่านี้บนเครื่องอื่นหรือเครื่องอื่นๆ ได้อีกมากมาย

ไลบรารี Python ที่ยอดเยี่ยมสำหรับงานนี้คือ RQ ซึ่งเป็นไลบรารีที่เรียบง่ายแต่ทรงพลัง ก่อนอื่นคุณต้องจัดคิวฟังก์ชันและอาร์กิวเมนต์โดยใช้ไลบรารี สิ่งนี้จะทำหน้าที่แทนการเรียกใช้ฟังก์ชัน ซึ่งต่อท้ายรายการ Redis การเข้าคิวงานเป็นขั้นตอนแรก แต่ยังไม่ได้ทำอะไรเลย นอกจากนี้เรายังต้องการคนงานอย่างน้อยหนึ่งคนเพื่อฟังคิวงานนั้น

แบบจำลองของไลบรารีคิว RQ Python

ขั้นตอนแรกคือการติดตั้งและเรียกใช้เซิร์ฟเวอร์ Redis บนคอมพิวเตอร์ของคุณ หรือเข้าถึงเซิร์ฟเวอร์ Redis ที่กำลังทำงานอยู่ หลังจากนั้น มีการเปลี่ยนแปลงเพียงเล็กน้อยในโค้ดที่มีอยู่ ขั้นแรก เราสร้างอินสแตนซ์ของคิว RQ และส่งผ่านอินสแตนซ์ของเซิร์ฟเวอร์ Redis จากไลบรารี redis-py จากนั้น แทนที่จะเรียกวิธีการ download_link ของเรา เราเรียก q.enqueue(download_link, download_dir, link) วิธีการ enqueue ใช้ฟังก์ชันเป็นอาร์กิวเมนต์แรก จากนั้นอาร์กิวเมนต์หรืออาร์กิวเมนต์ของคีย์เวิร์ดอื่นๆ จะถูกส่งต่อไปยังฟังก์ชันนั้นเมื่องานถูกดำเนินการจริง

ขั้นตอนสุดท้ายที่เราต้องทำคือเริ่มต้นพนักงานบางคน RQ จัดเตรียมสคริปต์ที่ใช้งานง่ายเพื่อรันผู้ปฏิบัติงานบนคิวเริ่มต้น เพียงเรียกใช้ rqworker ในหน้าต่างเทอร์มินัล และมันจะเริ่มให้ผู้ปฏิบัติงานฟังคิวเริ่มต้น โปรดตรวจสอบให้แน่ใจว่าไดเร็กทอรีการทำงานปัจจุบันของคุณเหมือนกับที่สคริปต์อยู่ หากคุณต้องการฟังคิวอื่น คุณสามารถเรียกใช้ rqworker queue_name และโปรแกรมจะฟังคิวที่มีชื่อนั้น สิ่งที่ยอดเยี่ยมเกี่ยวกับ RQ ก็คือ ตราบใดที่คุณสามารถเชื่อมต่อกับ Redis ได้ คุณก็สามารถเรียกใช้พนักงานได้มากเท่าที่คุณต้องการบนเครื่องต่างๆ มากมายเท่าที่คุณต้องการ ดังนั้นจึงเป็นเรื่องง่ายมากที่จะขยายขนาดเมื่อแอปพลิเคชันของคุณเติบโตขึ้น นี่คือที่มาของเวอร์ชัน RQ:

 import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()

อย่างไรก็ตาม RQ ไม่ใช่โซลูชันคิวงาน Python เดียว RQ ใช้งานง่ายและครอบคลุมกรณีการใช้งานทั่วไปได้เป็นอย่างดี แต่ถ้าจำเป็นต้องมีตัวเลือกขั้นสูงเพิ่มเติม สามารถใช้โซลูชันคิว Python 3 อื่นๆ (เช่น Celery) ได้

Python Multithreading กับ Multiprocessing

หากโค้ดของคุณถูกผูกไว้กับ IO ทั้งมัลติโพรเซสซิงและมัลติเธรดใน Python จะทำงานให้คุณ การประมวลผลหลายตัวทำได้ง่ายกว่าการทำเธรดแต่มีค่าใช้จ่ายหน่วยความจำสูงกว่า หากโค้ดของคุณถูกผูกไว้กับ CPU การประมวลผลหลายตัวน่าจะเป็นทางเลือกที่ดีกว่า โดยเฉพาะอย่างยิ่งถ้าเครื่องเป้าหมายมีหลายคอร์หรือซีพียู สำหรับเว็บแอปพลิเคชัน และเมื่อคุณต้องการปรับขนาดงานในเครื่องหลายเครื่อง RQ จะดีกว่าสำหรับคุณ

ที่เกี่ยวข้อง: เป็นขั้นสูงมากขึ้น: หลีกเลี่ยง 10 ข้อผิดพลาดที่พบบ่อยที่สุดที่โปรแกรมเมอร์ Python ทำ

อัปเดต

Python concurrent.futures

สิ่งใหม่ตั้งแต่ Python 3.2 ที่ไม่ได้กล่าวถึงในบทความต้นฉบับคือแพ็คเกจ concurrent.futures แพ็คเกจนี้เป็นอีกวิธีหนึ่งในการใช้การทำงานพร้อมกันและความขนานกับ Python

ในบทความต้นฉบับ ฉันกล่าวว่าโมดูลการประมวลผลหลายตัวของ Python จะวางลงในโค้ดที่มีอยู่ได้ง่ายกว่าโมดูลเธรด นี่เป็นเพราะโมดูลเธรด Python 3 จำเป็นต้องมีคลาสย่อยของคลาส Thread และสร้าง Queue สำหรับเธรดเพื่อตรวจสอบการทำงาน

การใช้ concurrent.futures.ThreadPoolExecutor ทำให้โค้ดตัวอย่างการทำเธรด Python เกือบจะเหมือนกับโมดูลการประมวลผลหลายตัว

 import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()

ตอนนี้เราได้ดาวน์โหลดรูปภาพทั้งหมดด้วย Python ThreadPoolExecutor แล้ว เราจึงสามารถใช้รูปภาพเหล่านี้เพื่อทดสอบงานที่ผูกกับ CPU ได้ เราสามารถสร้างภาพขนาดย่อของรูปภาพทั้งหมดได้ทั้งในสคริปต์แบบเธรดเดียวและแบบกระบวนการเดียว จากนั้นจึงทดสอบโซลูชันที่ใช้หลายการประมวลผล

เราจะใช้ไลบรารี่ Pillow เพื่อจัดการการปรับขนาดรูปภาพ

นี่คือสคริปต์เริ่มต้นของเรา

 import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

สคริปต์นี้จะวนซ้ำบนพาธในโฟลเดอร์ images และสำหรับแต่ละพาธจะรันฟังก์ชัน create_thumbnail ฟังก์ชันนี้ใช้ Pillow เพื่อเปิดรูปภาพ สร้างภาพขนาดย่อ และบันทึกรูปภาพใหม่ที่เล็กกว่าโดยใช้ชื่อเดียวกันกับต้นฉบับ แต่มี _thumbnail ต่อท้ายชื่อ

การเรียกใช้สคริปต์นี้บนภาพ 160 ภาพรวม 36 ล้านใช้เวลา 2.32 วินาที มาดูกันว่าเราสามารถเร่งความเร็วโดยใช้ ProcessPoolExecutor ได้หรือไม่

 import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

เมธอด create_thumbnail เหมือนกับสคริปต์สุดท้าย ความแตกต่างหลักคือการสร้าง ProcessPoolExecutor วิธีการแมปของผู้ดำเนินการใช้เพื่อสร้างภาพขนาดย่อในแบบคู่ขนาน ตามค่าเริ่มต้น ProcessPoolExecutor จะสร้างหนึ่งกระบวนการย่อยต่อ CPU การเรียกใช้สคริปต์นี้บนภาพ 160 ภาพเดียวกันใช้เวลา 1.05 วินาที—เร็วกว่า 2.2 เท่า!

ไม่ซิงค์/รอ (Python 3.5+ เท่านั้น)

หนึ่งในรายการที่ร้องขอมากที่สุดในความคิดเห็นในบทความต้นฉบับคือตัวอย่างการใช้โมดูล asyncio ของ Python 3 เมื่อเทียบกับตัวอย่างอื่น ๆ มีไวยากรณ์ Python ใหม่ที่อาจเป็นสิ่งใหม่สำหรับคนส่วนใหญ่และแนวคิดใหม่บางอย่าง เลเยอร์ความซับซ้อนเพิ่มเติมที่โชคร้ายนั้นเกิดจากโมดูล urllib ในตัวของ Python ที่ไม่เป็นแบบอะซิงโครนัส เราจะต้องใช้ไลบรารี HTTP แบบ async เพื่อรับประโยชน์เต็มที่จาก asyncio สำหรับสิ่งนี้ เราจะใช้ aiohttp

ไปที่โค้ดกันเลย แล้วคำอธิบายโดยละเอียดเพิ่มเติมจะตามมา

 import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)

มีค่อนข้างน้อยที่จะแกะที่นี่ เริ่มต้นด้วยจุดเริ่มต้นหลักของโปรแกรม สิ่งแรกที่เราทำกับโมดูล asyncio คือการรับลูปเหตุการณ์ วนรอบเหตุการณ์จัดการโค้ดอะซิงโครนัสทั้งหมด จากนั้นวนซ้ำจะทำงานจนเสร็จและส่งผ่านฟังก์ชัน main มีส่วนของไวยากรณ์ใหม่ในคำจำกัดความของ main: async def คุณจะสังเกตเห็นด้วย await และ with async

ไวยากรณ์ async/await ถูกนำมาใช้ใน PEP492 ไวยากรณ์ async def ทำเครื่องหมายฟังก์ชันเป็น coroutine ภายใน Coroutines นั้นใช้ตัวสร้าง Python แต่ไม่เหมือนกันทุกประการ Coroutines ส่งคืนวัตถุ coroutine คล้ายกับวิธีที่ตัวสร้างส่งคืนวัตถุตัวสร้าง เมื่อคุณมีคอรูทีนแล้ว คุณจะได้ผลลัพธ์ด้วยนิพจน์ await เมื่อ coroutine เรียกใช้ await การดำเนินการของ coroutine จะถูกระงับจนกว่าการรอจะเสร็จสิ้น ระบบกันสะเทือนนี้ช่วยให้งานอื่นๆ เสร็จได้ในขณะที่คอร์รูทีนถูกระงับ "รอ" ผลลัพธ์บางอย่าง โดยทั่วไป ผลลัพธ์นี้จะเป็น I/O บางประเภท เช่น คำขอฐานข้อมูล หรือคำขอ HTTP ในกรณีของเรา

ฟังก์ชัน download_link จะต้องมีการเปลี่ยนแปลงค่อนข้างมาก ก่อนหน้านี้ เราใช้ urllib ในการอ่านรูปภาพให้เราอย่างเต็มที่ ในตอนนี้ เพื่อให้วิธีการของเราทำงานได้อย่างถูกต้องกับกระบวนทัศน์การเขียนโปรแกรมแบบอะซิงโครนัส เราจึงได้แนะนำการวนซ้ำแบบ while ที่จะอ่านทีละส่วนของอิมเมจและหยุดการทำงานชั่วคราวในขณะที่รอให้ I/O เสร็จสิ้น วิธีนี้ทำให้การวนซ้ำของเหตุการณ์สามารถวนซ้ำผ่านการดาวน์โหลดภาพต่างๆ เนื่องจากแต่ละภาพมีข้อมูลใหม่ให้ใช้ได้ในระหว่างการดาวน์โหลด

ควรมีหนึ่งวิธี—ควรมีเพียงวิธีเดียวเท่านั้น—วิธีที่ชัดเจนในการทำสิ่งนี้

ในขณะที่ zen ของ Python บอกเราว่าควรมีวิธีหนึ่งที่ชัดเจนในการทำบางสิ่ง แต่ก็มีหลายวิธีใน Python ที่จะแนะนำการทำงานพร้อมกันในโปรแกรมของเรา วิธีที่ดีที่สุดในการเลือกจะขึ้นอยู่กับกรณีการใช้งานเฉพาะของคุณ กระบวนทัศน์แบบอะซิงโครนัสจะปรับขนาดได้ดีกว่าสำหรับเวิร์กโหลดที่มีการทำงานพร้อมกันสูง (เช่น เว็บเซิร์ฟเวอร์) เมื่อเทียบกับเธรดหรือการประมวลผลหลายรายการ แต่ต้องการให้โค้ดของคุณ (และการขึ้นต่อกัน) เป็นแบบอะซิงโครนัสเพื่อให้เกิดประโยชน์สูงสุด

หวังว่าตัวอย่าง Python threading ในบทความนี้ และการอัปเดต จะนำคุณไปในทิศทางที่ถูกต้อง เพื่อให้คุณมีความคิดที่จะมองหาที่ใดในไลบรารีมาตรฐาน Python หากคุณต้องการแนะนำการทำงานพร้อมกันในโปรแกรมของคุณ