Python Multithreading and Multiprocessing البرنامج التعليمي
نشرت: 2022-03-11غالبًا ما تتحدث المناقشات التي تنتقد بايثون عن مدى صعوبة استخدام بايثون للعمل متعدد الخيوط ، مشيرةً بأصابع الاتهام إلى ما يُعرف باسم قفل المترجم العام (يشار إليه بمودة باسم GIL) الذي يمنع سلاسل متعددة من كود بايثون من العمل في وقت واحد. نتيجة لذلك ، لا تتصرف وحدة Python multithreading بالطريقة التي تتوقعها إذا لم تكن مطور Python وأنت قادم من لغات أخرى مثل C ++ أو Java. يجب توضيح أنه لا يزال بإمكان المرء كتابة تعليمات برمجية بلغة Python تعمل بشكل متزامن أو متوازٍ وتحدث فرقًا صارخًا في الأداء الناتج ، طالما يتم أخذ بعض الأشياء في الاعتبار. إذا لم تكن قد قرأته بعد ، أقترح عليك إلقاء نظرة على مقال إقبال القرآن عن التزامن والتوازي في روبي هنا على مدونة Toptal Engineering.
في هذا البرنامج التعليمي لمزامنة Python ، سنكتب نصًا برمجيًا صغيرًا من Python لتنزيل أفضل الصور الشائعة من Imgur. سنبدأ بإصدار يقوم بتنزيل الصور بالتتابع ، أو واحدًا تلو الآخر. كشرط أساسي ، سيكون عليك تسجيل تطبيق على Imgur. إذا لم يكن لديك حساب Imgur بالفعل ، يرجى إنشاء حساب أولاً.
تم اختبار البرامج النصية في أمثلة الترابط هذه باستخدام Python 3.6.4. مع بعض التغييرات ، يجب أن تعمل أيضًا مع Python 2 - urllib هو أكثر ما تغير بين هذين الإصدارين من Python.
الشروع في العمل مع Python Multithreading
دعونا نبدأ بإنشاء وحدة Python ، تسمى download.py
. سيحتوي هذا الملف على جميع الوظائف اللازمة لجلب قائمة الصور وتنزيلها. سنقسم هذه الوظائف إلى ثلاث وظائف منفصلة:
-
get_links
-
download_link
-
setup_download_dir
سيتم استخدام الوظيفة الثالثة ، setup_download_dir
، لإنشاء دليل وجهة التنزيل إذا لم يكن موجودًا بالفعل.
تتطلب واجهة برمجة تطبيقات Imgur طلبات HTTP لتحمل رأس Authorization
مع معرف العميل. يمكنك العثور على معرف العميل هذا من لوحة التحكم الخاصة بالتطبيق الذي قمت بتسجيله على Imgur ، وستكون الاستجابة بترميز JSON. يمكننا استخدام مكتبة JSON القياسية في Python لفك تشفيرها. يعد تنزيل الصورة مهمة أبسط ، حيث أن كل ما عليك فعله هو جلب الصورة من خلال عنوان URL الخاص بها وكتابتها في ملف.
هذا ما يبدو عليه البرنامج النصي:
import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir
بعد ذلك ، سنحتاج إلى كتابة وحدة تستخدم هذه الوظائف لتنزيل الصور ، واحدة تلو الأخرى. سنقوم بتسمية هذا single.py
. سيحتوي هذا على الوظيفة الرئيسية لإصدارنا الأول الساذج من برنامج تنزيل الصور Imgur. ستقوم الوحدة باسترداد معرّف عميل Imgur في متغير البيئة IMGUR_CLIENT_ID
. سوف يستدعي setup_download_dir
لإنشاء دليل وجهة التنزيل. أخيرًا ، سيحضر قائمة بالصور باستخدام وظيفة get_links
، ويقوم بتصفية جميع عناوين URL الخاصة بـ GIF والألبوم ، ثم يستخدم download_link
(رابط_التنزيل) لتنزيل كل صورة من هذه الصور وحفظها على القرص. إليك ما يبدو عليه single.py
:
import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
على الكمبيوتر المحمول الخاص بي ، استغرق هذا النص 19.4 ثانية لتنزيل 91 صورة. يرجى ملاحظة أن هذه الأرقام قد تختلف بناءً على الشبكة التي تستخدمها. 19.4 ثانية ليست طويلة بشكل رهيب ، ولكن ماذا لو أردنا تنزيل المزيد من الصور؟ ربما 900 صورة ، بدلاً من 90. بمتوسط 0.2 ثانية لكل صورة ، تستغرق 900 صورة حوالي 3 دقائق. 9000 صورة سيستغرق 30 دقيقة. الخبر السار هو أنه من خلال تقديم التزامن أو التوازي ، يمكننا تسريع ذلك بشكل كبير.
ستعرض جميع أمثلة التعليمات البرمجية اللاحقة عبارات الاستيراد الجديدة والمحددة لتلك الأمثلة فقط. للراحة ، يمكن العثور على كل نصوص Python النصية هذه في مستودع GitHub هذا.
التزامن والتوازي في بايثون: مثال خيوط
يعد الترابط أحد أكثر الأساليب شهرة لتحقيق التزامن والتوازي في بايثون. خيوط المعالجة هي ميزة يتم توفيرها عادةً بواسطة نظام التشغيل. الخيوط أخف من العمليات ، وتشترك في نفس مساحة الذاكرة.
في مثال خيوط Python هذا ، سنكتب وحدة نمطية جديدة لتحل محل single.py
. ستنشئ هذه الوحدة مجموعة من ثمانية خيوط ، مما يجعل إجمالي تسعة خيوط بما في ذلك الخيط الرئيسي. لقد اخترت ثمانية خيوط للعمال لأن جهاز الكمبيوتر الخاص بي يحتوي على ثمانية نوى لوحدة المعالجة المركزية ويبدو أن مؤشر ترابط عامل واحد لكل نواة يبدو رقمًا جيدًا لعدد مؤشرات الترابط التي سيتم تشغيلها مرة واحدة. من الناحية العملية ، يتم اختيار هذا الرقم بعناية أكبر بناءً على عوامل أخرى ، مثل التطبيقات والخدمات الأخرى التي تعمل على نفس الجهاز.
هذا هو نفسه تقريبًا مثل الفئة السابقة ، باستثناء أنه لدينا الآن فئة جديدة ، DownloadWorker
، والتي تنحدر من فئة Python Thread
. تم تجاوز طريقة التشغيل ، والتي تدير حلقة لا نهائية. في كل تكرار ، يستدعي self.queue.get()
لمحاولة جلب عنوان URL من قائمة انتظار آمنة لمؤشر الترابط. يتم حظره حتى يكون هناك عنصر في قائمة الانتظار ليقوم العامل بمعالجته. بمجرد أن يتلقى العامل عنصرًا من قائمة الانتظار ، فإنه يستدعي نفس طريقة رابط download_link
التي تم استخدامها في البرنامج النصي السابق لتنزيل الصورة إلى دليل الصور. بعد انتهاء التنزيل ، يشير العامل إلى قائمة الانتظار بإنجاز هذه المهمة. هذا مهم جدًا ، لأن قائمة الانتظار تتعقب عدد المهام التي تم إدراجها في قائمة الانتظار. سيؤدي استدعاء queue.join()
إلى حظر الخيط الرئيسي إلى الأبد إذا لم يشر العمال إلى أنهم أكملوا مهمة.
import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
تشغيل هذا البرنامج النصي لمثال خيوط Python على نفس الجهاز المستخدم سابقًا ينتج عنه وقت تنزيل يبلغ 4.1 ثانية! هذا 4.7 مرة أسرع من المثال السابق. في حين أن هذا أسرع بكثير ، من الجدير بالذكر أنه تم تنفيذ مؤشر ترابط واحد فقط في كل مرة خلال هذه العملية بسبب GIL. لذلك ، هذا الرمز متزامن ولكنه ليس موازيًا. السبب في أنها لا تزال أسرع لأن هذه مهمة ملزمة IO. لا يكاد المعالج يكسر عرقًا أثناء تنزيل هذه الصور ، ويقضي معظم الوقت في انتظار الشبكة. هذا هو السبب في أن تعدد مؤشرات الترابط في Python يمكن أن يوفر زيادة كبيرة في السرعة. يمكن للمعالج التبديل بين الخيوط متى كان أحدهم جاهزًا للقيام ببعض الأعمال. يمكن أن يؤدي استخدام وحدة الترابط في Python أو أي لغة مفسرة أخرى مع GIL إلى انخفاض الأداء. إذا كانت التعليمات البرمجية الخاصة بك تؤدي مهمة مرتبطة بوحدة المعالجة المركزية ، مثل فك ضغط ملفات gzip ، فسيؤدي استخدام وحدة threading
إلى إبطاء وقت التنفيذ. بالنسبة للمهام المرتبطة بوحدة المعالجة المركزية والتنفيذ المتوازي حقًا ، يمكننا استخدام وحدة المعالجة المتعددة.
بينما يحتوي تطبيق Python المرجعي الفعلي - CPython - على GIL ، فإن هذا لا ينطبق على جميع تطبيقات Python. على سبيل المثال ، لا يحتوي تطبيق IronPython ، وهو تطبيق Python باستخدام إطار عمل .NET ، على GIL ، وكذلك Jython ، وهو التطبيق المستند إلى Java. يمكنك العثور على قائمة بتطبيقات بايثون العاملة هنا.
التزامن والتوازي في بايثون مثال 2: إنتاج عمليات متعددة
من الأسهل إدخال وحدة المعالجة المتعددة مقارنة بوحدة الترابط ، حيث لا نحتاج إلى إضافة فئة مثل مثال خيوط Python. التغييرات الوحيدة التي نحتاج إلى إجرائها هي في الوظيفة الرئيسية.
لاستخدام عمليات متعددة ، نقوم بإنشاء Pool
متعدد المعالجات. باستخدام طريقة الخريطة التي توفرها ، سنقوم بتمرير قائمة عناوين URL إلى المجموعة ، والتي بدورها ستنتج ثماني عمليات جديدة وتستخدم كل عملية لتنزيل الصور بشكل متوازٍ. هذا توازي حقيقي ، لكن له تكلفة. يتم نسخ ذاكرة النص بالكامل في كل عملية فرعية يتم إنتاجها. في هذا المثال البسيط ، ليست مشكلة كبيرة ، ولكن يمكن بسهولة أن تصبح عبئًا خطيرًا للبرامج غير التافهة.
import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()
التزامن والتوازي في بايثون مثال 3: التوزيع على عدة عمال
في حين أن وحدات الترابط والمعالجة المتعددة تعتبر رائعة بالنسبة للبرامج النصية التي يتم تشغيلها على جهاز الكمبيوتر الشخصي الخاص بك ، فماذا يجب أن تفعل إذا كنت تريد أن يتم العمل على جهاز مختلف ، أو كنت بحاجة إلى توسيع نطاقه إلى أكثر مما تستطيعه وحدة المعالجة المركزية على جهاز واحد مقبض؟ حالة الاستخدام الرائعة لهذا هي المهام الخلفية طويلة المدى لتطبيقات الويب. إذا كان لديك بعض المهام طويلة الأمد ، فلا تريد أن تقوم بتدوير مجموعة من العمليات الفرعية أو سلاسل الرسائل على نفس الجهاز والتي تحتاج إلى تشغيل بقية كود التطبيق الخاص بك. سيؤدي ذلك إلى تدهور أداء تطبيقك لجميع المستخدمين. ما سيكون رائعًا هو أن تكون قادرًا على تشغيل هذه الوظائف على جهاز آخر ، أو العديد من الأجهزة الأخرى.
مكتبة Python الرائعة لهذه المهمة هي RQ ، وهي مكتبة بسيطة جدًا لكنها قوية. تقوم أولاً بإدراج دالة ووسيطاتها باستخدام المكتبة. يؤدي هذا إلى اختيار تمثيل استدعاء الوظيفة ، والذي يتم إلحاقه بعد ذلك بقائمة Redis. إن إدراج الوظيفة في قائمة الانتظار هو الخطوة الأولى ، ولكنها لن تفعل شيئًا بعد. نحتاج أيضًا إلى عامل واحد على الأقل للاستماع إلى قائمة انتظار الوظيفة هذه.
تتمثل الخطوة الأولى في تثبيت خادم Redis وتشغيله على جهاز الكمبيوتر الخاص بك ، أو الوصول إلى خادم Redis قيد التشغيل. بعد ذلك ، لا يوجد سوى عدد قليل من التغييرات الصغيرة التي تم إجراؤها على الكود الحالي. نقوم أولاً بإنشاء مثيل لـ RQ Queue ونمرره مثيلاً لخادم Redis من مكتبة redis-py. بعد ذلك ، بدلاً من مجرد استدعاء طريقة رابط download_link
، نسمي q.enqueue(download_link, download_dir, link)
. تأخذ طريقة enqueue وظيفة باعتبارها الوسيطة الأولى لها ، ثم يتم تمرير أي وسيطات أو وسيطات أساسية أخرى إلى هذه الوظيفة عند تنفيذ المهمة بالفعل.

