مقدمة في البرمجة المتزامنة: دليل المبتدئين

نشرت: 2022-03-11

ما هي البرمجة المتزامنة؟ وصفها ببساطة ، عندما تفعل أكثر من شيء في نفس الوقت. لا ينبغي الخلط بينه وبين التوازي ، فإن التزامن هو عندما يتم تشغيل تسلسلات متعددة من العمليات في فترات متداخلة من الوقت. في عالم البرمجة ، يعد التزامن موضوعًا معقدًا جدًا. قد يكون التعامل مع التركيبات مثل الخيوط والأقفال وتجنب المشكلات مثل ظروف السباق والمآزق أمرًا مرهقًا للغاية ، مما يجعل من الصعب كتابة البرامج المتزامنة. من خلال التزامن ، يمكن تصميم البرامج كعمليات مستقلة تعمل معًا في تركيبة محددة. قد يكون مثل هذا الهيكل متوازيًا وقد لا يكون ؛ ومع ذلك ، فإن تحقيق مثل هذا الهيكل في برنامجك يوفر مزايا عديدة.

مقدمة في البرمجة المتزامنة

في هذه المقالة ، سوف نلقي نظرة على عدد من نماذج التزامن المختلفة ، وكيفية تحقيقها في لغات برمجة مختلفة مصممة للتزامن.

نموذج الدولة المتغير المشترك

لنلق نظرة على مثال بسيط مع عداد وخيطين يزيدانه. لا ينبغي أن يكون البرنامج معقدًا للغاية. لدينا كائن يحتوي على عداد يزداد مع زيادة الطريقة ، ويسترده باستخدام طريقة get واثنين من مؤشرات الترابط التي تزيده.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

هذا البرنامج الساذج ليس ساذجًا كما يبدو للوهلة الأولى. عندما أقوم بتشغيل هذا البرنامج مرات أكثر أحصل على نتائج مختلفة. توجد ثلاث قيم بعد ثلاث عمليات تنفيذ على الكمبيوتر المحمول الخاص بي.

 java Counting 553706 java Counting 547818 java Counting 613014

ما سبب هذا السلوك غير المتوقع؟ يقوم البرنامج بزيادة العداد في مكان واحد ، في طريقة الزيادة التي تستخدم الأمر counter ++. إذا نظرنا إلى كود بايت الأمر ، فسنرى أنه يتكون من عدة أجزاء:

  1. قراءة قيمة العداد من الذاكرة
  2. زيادة القيمة محليًا
  3. تخزين قيمة العداد في الذاكرة

الآن يمكننا تخيل الخطأ الذي يمكن أن يحدث في هذا التسلسل. إذا كان لدينا خيطان يعملان على زيادة العداد بشكل مستقل ، فيمكننا الحصول على هذا السيناريو:

  1. قيمة العداد هي 115
  2. الخيط الأول يقرأ قيمة العداد من الذاكرة (115)
  3. يزيد الخيط الأول من قيمة العداد المحلي (116)
  4. الخيط الثاني يقرأ قيمة العداد من الذاكرة (115)
  5. الخيط الثاني يزيد من قيمة العداد المحلي (116)
  6. الخيط الثاني يحفظ قيمة العداد المحلي في الذاكرة (116)
  7. الخيط الأول يحفظ قيمة العداد المحلي في الذاكرة (116)
  8. قيمة العداد 116

في هذا السيناريو ، يتم تشابك خيطين بحيث يتم زيادة قيمة العداد بمقدار 1 ، ولكن يجب زيادة قيمة العداد بمقدار 2 لأن كل خيط يزيدها بمقدار 1. يؤثر تشابك الخيوط المختلفة على نتيجة البرنامج. سبب عدم القدرة على التنبؤ بالبرنامج هو أن البرنامج لا يتحكم في تشابك مؤشر الترابط ولكن نظام التشغيل. في كل مرة يتم فيها تنفيذ البرنامج ، يمكن أن تتشابك الخيوط بشكل مختلف. بهذه الطريقة أدخلنا عدم القدرة على التنبؤ العرضي (اللاحتمية) للبرنامج.

لإصلاح هذا عدم القدرة على التنبؤ العرضي (عدم الحتمية) ، يجب أن يتحكم البرنامج في تشابك مؤشر الترابط. عندما يكون هناك خيط واحد في الطريقة ، يجب ألا يكون الخيط الآخر بنفس الطريقة حتى يخرج الأول منه. بهذه الطريقة نقوم بتسلسل الوصول إلى طريقة الزيادة.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

