스레드 간 데이터 전송을 위한 메시지 패싱 활용
스레드 간 안전한 동시성을 보장하는 점점 더 인기 있는 접근 방식은 _메시지 패싱_이다. 이 방식에서는 스레드나 액터가 데이터를 포함한 메시지를 서로 주고받으며 통신한다. Go 언어 문서에 나온 슬로건으로 이 개념을 설명할 수 있다: “메모리를 공유하여 통신하지 말고, 통신을 통해 메모리를 공유하라.”
메시지 전송을 통해 동시성을 구현하기 위해 Rust의 표준 라이브러리는 채널(channel)을 제공한다. _채널_은 데이터를 한 스레드에서 다른 스레드로 전송하는 일반적인 프로그래밍 개념이다.
프로그래밍에서 채널은 강이나 하천과 같은 방향성이 있는 물줄기로 비유할 수 있다. 강에 고무 오리를 떨어뜨리면 물줄기를 따라 하류로 이동하는 것처럼, 채널도 데이터를 한쪽에서 다른쪽으로 전달한다.
채널은 두 부분으로 구성된다: 송신자(transmitter)와 수신자(receiver). 송신자는 강의 상류에 고무 오리를 떨어뜨리는 위치에 해당하고, 수신자는 고무 오리가 도착하는 하류에 해당한다. 코드의 한 부분은 전송할 데이터와 함께 송신자의 메서드를 호출하고, 다른 부분은 수신자에서 도착한 메시지를 확인한다. 송신자나 수신자 중 하나가 제거되면 채널은 닫힌 상태가 된다.
여기서는 한 스레드가 값을 생성해 채널로 보내고, 다른 스레드가 그 값을 받아 출력하는 프로그램을 만들어 볼 것이다. 채널을 사용해 스레드 간에 간단한 값을 전송하는 예제를 통해 이 기능을 설명한다. 이 기술에 익숙해지면, 채팅 시스템이나 여러 스레드가 계산의 일부를 수행한 후 결과를 하나의 스레드로 집계하는 시스템과 같이 서로 통신해야 하는 스레드에 채널을 활용할 수 있다.
먼저, 리스트 16-6에서 채널을 생성하지만 아무 작업도 하지 않는다. Rust가 채널을 통해 어떤 타입의 값을 전송할지 알 수 없기 때문에 이 코드는 아직 컴파일되지 않는다.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx
와 rx
에 두 부분 할당mpsc::channel
함수를 사용해 새로운 채널을 생성한다. mpsc
는 _multiple producer, single consumer_의 약자이다. Rust의 표준 라이브러리가 채널을 구현하는 방식은 여러 개의 송신 끝이 값을 생성할 수 있지만, 수신 끝은 하나만 있어 값을 소비한다는 것을 의미한다. 여러 개의 물줄기가 하나의 큰 강으로 합쳐지는 것을 상상해보라. 어떤 물줄기에든 떨어뜨린 것은 결국 하나의 강에 도달한다. 지금은 단일 송신자로 시작하지만, 이 예제가 동작하면 여러 송신자를 추가할 것이다.
mpsc::channel
함수는 튜플을 반환한다. 첫 번째 요소는 송신 끝인 송신자(transmitter)이고, 두 번째 요소는 수신 끝인 수신자(receiver)이다. tx
와 rx
는 각각 _transmitter_와 _receiver_를 나타내는 전통적인 약어이므로, 각 끝을 나타내기 위해 변수명을 이렇게 짓는다. 튜플을 분해하는 패턴과 함께 let
문을 사용한다. let
문에서 패턴을 사용하는 방법과 튜플 분해에 대해서는 19장에서 다룰 것이다. 지금은 mpsc::channel
이 반환한 튜플의 조각을 추출하는 편리한 방법으로 let
문을 사용한다는 것만 알아두자.
이제 송신자를 새로운 스레드로 옮겨 문자열 하나를 보내도록 하여, 생성된 스레드가 메인 스레드와 통신하도록 해보자. 리스트 16-7과 같다. 이는 강의 상류에 고무 오리를 떨어뜨리거나 한 스레드에서 다른 스레드로 채팅 메시지를 보내는 것과 같다.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
tx
를 생성된 스레드로 옮기고 "hi"
보내기다시 thread::spawn
을 사용해 새로운 스레드를 생성하고, move
를 사용해 tx
를 클로저로 이동시켜 생성된 스레드가 tx
를 소유하도록 한다. 생성된 스레드는 채널을 통해 메시지를 보내기 위해 송신자를 소유해야 한다.
송신자는 전송할 값을 인수로 받는 send
메서드를 갖는다. send
메서드는 Result<T, E>
타입을 반환하므로, 수신자가 이미 제거되어 값을 보낼 곳이 없다면 전송 작업은 에러를 반환한다. 이 예제에서는 에러가 발생하면 패닉을 일으키기 위해 unwrap
을 호출한다. 하지만 실제 애플리케이션에서는 적절히 처리할 것이다: 적절한 에러 처리 전략을 복습하려면 9장으로 돌아가보자.
리스트 16-8에서는 메인 스레드에서 수신자로부터 값을 가져온다. 이는 강의 끝에서 고무 오리를 꺼내거나 채팅 메시지를 받는 것과 같다.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
"hi"
값을 받아 출력하기수신자는 recv
와 try_recv
라는 두 가지 유용한 메서드를 제공한다. 여기서는 _receive_의 약자인 recv
를 사용한다. 이 메서드는 메인 스레드의 실행을 블로킹하고 채널로 값이 전송될 때까지 기다린다. 값이 전송되면 recv
는 Result<T, E>
로 값을 반환한다. 송신자가 닫히면 recv
는 더 이상 값이 오지 않음을 알리는 에러를 반환한다.
try_recv
메서드는 블로킹하지 않고 즉시 Result<T, E>
를 반환한다: 메시지가 있으면 Ok
값을, 이번에 메시지가 없으면 Err
값을 반환한다. 이 스레드가 메시지를 기다리는 동안 다른 작업을 해야 한다면 try_recv
를 사용하는 것이 유용하다: 주기적으로 try_recv
를 호출하는 루프를 작성하고, 메시지가 있으면 처리하고, 그렇지 않으면 잠시 다른 작업을 수행한 후 다시 확인할 수 있다.
이 예제에서는 간단히 recv
를 사용했다. 메인 스레드가 메시지를 기다리는 것 외에 다른 작업을 할 필요가 없으므로 메인 스레드를 블로킹하는 것이 적절하다.
리스트 16-8의 코드를 실행하면 메인 스레드에서 값이 출력되는 것을 볼 수 있다:
Got: hi
완벽하다!
채널과 소유권 이전
소유권 규칙은 메시지 전송에서 중요한 역할을 한다. 이 규칙은 안전하고 동시성 있는 코드를 작성하는 데 도움을 준다. 동시성 프로그래밍에서 오류를 방지하는 것은 Rust 프로그램 전반에서 소유권을 고려하는 것의 장점이다. 채널과 소유권이 어떻게 함께 작동해 문제를 방지하는지 실험을 통해 확인해 보자. 채널로 값을 보낸 후, 스폰된 스레드에서 val
값을 사용하려고 시도할 것이다. 이 코드가 왜 허용되지 않는지 확인하기 위해 Listing 16-9의 코드를 컴파일해 보자.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val
을 사용하려는 시도여기서는 tx.send
를 통해 채널로 값을 보낸 후 val
을 출력하려고 한다. 이를 허용하는 것은 좋지 않은 아이디어다. 값이 다른 스레드로 전송된 후, 그 스레드가 값을 수정하거나 삭제할 수 있기 때문이다. 이로 인해 데이터가 일관되지 않거나 존재하지 않아 오류나 예상치 못한 결과가 발생할 수 있다. 그러나 Listing 16-9의 코드를 컴파일하려고 하면 Rust가 오류를 발생시킨다:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
동시성 실수로 인해 컴파일 타임 오류가 발생했다. send
함수는 파라미터의 소유권을 가져가고, 값이 이동되면 수신자가 그 소유권을 가진다. 이는 값을 보낸 후 실수로 다시 사용하는 것을 막아준다. 소유권 시스템이 모든 것이 올바른지 확인한다.
여러 값을 보내고 수신자가 기다리는 모습 관찰하기
리스트 16-8의 코드는 컴파일되고 실행되었지만, 두 개의 스레드가 채널을 통해 서로 통신하는 모습을 명확히 보여주지 못했다. 리스트 16-10에서는 리스트 16-8의 코드가 동시에 실행되고 있음을 증명하기 위해 몇 가지 수정을 가했다. 이제 생성된 스레드는 여러 메시지를 보내고 각 메시지 사이에 1초간 일시 정지한다.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
이번에는 생성된 스레드가 메인 스레드로 보낼 문자열 벡터를 가지고 있다. 이를 순회하며 각 문자열을 개별적으로 보내고, thread::sleep
함수를 호출해 1초간 일시 정지한다.
메인 스레드에서는 더 이상 recv
함수를 명시적으로 호출하지 않는다. 대신 rx
를 이터레이터로 취급한다. 받은 각 값을 출력하고, 채널이 닫히면 이터레이션이 종료된다.
리스트 16-10의 코드를 실행하면 각 줄 사이에 1초의 간격을 두고 다음과 같은 출력을 확인할 수 있다:
Got: hi
Got: from
Got: the
Got: thread
메인 스레드의 for
루프에는 일시 정지나 지연을 발생시키는 코드가 없기 때문에, 메인 스레드가 생성된 스레드로부터 값을 받기 위해 기다리고 있음을 알 수 있다.
여러 생산자 생성하기: 송신자 복제하기
이전에 mpsc
가 multiple producer, single consumer(다중 생산자, 단일 소비자)의 약자라고 언급했다. 이제 mpsc
를 활용해 Listing 16-10의 코드를 확장하여 여러 스레드가 동일한 수신자에게 값을 보내는 예제를 만들어보자. Listing 16-11에서 보여주는 것처럼 송신자를 복제하여 이를 구현할 수 있다.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
이번에는 첫 번째 스레드를 생성하기 전에 송신자에 clone
메서드를 호출한다. 이렇게 하면 첫 번째 스레드에 전달할 새로운 송신자를 얻을 수 있다. 원본 송신자는 두 번째 스레드에 전달한다. 이렇게 하면 두 개의 스레드가 각기 다른 메시지를 동일한 수신자에게 보내게 된다.
이 코드를 실행하면 다음과 같은 출력이 나타날 것이다:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
시스템에 따라 값의 순서가 달라질 수 있다. 이는 동시성의 흥미로운 점이자 어려운 점이다. thread::sleep
을 사용해 각 스레드에 다양한 값을 주면서 실험해보면, 실행마다 더 비결정적이 되어 매번 다른 출력을 생성할 것이다.
이제 채널이 어떻게 동작하는지 살펴봤으니, 다른 동시성 방법을 알아보자.