الخطوة الأخيرة التي يتعين علينا القيام بها هي بدء تشغيل بعض العمال. يوفر RQ نصًا مفيدًا لتشغيل العمال في قائمة الانتظار الافتراضية. ما عليك سوى تشغيل rqworker
في نافذة طرفية وسيبدأ العامل بالاستماع إلى قائمة الانتظار الافتراضية. يرجى التأكد من أن دليل العمل الحالي الخاص بك هو نفسه حيث توجد البرامج النصية. إذا كنت تريد الاستماع إلى قائمة انتظار مختلفة ، فيمكنك تشغيل rqworker queue_name
وسوف يستمع إلى قائمة الانتظار المسماة تلك. إن الشيء العظيم في RQ هو أنه طالما يمكنك الاتصال بـ Redis ، يمكنك تشغيل أكبر عدد تريده من العمال على أي عدد تريده من الأجهزة المختلفة ؛ لذلك ، من السهل جدًا التوسع مع نمو تطبيقك. هذا هو مصدر إصدار RQ:
import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()
ومع ذلك ، فإن RQ ليس هو حل قائمة انتظار الوظائف الوحيد في Python. RQ سهل الاستخدام ويغطي حالات الاستخدام البسيطة بشكل جيد للغاية ، ولكن إذا كانت هناك حاجة إلى المزيد من الخيارات المتقدمة ، فيمكن استخدام حلول قائمة انتظار Python 3 الأخرى (مثل الكرفس).
تعدد خيوط البايثون مقابل المعالجة المتعددة
إذا كانت التعليمات البرمجية الخاصة بك مرتبطة بـ IO ، فستعمل المعالجة المتعددة والمعالجات المتعددة في Python من أجلك. تعد المعالجة المتعددة أسهل في الإسقاط فقط من الخيوط ولكن بها ذاكرة أعلى. إذا كانت التعليمات البرمجية الخاصة بك مرتبطة بوحدة المعالجة المركزية ، فمن المرجح أن تكون المعالجة المتعددة هي الخيار الأفضل - خاصة إذا كان الجهاز المستهدف يحتوي على عدة مراكز أو وحدات معالجة مركزية. بالنسبة لتطبيقات الويب ، وعندما تحتاج إلى توسيع نطاق العمل عبر أجهزة متعددة ، سيكون RQ أفضل بالنسبة لك.
تحديث
بايثون concurrent.futures
شيء جديد منذ Python 3.2 لم يتم التطرق إليه في المقالة الأصلية هو حزمة concurrent.futures
. توفر هذه الحزمة طريقة أخرى لاستخدام التزامن والتوازي مع Python.
في المقالة الأصلية ، ذكرت أن وحدة المعالجة المتعددة في Python سيكون من الأسهل إدخالها في الكود الحالي مقارنة بوحدة الترابط. كان هذا بسبب أن وحدة خيوط Python 3 تتطلب تصنيفًا فرعيًا لفئة Thread
وأيضًا إنشاء Queue
المراد مراقبتها للعمل.
يؤدي استخدام concurrent.futures.ThreadPoolExecutor إلى جعل رمز مثال خيوط Python مطابقًا تقريبًا لوحدة المعالجة المتعددة.
import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()
الآن بعد أن تم تنزيل كل هذه الصور باستخدام Python ThreadPoolExecutor
، يمكننا استخدامها لاختبار مهمة مرتبطة بوحدة المعالجة المركزية. يمكننا إنشاء نسخ مصغرة من جميع الصور في كلٍّ من برنامج نصي أحادي العملية وعملية واحدة ثم اختبار حل قائم على المعالجة المتعددة.
سنستخدم مكتبة الوسادة للتعامل مع تغيير حجم الصور.
هنا نصنا الأولي.
import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
يتكرر هذا البرنامج النصي عبر المسارات في مجلد images
ولكل مسار يتم تشغيل وظيفة create_thumbnail. تستخدم هذه الوظيفة وسادة لفتح الصورة ، وإنشاء صورة مصغرة ، وحفظ الصورة الجديدة الأصغر بنفس اسم الصورة الأصلية ولكن مع _thumbnail
بالاسم.
يستغرق تشغيل هذا البرنامج النصي على 160 صورة بإجمالي 36 مليون 2.32 ثانية. لنرى ما إذا كان يمكننا تسريع هذا باستخدام ProcessPoolExecutor.
import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. Eg: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()
يتطابق أسلوب create_thumbnail
الأسلوب النصي الأخير. الاختلاف الرئيسي هو إنشاء ProcessPoolExecutor
. يتم استخدام طريقة خريطة المنفذ لإنشاء الصور المصغرة بالتوازي. بشكل افتراضي ، ينشئ ProcessPoolExecutor
عملية فرعية واحدة لكل وحدة معالجة مركزية. تشغيل هذا البرنامج النصي على نفس 160 صورة استغرق 1.05 ثانية - 2.2 مرة أسرع!
Async / Await (Python 3.5+ فقط)
أحد العناصر الأكثر طلبًا في التعليقات على المقالة الأصلية كان على سبيل المثال باستخدام وحدة asyncio في Python 3. مقارنة بالأمثلة الأخرى ، هناك بعض بناء جملة Python الجديد الذي قد يكون جديدًا لمعظم الناس وكذلك بعض المفاهيم الجديدة. طبقة إضافية مؤسفة من التعقيد ناتجة عن عدم تزامن وحدة urllib
المدمجة في Python. سنحتاج إلى استخدام مكتبة غير متزامنة HTTP للحصول على الفوائد الكاملة لـ Asyncio. لهذا ، سنستخدم aiohttp.
دعنا ننتقل مباشرة إلى الكود وسيتبع ذلك شرح أكثر تفصيلاً.
import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)
هناك الكثير لتفريغه هنا. لنبدأ بنقطة الدخول الرئيسية للبرنامج. أول شيء جديد نقوم به مع وحدة asyncio هو الحصول على حلقة الحدث. تعالج حلقة الحدث كل التعليمات البرمجية غير المتزامنة. بعد ذلك ، يتم تشغيل الحلقة حتى اكتمالها وتمرير الوظيفة main
. هناك جزء من بناء الجملة الجديد في تعريف main: async def
. ستلاحظ أيضًا await
with async
.
تم تقديم صيغة غير متزامن / انتظار في PEP492. يشير بناء جملة async def
إلى وظيفة كـ coroutine. داخليًا ، تعتمد coroutines على مولدات Python ، ولكنها ليست نفس الشيء تمامًا. تقوم Coroutines بإرجاع كائن coroutine مشابه لكيفية إرجاع المولدات لكائن المولد. بمجرد حصولك على coroutine ، تحصل على نتائجه مع تعبير await
. عندما await
مكالمات coroutine ، يتم تعليق تنفيذ coroutine حتى يكتمل المنتظر. يسمح هذا التعليق بإكمال الأعمال الأخرى أثناء تعليق coroutine "في انتظار" بعض النتائج. بشكل عام ، ستكون هذه النتيجة نوعًا من الإدخال / الإخراج مثل طلب قاعدة بيانات أو في حالتنا طلب HTTP.
كان لابد من تغيير وظيفة download_link
بشكل كبير. في السابق ، كنا نعتمد على urllib
للقيام بالعبء الأكبر من عمل قراءة الصورة لنا. الآن ، للسماح لطريقتنا بالعمل بشكل صحيح مع نموذج البرمجة غير المتزامن ، قدمنا حلقة while
التي تقرأ أجزاء من الصورة في كل مرة وتوقف التنفيذ أثناء انتظار اكتمال الإدخال / الإخراج. يسمح هذا لحلقة الحدث بالتكرار من خلال تنزيل الصور المختلفة حيث أن كل واحدة لديها بيانات جديدة متاحة أثناء التنزيل.
يجب أن يكون هناك طريقة واحدة واضحة - ويفضل أن تكون واحدة فقط - للقيام بذلك
بينما يخبرنا zen of Python أنه يجب أن تكون هناك طريقة واحدة واضحة للقيام بشيء ما ، هناك العديد من الطرق في Python لإدخال التزامن في برامجنا. أفضل طريقة للاختيار ستعتمد على حالة الاستخدام المحددة الخاصة بك. يتناسب النموذج غير المتزامن بشكل أفضل مع أعباء العمل عالية التزامن (مثل خادم الويب) مقارنةً بالترابط أو المعالجة المتعددة ، ولكنه يتطلب أن تكون التعليمات البرمجية (والاعتماديات) غير متزامنة من أجل الاستفادة الكاملة.
نأمل أن تشير أمثلة خيوط Python في هذه المقالة - والتحديث - إلى الاتجاه الصحيح بحيث يكون لديك فكرة عن مكان البحث في مكتبة Python القياسية إذا كنت بحاجة إلى إدخال التزامن في برامجك.