الحل الآخر هو استخدام عداد يمكن أن يزيد ذريًا ، مما يعني أنه لا يمكن فصل العملية إلى عمليات متعددة. بهذه الطريقة ، لا نحتاج إلى كتل من التعليمات البرمجية التي تحتاج إلى المزامنة. تحتوي Java على أنواع بيانات ذرية في مساحة الاسم java.util.concurrent.atomic ، وسنستخدم AtomicInteger.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

يحتوي العدد الصحيح الذري على العمليات التي نحتاجها ، لذا يمكننا استخدامه بدلاً من فئة Counter. من المثير للاهتمام أن نلاحظ أن جميع طرق atomicinteger لا تستخدم القفل ، بحيث لا توجد إمكانية للمآزق ، مما يسهل تصميم البرنامج.

يجب أن يؤدي استخدام الكلمات الأساسية المتزامنة لمزامنة الأساليب الحرجة إلى حل جميع المشكلات ، أليس كذلك؟ دعنا نتخيل أن لدينا حسابين يمكن الإيداع والسحب والتحويل إلى حساب آخر. ماذا يحدث إذا أردنا في نفس الوقت تحويل الأموال من حساب إلى آخر والعكس صحيح؟ لنلقي نظرة على مثال.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

عندما أقوم بتشغيل هذا البرنامج على جهاز الكمبيوتر المحمول ، عادةً ما يتعطل. لماذا يحدث هذا؟ إذا نظرنا عن كثب ، يمكننا أن نرى أنه عندما نقوم بتحويل الأموال ، فإننا ندخل في طريقة التحويل التي تتم مزامنتها ونؤمن الوصول إلى جميع الطرق المتزامنة على الحساب المصدر ، ثم يقفل حساب الوجهة الذي يقفل الوصول إلى جميع الطرق المتزامنة عليه.

تخيل السيناريو التالي:

  1. نقل مكالمات سلسلة الرسائل الأولى من حساب Bob إلى حساب Joe
  2. نقل مكالمات سلسلة الرسائل الثانية من حساب Joe إلى حساب Bob
  3. الخيط الثاني يقلل المبلغ من حساب Joe
  4. يذهب الخيط الثاني لإيداع المبلغ في حساب Bob لكنه ينتظر الخيط الأول لإكمال التحويل.
  5. الخيط الأول يقلل المبلغ من حساب بوب
  6. يذهب الخيط الأول لإيداع المبلغ في حساب Joe لكنه ينتظر حتى يكتمل التحويل الثاني.

في هذا السيناريو ، ينتظر مؤشر ترابط آخر حتى ينتهي النقل والعكس صحيح. إنهم عالقون مع بعضهم البعض ولا يمكن للبرنامج أن يستمر. هذا يسمى الجمود. لتجنب الجمود ، من الضروري قفل الحسابات بنفس الترتيب. لإصلاح البرنامج ، سنمنح كل حساب رقمًا فريدًا حتى نتمكن من قفل الحسابات بنفس الترتيب عند تحويل الأموال.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

بسبب عدم القدرة على التنبؤ بمثل هذه الأخطاء ، فإنها تحدث أحيانًا ، ولكن ليس دائمًا ويصعب تكرارها. إذا كان البرنامج يتصرف بشكل غير متوقع ، فعادةً ما يكون بسبب التزامن الذي يقدم عدم الحتمية العرضي. لتجنب اللاحتمية العرضية ، يجب أن نأخذ في الاعتبار جميع حالات التزاوج.

مثال على برنامج به عدم حتمية عرضي.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

يحتوي هذا البرنامج على عدم حتمية عرضي. سيتم عرض آخر قيمة تم إدخالها في الحاوية.

 java NonDeterminism Slow

