Hi, fellow Rustaceans! 🦀
Let’s get our hands dirty with thread boundaries in Rust.
Thread boundaries separate the execution contexts of different threads. When data crosses these boundaries — moving from one thread to another — Rust’s type system ensures that this happens safely.
This is crucial because improper data sharing across threads can lead to race conditions, where two or more threads access shared data concurrently and at least one thread modifies the data, causing unexpected behavior.
Rust prevents such issues by ensuring that:
- Any data sent across threads is owned by the receiving thread.
- Shared access to mutable data is synchronized.
The Send Trait
The Send trait is a marker trait that indicates a type's values can be transferred across thread boundaries. In other words, if a type implements Send, it's safe to move it from one thread to another. This property is crucial for ensuring thread safety in concurrent Rust programs.
Most types in Rust are Send, but there are exceptions, especially with types that wrap around system resources or those that have non-thread-safe reference counting (e.g., Rc<T>). Types that involve raw pointers may also not be Send because they can lead to undefined behavior if not used carefully.
How Rust Uses the Send Trait
Rust’s compiler automatically implements Send for types that satisfy its requirements, which primarily involve ownership and thread safety. For example:
- Primitive types are Send because they have no ownership semantics that could be violated across threads.
- Types composed entirely of Send types are also Send.
- Unique ownership types like Box<T> are Send if T is Send, because ownership is transferred across threads.
However, shared ownership or non-thread-safe types like Rc<T> are not Send because they use non-atomic reference counting, which can lead to data races if accessed concurrently from multiple threads.
Using Send in Concurrent Programming
When writing concurrent code in Rust, you often work with APIs that require the Send trait. For example, when spawning a new thread using std::thread::spawn, the closure and its environment must be Send because they are moved to the new thread.
use std::thread;

let x = 5;
let handle = thread::spawn(move || {
    println!("Value of x is: {}", x);
});
handle.join().unwrap();
In this example, x is moved into the closure, which is executed by a new thread. Rust ensures that x is Send, allowing this operation to be safe.
Dealing with Non-Send Types
When you encounter a type that isn’t Send, you must use synchronization primitives or thread-safe wrappers to safely share or send it across threads. For instance, replacing Rc<T> with Arc<T> makes a type thread-safe and Send because Arc<T> uses atomic reference counting.
Thread Boundaries in Practice
Let’s now move from theory to exercises around thread boundaries and the Send trait to help you deepen your understanding of concurrent programming in Rust.
These exercises will range from basic to more complex scenarios, helping you grasp how to safely manage data across thread boundaries.
Exercise 1: Basic Send Compliance
Objective: Understand which types are Send and practice transferring ownership between threads.
- Create a struct MyStruct containing a Vec<i32> and a String.
- Implement a function spawn_and_move that takes an instance of MyStruct, spawns a new thread, and moves the instance into the thread. Print the contents of MyStruct inside the thread.
- Verify that MyStruct is Send and that your program compiles and runs correctly.
use std::thread;

struct MyStruct {
    numbers: Vec<i32>,
    name: String,
}

fn spawn_and_move(data: MyStruct) {
    // MyStruct is Send (all of its fields are), so it can move into the thread.
    thread::spawn(move || println!("{}: {:?}", data.name, data.numbers))
        .join()
        .unwrap();
}

fn main() {
    let my_data = MyStruct { numbers: vec![1, 2, 3], name: String::from("example") };
    spawn_and_move(my_data);
}
Exercise 2: Non-Send Type Challenge
Objective: Experience firsthand what happens when you try to use a non-Send type across threads and learn how to fix it.
- Create a struct SharedData containing an Rc<Vec<i32>>.
- Write a function attempt_to_share that tries to spawn a thread and use SharedData inside it.
- Observe the compiler error and refactor SharedData to use Arc instead of Rc to fix the issue.
use std::sync::Arc;
use std::thread;

// With Rc<Vec<i32>> this fails to compile: Rc<T> is not Send.
// Arc<T> uses atomic reference counting, which makes SharedData Send.
struct SharedData {
    values: Arc<Vec<i32>>,
}

fn attempt_to_share(data: SharedData) {
    thread::spawn(move || println!("Shared values: {:?}", data.values))
        .join()
        .unwrap();
}

fn main() {
    let shared_data = SharedData { values: Arc::new(vec![1, 2, 3]) };
    attempt_to_share(shared_data);
}
Exercise 3: Implementing Send Manually
Objective: Deepen your understanding of Send by manually implementing it for a custom type that wraps a non-thread-safe type.
- Create a struct SafeWrapper<T> that wraps a *mut T (raw pointer).
- Implement Send for SafeWrapper<T> manually, ensuring that it's only Send when T is Send.
- Demonstrate using SafeWrapper to send a raw pointer across threads safely.
use std::thread;

struct SafeWrapper<T>(*mut T);

// SAFETY: SafeWrapper uniquely owns its pointee (created via Box::into_raw),
// so it may cross thread boundaries whenever T itself is Send.
unsafe impl<T> Send for SafeWrapper<T> where T: Send {}

fn main() {
    let raw_pointer = Box::into_raw(Box::new(42));
    let safe_wrapper = SafeWrapper(raw_pointer);
    thread::spawn(move || unsafe {
        println!("Value behind the pointer: {}", *safe_wrapper.0);
        drop(Box::from_raw(safe_wrapper.0)); // reclaim the allocation
    })
    .join()
    .unwrap();
}
Exercise 4: Advanced Message Passing
Objective: Practice advanced message passing techniques between threads, emphasizing thread boundaries.
- Create two threads, producer and consumer.
- In producer, generate a series of MyStruct instances (from Exercise 1) and send them to consumer via a channel.
- In consumer, receive MyStruct instances and perform some computation on the data.
- Ensure proper synchronization and handling of the channel to prevent deadlocks.
use std::sync::mpsc;
use std::thread;

struct MyStruct {
    numbers: Vec<i32>,
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..10 {
            let data = MyStruct { numbers: vec![i, i * 2, i * 3] };
            tx.send(data).unwrap();
        }
        // tx is dropped here, closing the channel so the consumer's loop ends.
    });
    let consumer = thread::spawn(move || {
        for received in rx {
            let sum: i32 = received.numbers.iter().sum();
            println!("Received {:?}, sum = {}", received.numbers, sum);
        }
    });
    producer.join().unwrap();
    consumer.join().unwrap();
}
Exercise 5: Synchronizing Shared State
Objective: Learn to synchronize access to mutable shared state across threads using Mutex and Arc.
- Create a shared counter wrapped in a Mutex and an Arc.
- Spawn several threads that each increment the counter a fixed number of times.
- Ensure that each thread can safely access and modify the counter, and print the final value of the counter in the main thread.
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final counter value: {}", *counter.lock().unwrap());
}
Exercise 6: Error Handling Across Threads
Objective: Practice error handling in a multi-threaded context, ensuring that errors from worker threads are properly communicated back to the main thread.
- Define a task that can result in an error, such as attempting to open a non-existent file in a thread.
- Use a channel to send the result (either the successful outcome or the error) back to the main thread.
- In the main thread, handle the result by either processing the success case or reporting the error.
use std::fs::File;
use std::io::Read;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let result = File::open("non_existent_file.txt").and_then(|mut file| {
            let mut contents = String::new();
            file.read_to_string(&mut contents).map(|_| contents)
        });
        tx.send(result).unwrap();
    });
    match rx.recv().unwrap() {
        Ok(contents) => println!("File contents: {}", contents),
        Err(e) => println!("Error reading file: {:?}", e),
    }
}
Exercise 7: Implementing a Barrier
Objective: Understand the use of synchronization primitives to manage the execution order in concurrent programming by implementing a barrier.
- Use Arc and Mutex to create a shared counter and a condition variable to implement a barrier that waits for all threads to reach a certain point before proceeding.
- Spawn multiple threads that perform some work, increment the counter, and then wait at the barrier.
- Once all threads have reached the barrier, allow them to proceed and complete their tasks.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new((Mutex::new(0), Condvar::new()));
    let total_threads = 10;
    let mut handles = vec![];
    for i in 0..total_threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let (lock, cvar) = &*counter;
            let mut count = lock.lock().unwrap();
            *count += 1;
            if *count == total_threads {
                cvar.notify_all();
            } else {
                // Loop to guard against spurious wakeups.
                while *count < total_threads {
                    count = cvar.wait(count).unwrap();
                }
            }
            println!("Thread {} passed the barrier", i);
        }));
    }
    // Join all threads so the program waits for the barrier to release.
    for handle in handles {
        handle.join().unwrap();
    }
}
Exercise 8: Custom Thread Pool Implementation
Objective: Gain deeper insights into concurrent execution by implementing a basic thread pool, understanding task scheduling, and worker thread lifecycle.
- Define a ThreadPool struct with a vector of worker threads and a sender for a task channel.
- Implement a Worker struct that holds a thread and listens for tasks coming through a channel.
- In ThreadPool, implement a method to spawn worker threads and a method to send tasks to workers.
- Ensure that your thread pool can gracefully shut down, joining all threads and ensuring all tasks are completed.
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

struct ThreadPool {
    workers: Vec<Worker>,
    // Wrapped in Option so Drop can take and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // recv() returns Err once the sender is dropped, which
            // signals this worker to shut down.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Err(_) => break,
            }
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel so workers stop waiting for new jobs...
        drop(self.sender.take());
        // ...then join each worker, ensuring all queued tasks finish.
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    for i in 0..8 {
        pool.execute(move || {
            println!("Executing task {}", i);
        });
    }
    // `pool` is dropped here; Drop drains the queue and joins all workers.
}
Exercise 9: Atomic Operations for Synchronization
Objective: Explore the use of atomic operations to manage shared state without locking, understanding the performance implications and use cases.
- Use an AtomicUsize to implement a shared counter that can be safely incremented by multiple threads without using a mutex.
- Spawn multiple threads that each increment the counter a certain number of times.
- Ensure the final count reflects the total number of increments from all threads.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // An Arc is needed to give each spawned thread a 'static handle to
    // the counter; a plain reference to a local would not compile.
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}
Exercise 10: Exploring Thread Local Storage
Objective: Understand thread local storage and its use cases by creating thread-specific data structures that are not shared across threads.
- Use the thread_local! macro to create a thread-local Vec<i32>.
- Spawn several threads, each manipulating its own thread-local vector (e.g., adding numbers).
- Demonstrate that each thread maintains its separate vector and that no data is shared between threads.
use std::thread;

thread_local! {
    static NUMBERS: std::cell::RefCell<Vec<i32>> = std::cell::RefCell::new(Vec::new());
}

fn main() {
    let handles: Vec<_> = (0..10)
        .map(|i| {
            thread::spawn(move || {
                NUMBERS.with(|numbers| {
                    let mut numbers = numbers.borrow_mut();
                    numbers.push(i);
                    println!("Thread {:?}: {:?}", thread::current().id(), *numbers);
                });
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}