Pengantar Pemrograman Bersamaan: Panduan Pemula
Diterbitkan: 2022-03-11Apa itu pemrograman bersamaan? Sederhananya, ini adalah saat Anda melakukan lebih dari satu hal pada saat yang bersamaan. Jangan bingung dengan paralelisme, konkurensi adalah ketika beberapa urutan operasi dijalankan dalam periode waktu yang tumpang tindih. Dalam bidang pemrograman, konkurensi adalah subjek yang cukup kompleks. Berurusan dengan konstruksi seperti utas dan kunci dan menghindari masalah seperti kondisi balapan dan kebuntuan bisa sangat rumit, membuat program bersamaan sulit untuk ditulis. Melalui konkurensi, program dapat dirancang sebagai proses independen yang bekerja bersama dalam komposisi tertentu. Struktur seperti itu dapat dibuat paralel atau tidak; namun, mencapai struktur seperti itu dalam program Anda menawarkan banyak keuntungan.
Pada artikel ini, kita akan melihat sejumlah model konkurensi yang berbeda, bagaimana mencapainya dalam berbagai bahasa pemrograman yang dirancang untuk konkurensi.
Model Status Mutable Bersama
Mari kita lihat contoh sederhana dengan penghitung dan dua utas yang meningkatkannya. Programnya tidak boleh terlalu rumit. Kami memiliki objek yang berisi penghitung yang meningkat dengan peningkatan metode, dan mengambilnya dengan metode get dan dua utas yang meningkatkannya.
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
Program naif ini tidak naif seperti yang terlihat pada pandangan pertama. Ketika saya menjalankan program ini lebih sering saya mendapatkan hasil yang berbeda. Ada tiga nilai setelah tiga eksekusi di laptop saya.
java Counting 553706 java Counting 547818 java Counting 613014
Apa alasan dari perilaku yang tidak terduga ini? Program meningkatkan penghitung di satu tempat, dalam peningkatan metode yang menggunakan perintah penghitung ++. Jika kita melihat kode byte perintah kita akan melihat bahwa itu terdiri dari beberapa bagian:
- Baca nilai penghitung dari memori
- Tingkatkan nilai secara lokal
- Simpan nilai penghitung dalam memori
Sekarang kita bisa membayangkan apa yang bisa salah dalam urutan ini. Jika kami memiliki dua utas yang secara independen meningkatkan penghitung maka kami dapat memiliki skenario ini:
- Nilai penghitung adalah 115
- Utas pertama membaca nilai penghitung dari memori (115)
- Utas pertama meningkatkan nilai penghitung lokal (116)
- Utas kedua membaca nilai penghitung dari memori (115)
- Utas kedua meningkatkan nilai penghitung lokal (116)
- Utas kedua menyimpan nilai penghitung lokal ke memori (116)
- Utas pertama menyimpan nilai penghitung lokal ke memori (116)
- Nilai penghitung adalah 116
Dalam skenario ini, dua utas terjalin sehingga nilai penghitung bertambah 1, tetapi nilai penghitung harus ditingkatkan 2 karena setiap utas meningkatkannya 1. Benang yang berbeda saling terkait mempengaruhi hasil program. Alasan program tidak dapat diprediksi adalah karena program tidak memiliki kendali atas jalinan benang melainkan sistem operasi. Setiap kali program dijalankan, utas dapat terjalin secara berbeda. Dengan cara ini kami memperkenalkan ketidakpastian yang tidak disengaja (non-determinisme) ke program.
Untuk memperbaiki ketidakpastian yang tidak disengaja ini (non-determinisme), program harus memiliki kontrol jalinan utas. Ketika satu utas dalam metode meningkatkan utas lain tidak boleh berada dalam metode yang sama sampai yang pertama keluar darinya. Dengan cara itu kami membuat serial akses ke metode peningkatan.
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Solusi lain adalah dengan menggunakan pencacah yang dapat bertambah secara atomik, artinya operasi tidak dapat dipisahkan menjadi beberapa operasi. Dengan cara ini, kita tidak perlu memiliki blok kode yang perlu disinkronkan. Java memiliki tipe data atom dalam ruang nama java.util.concurrent.atomic, dan kita akan menggunakan AtomicInteger.
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Integer atom memiliki operasi yang kita butuhkan, sehingga kita dapat menggunakannya sebagai pengganti kelas Counter. Sangat menarik untuk dicatat bahwa semua metode atomicinteger tidak menggunakan penguncian, sehingga tidak ada kemungkinan kebuntuan, yang memfasilitasi desain program.
Menggunakan kata kunci yang disinkronkan untuk menyinkronkan metode penting harus menyelesaikan semua masalah, bukan? Bayangkan kita memiliki dua akun yang dapat melakukan deposit, withdraw dan transfer ke akun lain. Apa yang terjadi jika pada saat yang sama kita ingin mentransfer uang dari satu rekening ke rekening lain dan sebaliknya? Mari kita lihat sebuah contoh.
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
Ketika saya menjalankan program ini di laptop saya, biasanya macet. Mengapa ini terjadi? Jika kita perhatikan lebih dekat, kita dapat melihat bahwa ketika kita mentransfer uang, kita masuk ke metode transfer yang disinkronkan dan mengunci akses ke semua metode yang disinkronkan pada akun sumber, dan kemudian mengunci akun tujuan yang mengunci akses ke semua metode yang disinkronkan di dalamnya.
Bayangkan skenario berikut:
- Transfer panggilan utas pertama di akun Bob ke akun Joe
- Transfer panggilan utas kedua di akun Joe ke akun Bob
- Utas kedua mengurangi jumlah dari akun Joe
- Utas kedua masuk ke jumlah setoran ke akun Bob tetapi menunggu utas pertama menyelesaikan transfer.
- Utas pertama mengurangi jumlah dari akun Bob
- Utas pertama masuk ke jumlah setoran ke akun Joe tetapi menunggu utas kedua menyelesaikan transfer.
Dalam skenario ini, satu utas menunggu utas lain untuk menyelesaikan transfer dan sebaliknya. Mereka terjebak satu sama lain dan program tidak dapat dilanjutkan. Ini disebut kebuntuan. Untuk menghindari kebuntuan, perlu untuk mengunci akun dalam urutan yang sama. Untuk memperbaiki program, kami akan memberikan nomor unik pada setiap akun sehingga kami dapat mengunci akun dengan urutan yang sama saat mentransfer uang.
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
Karena kesalahan yang tidak dapat diprediksi, terkadang terjadi, tetapi tidak selalu dan sulit untuk direproduksi. Jika program berperilaku tidak terduga, biasanya disebabkan oleh konkurensi yang memperkenalkan non-determinisme yang tidak disengaja. Untuk menghindari non-determinisme yang tidak disengaja, kita harus terlebih dahulu merancang program untuk memperhitungkan semua jalinan.
Contoh program yang memiliki non-determinisme tak disengaja.
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }
Program ini memiliki non-determinisme yang tidak disengaja di dalamnya. Nilai terakhir yang dimasukkan dalam wadah akan ditampilkan.
java NonDeterminism Slow
Benang yang lebih lambat akan memasukkan nilainya nanti, dan nilai ini akan dicetak (Lambat). Tapi ini tidak perlu terjadi. Bagaimana jika komputer secara bersamaan menjalankan program lain yang membutuhkan banyak sumber daya CPU? Kami tidak memiliki jaminan bahwa itu akan menjadi utas yang lebih lambat yang memasukkan nilai terakhir karena dikendalikan oleh sistem operasi, bukan program. Kita dapat memiliki situasi di mana program bekerja di satu komputer dan di komputer lain berperilaku berbeda. Kesalahan seperti itu sulit ditemukan dan menyebabkan sakit kepala bagi pengembang. Untuk semua alasan ini model konkurensi ini sangat sulit dilakukan dengan benar.
Cara Fungsional
Paralelisme
Mari kita lihat model lain yang digunakan bahasa fungsional. Misalnya kita akan menggunakan Clojure, yang bisa diartikan menggunakan tool Leiningen. Clojure adalah bahasa yang sangat menarik dengan dukungan yang baik untuk konkurensi. Model konkurensi sebelumnya adalah dengan status yang dapat diubah bersama. Kelas yang kita gunakan juga bisa memiliki status tersembunyi yang bermutasi yang tidak kita ketahui, karena tidak terlihat dari API-nya. Seperti yang telah kita lihat, model ini dapat menyebabkan non-determinisme dan kebuntuan yang tidak disengaja jika kita tidak berhati-hati. Bahasa fungsional memiliki tipe data yang tidak bermutasi sehingga dapat dibagikan dengan aman tanpa risiko berubah. Fungsi memiliki properti seperti halnya tipe data lainnya. Fungsi dapat dibuat selama eksekusi program dan diteruskan sebagai parameter ke fungsi lain atau dikembalikan sebagai hasil dari pemanggilan fungsi.
Primitif dasar untuk pemrograman bersamaan adalah masa depan dan janji. Future mengeksekusi blok kode di utas lain dan mengembalikan objek untuk nilai masa depan yang akan dimasukkan saat blok dieksekusi.
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))
Ketika saya menjalankan skrip ini, hasilnya adalah:
Started A Started B Waiting for futures Finished A Finished B 10
Dalam contoh ini kami memiliki dua blok masa depan yang dieksekusi secara independen. Program hanya memblokir saat membaca nilai dari objek masa depan yang belum tersedia. Dalam kasus kami, menunggu kedua hasil blok masa depan dijumlahkan. Perilaku dapat diprediksi (deterministik) dan akan selalu memberikan hasil yang sama karena tidak ada keadaan yang dapat diubah bersama.
Primitif lain yang digunakan untuk konkurensi adalah janji. Janji adalah wadah di mana seseorang dapat memberi nilai satu kali. Saat membaca janji, utas akan menunggu sampai nilai janji terisi.
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)
Dalam contoh ini, masa depan akan menunggu untuk mencetak hasilnya selama janji untuk tidak menyimpan nilai. Setelah dua detik, dalam janji akan disimpan nilai 42 untuk dicetak di utas mendatang. Menggunakan janji dapat menyebabkan kebuntuan yang bertentangan dengan masa depan, jadi berhati-hatilah saat menggunakan janji.
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)
Dalam contoh ini, kita menggunakan hasil dari masa depan dan hasil dari janji. Urutan pengaturan dan pembacaan nilai adalah bahwa utas utama menunggu nilai dari utas mendatang dan utas mendatang menunggu nilai dari utas utama. Perilaku ini akan dapat diprediksi (deterministik) dan akan dimainkan setiap kali program dijalankan yang membuatnya lebih mudah untuk menemukan dan menghapus kesalahan.
Menggunakan masa depan memungkinkan program untuk melanjutkan latihan sampai membutuhkan hasil eksekusi masa depan. Ini menghasilkan eksekusi program yang lebih cepat. Jika Anda memiliki beberapa prosesor dengan masa depan, Anda dapat membuat eksekusi paralel program yang memiliki perilaku yang dapat diprediksi (deterministik) (setiap kali memberikan hasil yang sama). Dengan begitu kita lebih baik memanfaatkan kekuatan komputer.
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))
Dalam contoh ini Anda dapat melihat bagaimana penggunaan masa depan dapat memanfaatkan kecepatan komputer dengan lebih baik. Kami memiliki dua angka Fibonacci yang dijumlahkan. Kita dapat melihat bahwa program menghitung hasilnya dua kali, pertama kali secara berurutan dalam satu utas, dan yang kedua secara paralel dalam dua utas. Karena laptop saya memiliki prosesor multicore, eksekusi paralel bekerja dua kali lebih cepat dari perhitungan berurutan.
Hasil eksekusi script ini di laptop saya:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"
Konkurensi
Untuk mendukung concurrency dan unpredictability dalam bahasa pemrograman Clojure, kita harus menggunakan tipe data yang variabel sehingga thread lain dapat melihat perubahannya. Tipe data variabel yang paling sederhana adalah atom. Atom adalah wadah yang selalu memiliki nilai yang dapat diganti dengan nilai lain. Nilai dapat diganti dengan memasukkan nilai baru atau dengan memanggil fungsi yang mengambil nilai lama dan mengembalikan nilai baru yang lebih sering digunakan. Sangat menarik bahwa atom diimplementasikan tanpa penguncian dan aman digunakan di utas, yang berarti tidak mungkin mencapai kebuntuan. Secara internal, atom menggunakan perpustakaan java.util.concurrent.AtomicReference. Mari kita lihat contoh penghitung yang diimplementasikan dengan atom.
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Hasil eksekusi script di laptop saya :
The counter is: 1000000 Number of attempts: 1680212
Dalam contoh ini kita menggunakan atom yang berisi nilai counter. Penghitung meningkat dengan (swap! counter inc). Fungsi swap bekerja seperti ini: 1. mengambil nilai penghitung dan menyimpannya 2. untuk nilai ini memanggil fungsi yang diberikan yang menghitung nilai baru 3. untuk menyimpan nilai baru, ia menggunakan operasi atom yang memeriksa apakah nilai lama telah berubah 3a. jika nilainya belum berubah maka masuk nilai baru 3b. jika nilai berubah sementara itu, lanjutkan ke langkah 1 Kita melihat bahwa fungsi dapat dipanggil lagi jika nilai diubah sementara itu. Nilai hanya dapat diubah dari utas lain. Oleh karena itu, penting bahwa fungsi yang menghitung nilai baru tidak memiliki efek samping sehingga tidak masalah jika dipanggil lebih banyak. Salah satu keterbatasan atom adalah bahwa ia menyinkronkan perubahan ke satu nilai.
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)
Ketika saya menjalankan skrip ini saya mendapatkan:

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525
Dalam contoh ini kita dapat melihat bagaimana kita mengubah lebih banyak atom. Pada satu titik, inkonsistensi bisa terjadi. Jumlah dua akun pada suatu waktu tidak sama. Jika kita harus mengoordinasikan perubahan beberapa nilai, ada dua solusi:
- Tempatkan lebih banyak nilai dalam satu atom
- Gunakan referensi dan memori transaksional perangkat lunak, seperti yang akan kita lihat nanti
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)
Ketika saya menjalankan skrip ini di komputer saya, saya mendapatkan:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Dalam contoh, koordinasi telah diselesaikan sehingga kami memberi nilai lebih menggunakan peta. Ketika kami mentransfer uang dari rekening, kami mengubah semua rekening pada saat itu sehingga tidak akan pernah terjadi jumlah uang yang tidak sama.
Tipe data variabel selanjutnya adalah agen. Agen berperilaku seperti atom hanya dalam fungsi yang mengubah nilai dieksekusi di utas yang berbeda, sehingga perlu beberapa waktu agar perubahan terlihat. Oleh karena itu, ketika membaca nilai agen perlu memanggil fungsi yang menunggu sampai semua fungsi yang mengubah nilai agen dieksekusi. Berbeda dengan fungsi atom yang perubahan nilainya disebut hanya sekali dan karena itu dapat menimbulkan efek samping. Tipe ini juga dapat menyinkronkan satu nilai dan tidak dapat menemui jalan buntu.
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Ketika saya menjalankan skrip ini di laptop saya, saya mendapatkan:
The counter is: 1000000 Number of attempts: 1000000
Contoh ini sama dengan penerapan pencacah dengan atom. Satu-satunya perbedaan adalah bahwa di sini kita menunggu semua perubahan agen selesai sebelum membaca nilai akhir menggunakan menunggu.
Tipe data variabel terakhir adalah referensi. Tidak seperti atom, referensi dapat menyinkronkan perubahan ke beberapa nilai. Setiap operasi pada referensi harus dalam transaksi menggunakan dosync. Cara mengubah data ini disebut software transactional memory atau disingkat STM. Mari kita lihat contoh dengan transfer uang di rekening.
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)
Ketika saya menjalankan skrip ini, saya mendapatkan:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000
Menariknya, ada lebih banyak upaya daripada jumlah transaksi yang dilakukan. Hal ini karena STM tidak menggunakan kunci, sehingga jika terjadi konflik (seperti dua utas mencoba mengubah nilai yang sama) transaksi akan dilakukan kembali. Untuk itu, transaksi tidak boleh menimbulkan efek samping. Kita dapat melihat bahwa agen yang nilainya berubah dalam transaksi berperilaku dapat diprediksi. Fungsi yang mengubah nilai agen akan dievaluasi sebanyak ada transaksi. Alasannya karena agen sadar transaksi. Jika transaksi harus memiliki efek samping, mereka harus berfungsi di dalam agen. Dengan cara ini, program akan memiliki perilaku yang dapat diprediksi. Anda mungkin berpikir bahwa Anda harus selalu menggunakan STM, tetapi programmer berpengalaman akan sering menggunakan atom karena atom lebih sederhana dan lebih cepat daripada STM. Tentu saja, itu jika memungkinkan untuk membuat program dengan cara itu. Jika Anda memiliki efek samping, maka tidak ada pilihan lain selain menggunakan STM dan agen.
Model Aktor
Model konkurensi berikut adalah model aktor. Prinsip model ini mirip dengan dunia nyata. Jika kita membuat kesepakatan untuk membuat sesuatu dengan banyak orang, misalnya sebuah bangunan, maka setiap orang di lokasi konstruksi memiliki perannya masing-masing. Kerumunan orang diawasi oleh pengawas. Jika seorang pekerja terluka di tempat kerja, penyelia akan menyerahkan pekerjaan orang yang terluka itu kepada orang lain yang tersedia. Jika perlu, dia dapat membawa orang baru ke situs itu. Di situs kami memiliki lebih banyak orang yang melakukan pekerjaan secara bersamaan (bersamaan), tetapi juga berbicara satu sama lain untuk menyinkronkan. Jika kita memasukkan pekerjaan di lokasi konstruksi ke dalam program, maka setiap orang akan menjadi aktor yang memiliki keadaan dan mengeksekusi dalam prosesnya sendiri, dan pembicaraan akan diganti dengan pesan. Bahasa pemrograman populer berdasarkan model ini adalah Erlang. Bahasa yang menarik ini memiliki tipe data yang tidak dapat diubah dan fungsi yang memiliki properti yang sama dengan tipe data lainnya. Fungsi dapat dibuat selama eksekusi program dan diteruskan sebagai argumen ke fungsi lain atau dikembalikan sebagai hasil pemanggilan fungsi. Saya akan memberikan contoh dalam bahasa Elixir yang menggunakan mesin virtual Erlang, jadi saya akan memiliki model pemrograman yang sama dengan Erlang hanya sintaks yang berbeda. Tiga primitif terpenting dalam Elixir adalah spawn, send dan accept. spawn menjalankan fungsi dalam proses baru, mengirim mengirim pesan ke proses dan menerima menerima pesan yang dikirim ke proses saat ini.
Contoh pertama dengan model aktor akan counter meningkat secara bersamaan. Untuk membuat program dengan model ini, diperlukan sebuah aktor yang memiliki nilai penghitung dan menerima pesan untuk mengatur dan mengambil nilai penghitung, serta memiliki dua aktor yang secara bersamaan akan meningkatkan nilai penghitung.
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
Ketika saya menjalankan contoh ini saya mendapatkan:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827
Kita dapat melihat bahwa pada akhirnya penghitungnya adalah 516827 dan bukan 1000000 seperti yang kita harapkan. Ketika saya menjalankan skrip lain kali, saya menerima 511010. Alasan untuk perilaku ini adalah bahwa penghitung menerima dua pesan: mengambil nilai saat ini dan menetapkan nilai baru. Untuk meningkatkan penghitung, program perlu mendapatkan nilai saat ini, meningkatkannya dengan 1 dan mengatur nilai yang meningkat. Dua proses membaca dan menulis nilai pencacah secara bersamaan dengan menggunakan pesan yang dikirim ke proses pencacah. Urutan pesan yang akan diterima penghitung tidak dapat diprediksi, dan program tidak dapat mengontrolnya. Kita bisa membayangkan skenario ini:
- Nilai penghitung adalah 115
- Proses A membaca nilai counter (115)
- Proses B membaca nilai counter (115)
- Proses B meningkatkan nilai secara lokal (116)
- Proses B menetapkan peningkatan nilai ke penghitung (116)
- Proses A meningkatkan nilai penghitung (116)
- Proses A menetapkan peningkatan nilai ke penghitung (116)
- Nilai penghitung adalah 116
Jika kita melihat skenario, dua proses meningkatkan penghitung sebesar 1, dan penghitung meningkat pada akhirnya sebesar 1 dan bukan 2. Jalinan seperti itu dapat terjadi berkali-kali yang tidak dapat diprediksi dan oleh karena itu nilai penghitung tidak dapat diprediksi. Untuk mencegah perilaku ini, operasi peningkatan harus dilakukan oleh satu pesan.
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
Dengan menjalankan skrip ini saya mendapatkan:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000
Kita dapat melihat bahwa penghitung memiliki nilai yang benar. Alasan untuk perilaku yang dapat diprediksi (deterministik) adalah bahwa nilai penghitung bertambah satu pesan sehingga urutan pesan untuk meningkatkan penghitung tidak akan mempengaruhi nilai akhirnya. Bekerja dengan model aktor, kita harus memperhatikan bagaimana pesan dapat terjalin dan desain pesan dan tindakan yang cermat pada pesan untuk menghindari ketidakpastian yang tidak disengaja (non-determinisme).
Bagaimana kita bisa mentransfer uang antara dua rekening dengan model ini?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end
Ketika saya menjalankan skrip ini saya mendapatkan:
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Kami dapat melihat bahwa transfer uang bekerja tanpa inkonsistensi, karena kami telah memilih transfer pesan untuk mentransfer uang dan jumlah pesan untuk mendapatkan nilai akun yang memberi kami perilaku program yang dapat diprediksi. Setiap kali kita melakukan transfer uang, jumlah total uang setiap saat harus sama.
Model aktor dapat menyebabkan lock dan deadlock, jadi berhati-hatilah saat merancang program. Skrip berikut menunjukkan bagaimana Anda dapat mensimulasikan skenario kunci dan kebuntuan.
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be done
From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished
And now, there is no longer a deadlock.
Bungkus
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.