ستدخل الخيوط الأبطأ القيمة لاحقًا ، وستتم طباعة هذه القيمة (بطيئة). لكن هذا لا يجب أن يكون هو الحال. ماذا لو نفذ الكمبيوتر في نفس الوقت برنامجًا آخر يحتاج إلى الكثير من موارد وحدة المعالجة المركزية؟ ليس لدينا ما يضمن أنه سيكون الخيط الأبطأ الذي يدخل القيمة أخيرًا لأنه يتحكم فيه نظام التشغيل ، وليس البرنامج. يمكن أن يكون لدينا مواقف حيث يعمل البرنامج على جهاز كمبيوتر والآخر يتصرف بشكل مختلف. يصعب العثور على مثل هذه الأخطاء وتسبب المتاعب للمطورين. لكل هذه الأسباب ، من الصعب جدًا تنفيذ نموذج التزامن هذا بشكل صحيح.

الطريقة الوظيفية

تماثل

لنلق نظرة على نموذج آخر تستخدمه اللغات الوظيفية. على سبيل المثال سوف نستخدم Clojure ، والتي يمكن تفسيرها باستخدام الأداة Leiningen. Clojure هي لغة مثيرة للاهتمام للغاية مع دعم جيد للتزامن. كان نموذج التزامن السابق بحالة مشتركة قابلة للتغيير. يمكن أن يكون للفئات التي نستخدمها أيضًا حالة مخفية تتغير ولا نعرف عنها شيئًا ، لأنها غير واضحة من واجهة برمجة التطبيقات الخاصة بهم. كما رأينا ، يمكن أن يتسبب هذا النموذج في عدم الحتمية العرضية والمآزق إذا لم نتوخى الحذر. تحتوي اللغات الوظيفية على أنواع بيانات لا تتغير ، لذا يمكن مشاركتها بأمان دون المخاطرة بتغيرها. الوظائف لها خصائص بالإضافة إلى أنواع البيانات الأخرى. يمكن إنشاء الوظائف أثناء تنفيذ البرنامج وتمريرها كمعامل إلى وظيفة أخرى أو إرجاعها كنتيجة لاستدعاء الوظيفة.

الأساسيات الأساسية للبرمجة المتزامنة مستقبلية واعدة. المستقبل ينفذ كتلة من التعليمات البرمجية في مؤشر ترابط آخر ويعيد كائنًا للقيمة المستقبلية التي سيتم إدخالها عند تنفيذ الكتلة.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

عندما أقوم بتنفيذ هذا البرنامج النصي ، يكون الإخراج:

 Started A Started B Waiting for futures Finished A Finished B 10

في هذا المثال لدينا كتلتان مستقبليتان يتم تنفيذهما بشكل مستقل. يتم حظر البرنامج فقط عند قراءة القيمة من الكائن المستقبلي الذي لم يتوفر بعد. في حالتنا ، في انتظار تلخيص نتائج الكتل المستقبلية. يمكن التنبؤ بالسلوك (حتمي) وسيعطي دائمًا نفس النتيجة لأنه لا توجد حالة مشتركة قابلة للتغيير.

الوعد هو بدائية أخرى تستخدم للتزامن. الوعد هو الحاوية التي يمكن للمرء أن يضع فيها قيمة مرة واحدة. عند قراءة الوعود ، سينتظر الخيط حتى يتم الوفاء بقيمة الوعد.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

في هذا المثال ، سينتظر المستقبل لطباعة النتيجة طالما وعد بعدم حفظ القيمة. بعد ثانيتين ، في الوعد سيتم تخزين القيمة 42 لتتم طباعتها في الخيط المستقبلي. قد يؤدي استخدام الوعود إلى طريق مسدود بدلاً من المستقبل ، لذا كن حذرًا عند استخدام الوعد.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

في هذا المثال ، نستخدم نتيجة المستقبل ونتيجة الوعد. ترتيب الإعداد والقراءة هو أن الخيط الرئيسي ينتظر قيمة من سلسلة الرسائل المستقبلية وأن مؤشر الترابط المستقبلي ينتظر قيمة من السلسلة الرئيسية. سيكون هذا السلوك متوقعًا (حتميًا) وسيتم تشغيله في كل مرة يتم فيها تنفيذ البرنامج مما يسهل العثور على الخطأ وإزالته.

يسمح استخدام المستقبل للبرنامج بمواصلة التمرين حتى يحتاج إلى نتيجة تنفيذ المستقبل. ينتج عن هذا تنفيذ أسرع للبرنامج. إذا كان لديك معالجات متعددة مع المستقبل ، فيمكنك إجراء تنفيذ متوازٍ للبرنامج ذي سلوك (حتمي) يمكن التنبؤ به (كل مرة تعطي نفس النتيجة). بهذه الطريقة نستغل بشكل أفضل قوة الكمبيوتر.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

في هذا المثال ، يمكنك أن ترى كيف يمكن لاستخدام المستقبل أن يستفيد بشكل أفضل من سرعة الكمبيوتر. لدينا رقمان فيبوناتشي يتم جمعهما. يمكننا أن نرى أن البرنامج يحسب النتيجة مرتين ، المرة الأولى بالتتابع في سلسلة واحدة ، والمرة الثانية على التوازي في خيطين. نظرًا لأن جهاز الكمبيوتر المحمول الخاص بي يحتوي على معالج متعدد النواة ، فإن التنفيذ المتوازي يعمل مرتين أسرع من الحساب المتسلسل.

نتيجة تنفيذ هذا البرنامج النصي على جهاز الكمبيوتر المحمول الخاص بي:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

التزامن

لدعم التزامن وعدم القدرة على التنبؤ في لغة برمجة Clojure ، يجب علينا استخدام نوع بيانات متغير حتى تتمكن مؤشرات الترابط الأخرى من رؤية التغييرات. أبسط نوع بيانات متغير هو الذرة. Atom عبارة عن حاوية لها دائمًا القيمة التي يمكن استبدالها بقيمة أخرى. يمكن استبدال القيمة بإدخال قيمة جديدة أو عن طريق استدعاء دالة تأخذ القيمة القديمة وتعيد قيمة جديدة يتم استخدامها بشكل متكرر. من المثير للاهتمام أن الذرة يتم تنفيذها بدون قفل وهي آمنة للاستخدام في الخيوط ، مما يعني أنه من المستحيل الوصول إلى طريق مسدود. داخليًا ، يستخدم atom مكتبة java.util.concurrent.AtomicReference. لنلق نظرة على مثال مضاد تم تنفيذه باستخدام الذرة.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

نتيجة تنفيذ البرنامج النصي على جهاز الكمبيوتر المحمول الخاص بي:

 The counter is: 1000000 Number of attempts: 1680212

في هذا المثال نستخدم الذرة التي تحتوي على قيمة العداد. يزيد العداد بـ (swap! counter inc). تعمل وظيفة المبادلة على النحو التالي: 1. خذ قيمة العداد واحتفظ بها 2. لهذه القيمة تستدعي الدالة المعطاة التي تحسب القيمة الجديدة 3. لحفظ القيمة الجديدة ، تستخدم العملية الذرية التي تتحقق مما إذا كانت القيمة القديمة قد تغيرت 3 أ. إذا لم تتغير القيمة فإنها تدخل قيمة جديدة 3 ب. إذا تم تغيير القيمة في غضون ذلك ، فانتقل إلى الخطوة 1 ونرى أنه يمكن استدعاء الوظيفة مرة أخرى إذا تم تغيير القيمة في هذه الأثناء. لا يمكن تغيير القيمة إلا من موضوع آخر. لذلك ، من الضروري ألا يكون للدالة التي تحسب قيمة جديدة أي آثار جانبية ، لذلك لا يهم إذا تم استدعاؤها مرات أكثر. أحد قيود الذرة هو أنها تزامن التغييرات إلى قيمة واحدة.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

عندما أقوم بتنفيذ هذا البرنامج النصي ، أحصل على:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

في هذا المثال يمكننا أن نرى كيف نغير المزيد من الذرات. في وقت ما ، يمكن أن يحدث عدم الاتساق. مجموع حسابين في وقت ما ليس هو نفسه. إذا كان علينا تنسيق تغييرات القيم المتعددة ، فهناك حلان:

  1. ضع المزيد من القيم في ذرة واحدة
  2. استخدم المراجع وذاكرة المعاملات البرمجية ، كما سنرى لاحقًا
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

عندما أقوم بتشغيل هذا البرنامج النصي على جهاز الكمبيوتر الخاص بي ، أحصل على:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

في المثال ، تم حل التنسيق بحيث نضع قيمة أكبر باستخدام الخريطة. عندما نقوم بتحويل الأموال من الحساب ، فإننا نغير جميع الحسابات في ذلك الوقت بحيث لا يحدث أبدًا ألا يكون مجموع الأموال هو نفسه.

نوع البيانات المتغير التالي هو العامل. يتصرف العامل مثل الذرة فقط حيث يتم تنفيذ الوظيفة التي تغير القيمة في سلسلة مختلفة ، بحيث يستغرق التغيير بعض الوقت ليصبح مرئيًا. لذلك ، عند قراءة قيمة الوكيل ، من الضروري استدعاء دالة تنتظر حتى يتم تنفيذ جميع الوظائف التي تغير قيمة الوكيل. على عكس وظيفة الذرات التي تغير القيمة تسمى مرة واحدة فقط وبالتالي يمكن أن يكون لها آثار جانبية. يمكن لهذا النوع أيضًا مزامنة قيمة واحدة ولا يمكنه الوصول إلى طريق مسدود.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

عندما أقوم بتشغيل هذا البرنامج النصي على جهاز الكمبيوتر المحمول ، أحصل على:

 The counter is: 1000000 Number of attempts: 1000000

هذا المثال مماثل لتطبيق العداد بالذرة. الاختلاف الوحيد هو أننا هنا ننتظر اكتمال جميع تغييرات الوكيل قبل قراءة القيمة النهائية باستخدام انتظار.

نوع بيانات المتغير الأخير هو المراجع. على عكس الذرات ، يمكن للمراجع مزامنة التغييرات على قيم متعددة. يجب أن تكون كل عملية مرجعية في معاملة باستخدام dosync. تسمى طريقة تغيير البيانات هذه بذاكرة معاملات البرامج أو STM المختصرة. لنلقِ نظرة على مثال لتحويل الأموال في الحسابات.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

عندما أقوم بتشغيل هذا البرنامج النصي ، أحصل على:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

ومن المثير للاهتمام ، أنه كانت هناك محاولات أكثر من عدد المعاملات التي تم إجراؤها. هذا لأن STM لا تستخدم الأقفال ، لذلك إذا كان هناك تعارض ، (مثل خيطين يحاولان تغيير نفس القيمة) فسيتم إعادة تنفيذ المعاملة. لهذا السبب ، لا ينبغي أن يكون للعملية آثار جانبية. يمكننا أن نرى أن الوكيل الذي يتغير القيمة داخل المعاملة يتصرف بشكل متوقع. سيتم تقييم الوظيفة التي تغير قيمة الوكيل بقدر عدد المرات التي توجد فيها معاملات. والسبب هو أن الوكيل على علم بالمعاملة. إذا كان لا بد من أن يكون للصفقة آثار جانبية ، فيجب تفعيلها داخل الوكيل. بهذه الطريقة ، سيكون للبرنامج سلوك يمكن التنبؤ به. ربما تعتقد أنه يجب عليك دائمًا استخدام STM ، ولكن غالبًا ما يستخدم المبرمجون المتمرسون الذرات لأن الذرات أبسط وأسرع من STM. بالطبع ، هذا إذا كان من الممكن عمل برنامج بهذه الطريقة. إذا كانت لديك آثار جانبية ، فلا يوجد خيار آخر سوى استخدام STM والعوامل.

نموذج الممثل

النموذج التالي من التزامن هو نموذج الممثل. مبدأ هذا النموذج مشابه للعالم الحقيقي. إذا عقدنا صفقة لإنشاء شيء ما مع العديد من الأشخاص ، على سبيل المثال مبنى ، فكل رجل في موقع البناء له دوره الخاص. حشد من الناس تحت إشراف المشرف. إذا أصيب عامل في العمل ، فسيقوم المشرف بتكليف وظيفة الرجل المصاب للآخرين المتاحين. إذا لزم الأمر قد يؤدي إلى موقع رجل جديد. لدينا على الموقع المزيد من الأشخاص الذين يقومون بالعمل في وقت واحد (بشكل متزامن) ، ولكن أيضًا يتحدثون مع بعضهم البعض للمزامنة. إذا وضعنا العمل في موقع البناء في البرنامج ، فسيكون كل شخص ممثلًا له حالة وينفذ العملية الخاصة به ، وسيتم استبدال الحديث بالرسائل. لغة البرمجة الشعبية القائمة على هذا النموذج هي Erlang. تحتوي هذه اللغة الشيقة على أنواع ووظائف بيانات ثابتة لها نفس خصائص أنواع البيانات الأخرى. يمكن إنشاء الوظائف أثناء تنفيذ البرنامج وتمريرها كوسائط إلى وظيفة أخرى أو إعادتها كنتيجة لاستدعاء الوظيفة. سأقدم أمثلة في لغة Elixir التي تستخدم آلة Erlang الافتراضية ، لذلك سيكون لدي نفس نموذج البرمجة مثل Erlang فقط بناء جملة مختلف. أهم ثلاثة بدائل في الإكسير هي التكاثر والإرسال والاستلام. ينفذ spawn الوظيفة في العملية الجديدة ، ويرسل إرسال الرسالة إلى العملية ويستقبل الرسائل التي يتم إرسالها إلى العملية الحالية.

سيتم مواجهة المثال الأول مع نموذج الممثل بشكل متزامن. لإنشاء برنامج بهذا النموذج ، من الضروري جعل الممثل يمتلك قيمة العداد ويستقبل رسالة لتعيين واسترداد قيمة العداد ، ولديه ممثلان سيزيدان قيمة العداد في نفس الوقت.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

عندما أقوم بتنفيذ هذا المثال ، أحصل على:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

يمكننا أن نرى أن العداد في النهاية هو 516827 وليس 1000000 كما توقعنا. عندما قمت بتشغيل البرنامج النصي في المرة القادمة ، تلقيت 511010. سبب هذا السلوك هو أن العداد يتلقى رسالتين: استرداد القيمة الحالية وتعيين القيمة الجديدة. لزيادة العداد ، يحتاج البرنامج إلى الحصول على القيمة الحالية ، وزيادتها بمقدار 1 وتعيين القيمة المتزايدة. تقوم عمليتان بقراءة وكتابة قيمة العداد في نفس الوقت باستخدام الرسالة التي يتم إرسالها إلى عملية العداد. لا يمكن التنبؤ بترتيب الرسائل التي سيتلقاها العداد ، ولا يمكن للبرنامج التحكم فيه. يمكننا تخيل هذا السيناريو:

  1. قيمة العداد هي 115
  2. العملية أ تقرأ قيمة العداد (115)
  3. العملية B تقرأ قيمة العداد (115)
  4. العملية "ب" تزيد من القيمة محليًا (116)
  5. مجموعات العملية B زادت القيمة إلى العداد (116)
  6. العملية أ تزيد من قيمة العداد (116)
  7. العملية أ تعين القيمة المتزايدة للعداد (116)
  8. قيمة العداد هي 116

إذا نظرنا إلى السيناريو ، فإن عمليتين تزيدان العداد بمقدار 1 ، ويتم زيادة العداد في النهاية بمقدار 1 وليس بمقدار 2. يمكن أن تحدث مثل هذه العلاقات المتبادلة عددًا غير متوقع من المرات ، وبالتالي فإن قيمة العداد لا يمكن التنبؤ بها. لمنع هذا السلوك ، يجب أن تتم عملية الزيادة برسالة واحدة.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

من خلال تشغيل هذا البرنامج النصي ، أحصل على:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

يمكننا أن نرى أن العداد له القيمة الصحيحة. سبب السلوك المتوقع (الحتمي) هو أن قيمة العداد تزداد برسالة واحدة بحيث لا يؤثر تسلسل الرسائل لزيادة العداد على قيمته النهائية. من خلال العمل مع نموذج الممثل ، علينا الانتباه إلى كيفية تشابك الرسائل وتصميمها الدقيق للرسائل والإجراءات على الرسائل لتجنب عدم القدرة على التنبؤ العرضي (عدم الحتمية).

كيف يمكننا تحويل الأموال بين حسابين بهذا النموذج؟

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

عندما أقوم بتشغيل هذا البرنامج النصي ، أحصل على:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

يمكننا أن نرى أن تحويل الأموال يعمل بدون تناقضات ، لأننا اخترنا تحويل الرسائل لتحويل الأموال ومبالغ الرسائل للحصول على قيمة الحسابات مما يمنحنا سلوكًا يمكن التنبؤ به للبرنامج. عندما نقوم بتحويل الأموال ، يجب أن يكون المبلغ الإجمالي للمال في أي وقت هو نفسه.

يمكن أن يتسبب النموذج الفاعل في قفل وبالتالي حدوث طريق مسدود ، لذا توخي الحذر عند تصميم البرنامج. يوضح البرنامج النصي التالي كيف يمكنك محاكاة سيناريو القفل وإيقاف التشغيل.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

يتم إحتوائه

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial