동시 프로그래밍 소개: 초심자를 위한 안내서

게시 됨: 2022-03-11

동시 프로그래밍이란 무엇입니까? 간단히 설명하면, 그것은 당신이 동시에 하나 이상의 일을 할 때입니다. 병렬 처리와 혼동하지 않도록 동시성은 여러 작업 시퀀스가 ​​겹치는 기간에 실행되는 경우입니다. 프로그래밍 영역에서 동시성은 꽤 복잡한 주제입니다. 스레드 및 잠금과 같은 구성을 처리하고 경쟁 조건 및 교착 상태와 같은 문제를 방지하는 것은 상당히 번거로워서 동시 프로그램을 작성하기 어렵게 만듭니다. 동시성을 통해 프로그램은 특정 구성에서 함께 작동하는 독립적인 프로세스로 설계될 수 있습니다. 이러한 구조는 병렬로 만들 수도 있고 그렇지 않을 수도 있습니다. 그러나 프로그램에서 이러한 구조를 달성하면 많은 이점이 있습니다.

동시 프로그래밍 소개

이 기사에서는 다양한 동시성 모델을 살펴보고 동시성을 위해 설계된 다양한 프로그래밍 언어에서 이를 달성하는 방법을 살펴보겠습니다.

공유 가변 상태 모델

카운터와 카운터를 증가시키는 두 개의 스레드가 있는 간단한 예를 살펴보겠습니다. 프로그램이 너무 복잡하지 않아야 합니다. 메소드 증가에 따라 증가하는 카운터를 포함하는 객체가 있으며 get 메소드와 이를 증가시키는 두 개의 스레드로 이를 검색합니다.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

이 순진한 프로그램은 언뜻 보이는 것처럼 순진하지 않습니다. 이 프로그램을 더 많이 실행하면 다른 결과가 나타납니다. 내 랩톱에서 세 번 실행한 후 세 가지 값이 있습니다.

 java Counting 553706 java Counting 547818 java Counting 613014

이 예측할 수 없는 행동의 이유는 무엇입니까? 프로그램은 counter++ 명령을 사용하는 방법 증가에서 카운터를 한 곳에서 증가시킵니다. 명령 바이트 코드를 보면 여러 부분으로 구성되어 있음을 알 수 있습니다.

  1. 메모리에서 카운터 값 읽기
  2. 로컬에서 가치 증대
  3. 카운터 값을 메모리에 저장

이제 우리는 이 순서에서 무엇이 잘못될 수 있는지 상상할 수 있습니다. 카운터를 독립적으로 증가시키는 두 개의 스레드가 있는 경우 다음 시나리오가 있을 수 있습니다.

  1. 카운터 값은 115입니다.
  2. 첫 번째 스레드는 메모리(115)에서 카운터 값을 읽습니다.
  3. 첫 번째 스레드는 로컬 카운터 값을 증가시킵니다(116).
  4. 두 번째 스레드는 메모리(115)에서 카운터 값을 읽습니다.
  5. 두 번째 스레드는 로컬 카운터 값을 증가시킵니다(116).
  6. 두 번째 스레드는 로컬 카운터 값을 메모리(116)에 저장합니다.
  7. 첫 번째 스레드는 로컬 카운터 값을 메모리에 저장합니다(116).
  8. 카운터 값은 116입니다.

이 시나리오에서는 두 개의 스레드가 얽혀있어 카운터 값이 1 증가하지만 각 스레드가 1씩 증가하기 때문에 카운터 값은 2만큼 증가해야 합니다. 서로 다른 스레드가 얽혀 프로그램의 결과에 영향을 미칩니다. 프로그램이 예측할 수 없는 이유는 프로그램이 스레드 얽힌 제어가 아니라 운영 체제를 제어하기 때문입니다. 프로그램이 실행될 때마다 스레드는 다르게 얽힐 수 있습니다. 이런 식으로 우리는 프로그램에 우발적인 예측 불가능성(비결정성)을 도입했습니다.

이 우발적인 예측 불가능성(비결정성)을 수정하려면 프로그램이 스레드 얽힘을 제어해야 합니다. 한 스레드가 메서드 증가에 있을 때 다른 스레드는 첫 번째 스레드가 나올 때까지 같은 메서드에 있으면 안 됩니다. 그런 식으로 우리는 메서드 증가에 대한 액세스를 직렬화합니다.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

또 다른 솔루션은 원자적으로 증가할 수 있는 카운터를 사용하는 것입니다. 즉, 작업을 여러 작업으로 분리할 수 없습니다. 이런 식으로 동기화해야 하는 코드 블록이 필요하지 않습니다. Java는 java.util.concurrent.atomic 네임스페이스에 원자 데이터 유형이 있으며 우리는 AtomicInteger를 사용할 것입니다.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Atomic integer에는 필요한 연산이 있으므로 Counter 클래스 대신 사용할 수 있습니다. atomicinteger의 모든 방법은 잠금을 사용하지 않으므로 교착 상태의 가능성이 없으므로 프로그램 설계가 용이합니다.

중요한 메소드를 동기화하기 위해 동기화된 키워드를 사용하면 모든 문제가 해결되어야 하지 않습니까? 다른 계정으로 입금, 출금 및 이체할 수 있는 두 개의 계정이 있다고 가정해 보겠습니다. 동시에 한 계정에서 다른 계정으로 또는 그 반대로 돈을 이체하려는 경우 어떻게 됩니까? 예를 들어 보겠습니다.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

랩톱에서 이 프로그램을 실행하면 일반적으로 멈춥니다. 왜 이런 일이 발생합니까? 자세히 살펴보면 돈을 이체할 때 동기화된 이체 방법을 입력하고 원본 계정의 모든 동기화된 방법에 대한 액세스를 잠근 다음 대상 계정을 잠그고 모든 동기화된 방법에 대한 액세스를 잠그는 것을 알 수 있습니다.

다음 시나리오를 상상해 보십시오.

  1. 첫 번째 스레드는 Bob의 계정에서 Joe의 계정으로 전송을 호출합니다.
  2. 두 번째 스레드는 Joe의 계정에서 Bob의 계정으로 전송을 호출합니다.
  3. 두 번째 스레드는 Joe의 계정에서 금액을 줄입니다.
  4. 두 번째 스레드는 Bob의 계좌로 입금 금액으로 이동하지만 첫 번째 스레드가 이체를 완료할 때까지 기다립니다.
  5. 첫 번째 스레드는 Bob의 계정에서 금액을 줄입니다.
  6. 첫 번째 스레드는 Joe의 계정으로 입금 금액으로 이동하지만 두 번째 스레드가 이체를 완료할 때까지 기다립니다.

이 시나리오에서 한 스레드는 다른 스레드가 전송을 완료하기를 기다리고 있으며 그 반대의 경우도 마찬가지입니다. 그들은 서로 붙어 프로그램을 계속할 수 없습니다. 이것을 교착 상태라고 합니다. 교착 상태를 방지하려면 동일한 순서로 계정을 잠글 필요가 있습니다. 프로그램을 수정하기 위해 각 계정에 고유 번호를 부여하여 돈을 이체할 때 동일한 순서로 계정을 잠글 수 있습니다.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

이러한 실수의 예측 불가능성으로 인해 때때로 발생하지만 항상 그런 것은 아니며 재현하기 어렵습니다. 프로그램이 예측할 수 없게 동작하는 경우 일반적으로 우연한 비결정성을 도입하는 동시성으로 인해 발생합니다. 우발적인 비결정론을 피하기 위해 우리는 모든 얽힘을 고려하도록 프로그램을 미리 설계해야 합니다.

우발적 비결정성이 있는 프로그램의 예.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

이 프로그램에는 우연한 비결정성이 있습니다. 컨테이너에 마지막으로 입력한 값이 표시됩니다.

 java NonDeterminism Slow

느린 스레드는 나중에 값을 입력하고 이 값이 인쇄됩니다(느림). 하지만 그럴 필요는 없습니다. 컴퓨터가 CPU 리소스를 많이 필요로 하는 다른 프로그램을 동시에 실행하면 어떻게 될까요? 프로그램이 아닌 운영 체제에 의해 제어되기 때문에 값을 마지막에 입력하는 느린 스레드가 될 것이라는 보장은 없습니다. 프로그램이 한 컴퓨터에서는 작동하고 다른 컴퓨터에서는 다르게 동작하는 상황이 있을 수 있습니다. 이러한 오류는 찾기 어렵고 개발자에게 골칫거리입니다. 이러한 모든 이유 때문에 이 동시성 모델은 올바르게 수행하기가 매우 어렵습니다.

기능적인 방법

병행

함수형 언어가 사용하는 또 다른 모델을 살펴보겠습니다. 예를 들어 Leiningen 도구를 사용하여 해석할 수 있는 Clojure를 사용할 것입니다. Clojure는 동시성을 잘 지원하는 매우 흥미로운 언어입니다. 이전 동시성 모델은 변경 가능한 공유 상태였습니다. 우리가 사용하는 클래스는 또한 API에서 분명하지 않기 때문에 우리가 알지 못하는 변경되는 숨겨진 상태를 가질 수 있습니다. 우리가 보았듯이 이 모델은 주의하지 않으면 우발적인 비결정론과 교착 상태를 유발할 수 있습니다. 기능 언어에는 변경되지 않는 데이터 유형이 있으므로 변경될 위험 없이 안전하게 공유할 수 있습니다. 함수에는 속성과 기타 데이터 유형이 있습니다. 함수는 프로그램 실행 중에 생성되어 다른 함수에 매개변수로 전달되거나 함수 호출의 결과로 반환될 수 있습니다.

동시 프로그래밍의 기본 기본 요소는 future와 promise입니다. Future는 다른 스레드에서 코드 블록을 실행하고 블록이 실행될 때 입력될 미래 값에 대한 개체를 반환합니다.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

이 스크립트를 실행하면 출력은 다음과 같습니다.

 Started A Started B Waiting for futures Finished A Finished B 10

이 예제에는 독립적으로 실행되는 두 개의 future 블록이 있습니다. 프로그램은 아직 사용할 수 없는 미래 개체에서 값을 읽을 때만 차단합니다. 우리의 경우, 미래 블록의 두 결과가 모두 합산되기를 기다립니다. 동작은 예측 가능하고(결정적) 공유 변경 가능한 상태가 없기 때문에 항상 동일한 결과를 제공합니다.

동시성에 사용되는 또 다른 기본 요소는 약속입니다. 약속은 한 번 가치를 담을 수 있는 그릇입니다. Promise를 읽을 때 스레드는 Promise 값이 채워질 때까지 기다립니다.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

이 예에서 future는 값을 저장하지 않겠다고 약속하는 한 결과를 인쇄하기를 기다립니다. 2초 후에 약속에 미래 스레드에서 인쇄할 값 42가 저장됩니다. Promise를 사용하면 미래와 달리 교착 상태가 발생할 수 있으므로 Promise를 사용할 때 주의해야 합니다.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

이 예에서는 미래의 결과와 약속의 결과를 사용하고 있습니다. 값을 설정하고 읽는 순서는 메인 쓰레드는 퓨처 쓰레드의 값을 기다리고 퓨처 쓰레드는 메인 쓰레드의 값을 기다리는 것이다. 이 동작은 예측 가능하고(결정적) 프로그램이 실행될 때마다 재생되어 오류를 쉽게 찾고 제거할 수 있습니다.

미래를 사용하면 미래의 실행 결과가 필요할 때까지 프로그램이 연습을 계속할 수 있습니다. 결과적으로 프로그램 실행이 빨라집니다. 미래의 프로세서가 여러 개 있는 경우 예측 가능한(결정적) 동작(매번 동일한 결과를 제공함)이 있는 프로그램을 병렬로 실행할 수 있습니다. 그렇게 하면 컴퓨터의 능력을 더 잘 활용할 수 있습니다.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

이 예에서 future를 사용하면 컴퓨터의 속도를 더 잘 활용할 수 있는 방법을 알 수 있습니다. 두 개의 피보나치 수를 합산합니다. 프로그램이 결과를 두 번 계산하는 것을 볼 수 있습니다. 첫 번째는 단일 스레드에서 순차적으로, 두 번째는 두 스레드에서 병렬로 계산합니다. 내 랩톱에는 멀티코어 프로세서가 있으므로 병렬 실행은 순차 계산보다 두 배 빠르게 작동합니다.

내 랩톱에서 이 스크립트를 실행한 결과:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

동시성

Clojure 프로그래밍 언어에서 동시성과 예측 불가능성을 지원하려면 다른 스레드가 변경 사항을 볼 수 있도록 가변 데이터 유형을 사용해야 합니다. 가장 간단한 변수 데이터 유형은 원자입니다. Atom은 항상 다른 값으로 대체될 수 있는 값을 갖는 컨테이너입니다. 새 값을 입력하거나 이전 값을 가져와 더 자주 사용되는 새 값을 반환하는 함수를 호출하여 값을 바꿀 수 있습니다. 아톰이 잠금 없이 구현되고 스레드에서 사용하는 것이 안전하다는 점이 흥미롭습니다. 즉, 교착 상태에 도달하는 것이 불가능하다는 의미입니다. 내부적으로 atom은 java.util.concurrent.AtomicReference 라이브러리를 사용합니다. 원자로 구현된 카운터 예제를 살펴보겠습니다.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

내 랩톱에서 스크립트 실행 결과:

 The counter is: 1000000 Number of attempts: 1680212

이 예에서는 카운터 값을 포함하는 원자를 사용합니다. 카운터는 (swap! counter inc)와 함께 증가합니다. Swap 기능은 다음과 같이 작동합니다. 1. 카운터 값을 가져와서 유지합니다. 2. 이 값에 대해 새 값을 계산하는 주어진 함수를 호출합니다. 3. 새 값을 저장하기 위해 이전 값이 변경되었는지 확인하는 원자적 연산을 사용합니다. 3a. 값이 변경되지 않은 경우 새 값 3b를 입력합니다. 그 동안 값이 변경되면 1단계로 이동합니다. 그 동안 값이 변경되면 함수를 다시 호출할 수 있음을 알 수 있습니다. 값은 다른 스레드에서만 변경할 수 있습니다. 따라서 새 값을 계산하는 함수는 더 많은 호출을 받아도 상관없도록 부작용이 없어야 합니다. atom의 한 가지 제한 사항은 변경 사항을 하나의 값으로 동기화한다는 것입니다.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

이 스크립트를 실행하면 다음을 얻습니다.

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

이 예에서 우리는 더 많은 원자를 변경하는 방법을 볼 수 있습니다. 어느 시점에서 불일치가 발생할 수 있습니다. 어떤 시간에 두 계정의 합계가 같지 않습니다. 여러 값의 변경을 조정해야 하는 경우 두 가지 솔루션이 있습니다.

  1. 하나의 원자에 더 많은 값 배치
  2. 나중에 보게 될 참조 및 소프트웨어 트랜잭션 메모리 사용
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

내 컴퓨터에서 이 스크립트를 실행하면 다음과 같은 결과가 나타납니다.

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

예제에서는 지도를 사용하여 더 많은 가치를 부여하도록 조정을 해결했습니다. 계좌에서 돈을 이체할 때, 우리는 돈의 합계가 동일하지 않은 일이 결코 일어나지 않도록 모든 계좌를 그 당시에 변경합니다.

다음 변수 데이터 유형은 에이전트입니다. 에이전트는 값을 변경하는 함수가 다른 스레드에서 실행된다는 점에서만 원자처럼 작동하므로 변경 사항이 표시되는 데 약간의 시간이 걸립니다. 따라서 에이전트의 값을 읽을 때 에이전트의 값을 변경하는 모든 함수가 실행될 때까지 기다리는 함수를 호출해야 합니다. 값을 변경하는 원자와 달리 함수는 한 번만 호출되므로 부작용이 있을 수 있습니다. 이 유형은 또한 하나의 값을 동기화할 수 있으며 교착 상태가 될 수 없습니다.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

랩톱에서 이 스크립트를 실행하면 다음과 같은 결과가 나타납니다.

 The counter is: 1000000 Number of attempts: 1000000

이 예제는 원자로 카운터를 구현한 것과 동일합니다. 유일한 차이점은 여기서 wait를 사용하여 최종 값을 읽기 전에 모든 에이전트 변경이 완료되기를 기다리고 있다는 것입니다.

마지막 변수 데이터 유형은 참조입니다. 원자와 달리 참조는 변경 사항을 여러 값으로 동기화할 수 있습니다. 참조에 대한 각 작업은 dosync를 사용하는 트랜잭션에 있어야 합니다. 데이터를 변경하는 이러한 방식을 소프트웨어 트랜잭션 메모리 또는 축약된 STM이라고 합니다. 계좌의 송금에 대한 예를 살펴보겠습니다.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

이 스크립트를 실행하면 다음을 얻습니다.

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

흥미롭게도 거래 건수보다 시도 횟수가 더 많았습니다. 이는 STM이 잠금을 사용하지 않기 때문에 충돌이 있는 경우(예: 동일한 값을 변경하려는 두 스레드) 트랜잭션이 다시 실행됩니다. 이러한 이유로 트랜잭션에는 부작용이 없어야 합니다. 트랜잭션 내에서 값이 변경되는 에이전트가 예측 가능하게 동작하는 것을 볼 수 있습니다. 에이전트의 값을 변경하는 함수는 트랜잭션이 있는 횟수만큼 평가됩니다. 그 이유는 에이전트가 트랜잭션을 인식하기 때문입니다. 트랜잭션에 부작용이 있어야 하는 경우 에이전트 내에서 기능을 실행해야 합니다. 이러한 방식으로 프로그램은 예측 가능한 동작을 갖게 됩니다. 항상 STM을 사용해야 한다고 생각할 수도 있지만 숙련된 프로그래머는 원자가 STM보다 간단하고 빠르기 때문에 종종 원자를 사용합니다. 물론 그런 식으로 프로그램을 만드는 것이 가능하다면 말이다. 부작용이 있다면 STM과 약제를 사용하는 것 외에는 선택의 여지가 없습니다.

배우 모델

다음 동시성 모델은 액터 모델입니다. 이 모델의 원리는 현실 세계와 유사합니다. 예를 들어 건물과 같이 많은 사람들과 함께 무언가를 만드는 거래를 하면 건설 현장의 각 사람은 자신의 역할이 있습니다. 많은 사람들이 감독관의 감독을 받습니다. 작업자가 작업 중 부상을 입은 경우 감독자는 부상자의 작업을 가능한 다른 사람에게 할당합니다. 필요한 경우 그는 현장으로 새로운 사람을 인도할 수 있습니다. 사이트에는 동시에(동시에) 작업을 수행하지만 동기화하기 위해 서로 대화하는 더 많은 사람들이 있습니다. 건설현장의 일을 프로그램에 담으면 모든 사람이 자신의 상태를 갖고 실행하는 행위자가 되고 말은 메시지로 대체될 것이다. 이 모델을 기반으로 하는 인기 있는 프로그래밍 언어는 Erlang입니다. 이 흥미로운 언어에는 다른 데이터 유형과 동일한 속성을 가진 변경할 수 없는 데이터 유형과 함수가 있습니다. 함수는 프로그램 실행 중에 생성되어 다른 함수에 인수로 전달되거나 함수 호출의 결과로 반환될 수 있습니다. Erlang 가상 머신을 사용하는 Elixir 언어로 예제를 제공할 것이므로 Erlang과 동일한 프로그래밍 모델을 사용하지만 구문만 다를 뿐입니다. Elixir에서 가장 중요한 3가지 기본 요소는 spawn, send, receive입니다. spawn 은 새 프로세스에서 기능을 실행하고 send 는 메시지를 프로세스로 보내고 receive는 현재 프로세스로 보낸 메시지를 받습니다.

액터 모델이 있는 첫 번째 예는 동시에 카운터가 증가합니다. 이 모델로 프로그램을 만들려면 액터가 카운터 값을 갖도록 하고 카운터 값을 설정 및 검색하라는 메시지를 수신하고 카운터 값을 동시에 증가시킬 두 액터가 있어야 합니다.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

이 예제를 실행하면 다음을 얻습니다.

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

결국 카운터는 우리가 예상한 1000000이 아니라 516827임을 알 수 있습니다. 다음에 스크립트를 실행했을 때 511010이 수신되었습니다. 이 동작의 이유는 카운터가 현재 값을 검색하고 새 값을 설정하는 두 가지 메시지를 수신하기 때문입니다. 카운터를 증가시키려면 프로그램이 현재 값을 가져와 1씩 증가시키고 증가된 값을 설정해야 합니다. 두 프로세스는 카운터 프로세스로 보낸 메시지를 사용하여 카운터 값을 동시에 읽고 씁니다. 카운터가 수신하는 메시지의 순서는 예측할 수 없으며 프로그램에서 제어할 수 없습니다. 우리는 다음과 같은 시나리오를 상상할 수 있습니다.

  1. 카운터 값은 115입니다.
  2. 프로세스 A는 카운터(115)의 값을 읽습니다.
  3. 프로세스 B는 카운터(115)의 값을 읽습니다.
  4. 프로세스 B는 로컬에서 값을 증가시킵니다(116).
  5. 프로세스 B는 카운터(116)에 증가된 값을 설정합니다.
  6. 프로세스 A는 카운터 값을 증가시킵니다(116).
  7. 프로세스 A는 카운터(116)에 증가된 값을 설정합니다.
  8. 카운터 값은 116입니다.

시나리오를 보면 두 개의 프로세스가 카운터를 1만큼 증가시키고 결국 카운터는 2가 아니라 1만큼 증가합니다. 이러한 얽힘은 예측할 수 없는 횟수로 발생할 수 있으므로 카운터 값은 예측할 수 없습니다. 이 동작을 방지하려면 하나의 메시지로 증가 작업을 수행해야 합니다.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

이 스크립트를 실행하면 다음을 얻습니다.

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

카운터가 올바른 값을 가지고 있음을 알 수 있습니다. 예측 가능한(결정적) 동작의 이유는 카운터 값이 한 메시지만큼 증가하여 카운터를 늘리는 메시지 시퀀스가 ​​최종 값에 영향을 미치지 않기 때문입니다. 액터 모델로 작업할 때 메시지가 어떻게 얽힐 수 있는지 주의를 기울여야 하며 우발적인 예측 불가능성(비결정성)을 피하기 위해 메시지와 메시지에 대한 작업을 신중하게 설계해야 합니다.

이 모델을 사용하여 두 계정 간에 어떻게 돈을 이체할 수 있습니까?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

이 스크립트를 실행하면 다음을 얻습니다.

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

우리는 프로그램의 예측 가능한 동작을 제공하는 계정의 가치를 얻기 위해 돈과 메시지 금액을 전송하기 위해 메시지 전송을 선택했기 때문에 송금이 불일치 없이 작동한다는 것을 알 수 있습니다. 우리가 돈을 이체할 때마다 그 때의 총액은 같아야 합니다.

액터 모델은 잠금 및 교착 상태를 유발할 수 있으므로 프로그램을 설계할 때 주의하십시오. 다음 스크립트는 잠금 및 교착 상태 시나리오를 시뮬레이션하는 방법을 보여줍니다.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

마무리

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial