Introduction
Concurrency involves doing or appearing to do multiple tasks simultaneously, while parallelism involves literally executing tasks at the same time using multiple CPU cores. In sequential execution, tasks are completed one after another. Concurrent execution interleaves tasks, potentially improving responsiveness but not overall completion time. Parallel execution uses multiple processors to perform tasks simultaneously, speeding up the process.
In this guide, we'll cover:
- Basic thread management and synchronization
- Thread safety with Send and Sync traits
- Shared state and inter-thread communication
- Async programming with futures and tasks
- Common pitfalls and best practices
Let's start with the fundamental building blocks: threads.
Threads
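Rust provides two kinds of threads:
- Plain threads: Created with thread::spawn. They run independently of the code that spawned them, so their closures must own everything they use.
- Scoped threads: Created inside thread::scope. They are joined before the scope returns, so they can borrow local variables from the parent thread.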
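use std::thread;
// Plain thread example: keep the handle and join it, otherwise the
// program may exit before the thread gets to run.
let handle = thread::spawn(|| {
    println!("Running in a separate thread");
});
handle.join().unwrap();
// Scoped thread example: the closure borrows `message` from the parent.
let message = String::from("Can access parent thread's variables");
thread::scope(|scope| {
    scope.spawn(|| {
        println!("{message}");
    });
});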
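To make the borrowing behavior of scoped threads concrete, here is a minimal sketch that splits a slice in two and sums each half on its own scoped thread. The function name `parallel_sum` is an assumption made for illustration; only `std::thread::scope` and its `spawn` method come from the standard library.

```rust
use std::thread;

/// Hypothetical helper: sums a slice by giving each half to a scoped thread.
/// The threads borrow `data` directly, so no `Arc` or cloning is needed.
fn parallel_sum(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|scope| {
        // Both closures borrow slices that are owned by the caller.
        let left_handle = scope.spawn(|| left.iter().sum::<u64>());
        let right_handle = scope.spawn(|| right.iter().sum::<u64>());
        // `join` returns Err only if the thread panicked.
        left_handle.join().unwrap() + right_handle.join().unwrap()
    })
}
```

Calling `parallel_sum(&[1, 2, 3, 4, 5])` adds up both halves and returns the total. The same closures would not compile with `thread::spawn`, because a plain thread could outlive the borrowed slice.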
Channels
Channels enable safe communication between threads. Rust provides both bounded and unbounded channels through mpsc (Multi-Producer, Single-Consumer).
use std::sync::mpsc;
use std::thread;
// Unbounded channel
let (tx, rx) = mpsc::channel();
// Bounded channel (with capacity 3); the element type is spelled out
// because this pair is not used below
let (_bounded_tx, _bounded_rx) = mpsc::sync_channel::<&str>(3);
// Usage example
thread::spawn(move || {
    tx.send("Hello from thread").unwrap();
});
println!("Received: {}", rx.recv().unwrap());
Key differences:
- Unbounded: Never blocks on send()
- Bounded: Blocks when channel is full
- Both support multiple producers via tx.clone()
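The differences listed above can be sketched with several producers sharing one bounded channel. The function name `collect_worker_ids` and the capacity of 2 are assumptions chosen for illustration; the channel API itself is `std::sync::mpsc`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `workers` producer threads that each send
/// their id over a bounded channel, then collects everything that arrives.
fn collect_worker_ids(workers: usize) -> Vec<usize> {
    // Capacity 2: a producer blocks in `send` while two messages are queued.
    let (tx, rx) = mpsc::sync_channel::<usize>(2);

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let tx = tx.clone(); // one sender per producer
            thread::spawn(move || tx.send(id).unwrap())
        })
        .collect();

    // Drop the original sender so the receiver sees the channel close once
    // every producer is done; otherwise the iteration below never ends.
    drop(tx);

    let mut received: Vec<usize> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    received.sort_unstable(); // arrival order is not deterministic
    received
}
```

Swapping `mpsc::sync_channel::<usize>(2)` for `mpsc::channel::<usize>()` gives the unbounded variant, where `send` returns immediately regardless of how many messages are queued.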
Send and Sync
How does Rust know if a type is safe to send to another thread? The answer is the Send and Sync traits. Both are unsafe marker traits: the compiler implements them automatically where it can, and implementing them by hand requires unsafe.
A type T is Send if it is safe to move a T value to another thread.
A type T is Sync if it is safe to access a T value from multiple threads at the same time. More precisely, a type T is Sync if and only if &T is Send.
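One way to see these rules in action is to ask the compiler directly. The sketch below uses two hypothetical helper functions, `is_send` and `is_sync`, whose only job is to carry a trait bound: a call compiles only if the type satisfies it.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};

// Hypothetical helpers: each call is a compile-time check of the bound.
fn is_send<T: Send>() -> bool {
    true
}
fn is_sync<T: Sync>() -> bool {
    true
}

fn check_marker_traits() -> bool {
    // Arc<Mutex<i32>> can be moved to another thread and shared between threads.
    let shared_ok = is_send::<Arc<Mutex<i32>>>() && is_sync::<Arc<Mutex<i32>>>();
    // Cell<i32> can be moved to another thread, but not shared by reference.
    let cell_ok = is_send::<Cell<i32>>();
    // Uncommenting either line below is a compile error:
    // is_sync::<Cell<i32>>();          // Cell<i32> is not Sync
    // is_send::<std::rc::Rc<i32>>();   // Rc<i32> is not Send
    shared_ok && cell_ok
}
```

The interesting part is what does not compile: `Rc` uses a non-atomic reference count, so the compiler refuses to let it cross a thread boundary.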
Shared State
Rust provides several types for safe shared state:
- Arc (Atomic Reference Counting)
Arc<T> allows shared read-only access to one value from several owners. Arc::clone creates another handle to the same value rather than copying the data.
use std::sync::Arc;
let shared = Arc::new(vec![1, 2, 3]);
let clone = Arc::clone(&shared);
- Mutex (Mutual Exclusion)
Mutex<T> provides a thread-safe way to access mutable data: only the thread that holds the lock can read or modify the value.
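use std::sync::{Arc, Mutex};
use std::thread;
let counter = Arc::new(Mutex::new(0));
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
    *counter_clone.lock().unwrap() += 1;
});
// Wait for the thread before reading the result
handle.join().unwrap();
println!("Counter: {}", *counter.lock().unwrap());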
- RwLock (Read-Write Lock)
RwLock<T> allows either any number of concurrent readers or a single writer at a time.
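use std::sync::RwLock;
let data = RwLock::new(5);
{
    // Multiple readers can hold the lock at the same time
    let r1 = data.read().unwrap();
    let r2 = data.read().unwrap();
    println!("{} {}", *r1, *r2);
} // Read guards are dropped here
// Single writer: write() would block while a read guard is still alive
let mut w = data.write().unwrap();
*w += 1;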
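Putting the three types together, the following sketch shares a counter behind `Arc<Mutex<_>>` and a value behind `Arc<RwLock<_>>` across several threads. The function names `count_with_mutex` and `read_with_rwlock` are assumptions made for this example, not part of any library.

```rust
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

/// Hypothetical helper: each of `threads` threads increments a shared
/// counter `per_thread` times.
fn count_with_mutex(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // The guard is a temporary, so the lock is released
                    // at the end of this statement.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    total
}

/// Hypothetical helper: writes once, then reads from `readers` threads.
fn read_with_rwlock(readers: usize) -> Vec<i32> {
    let data = Arc::new(RwLock::new(5));
    {
        // Exclusive access; the write guard is released at the closing brace.
        let mut guard = data.write().unwrap();
        *guard += 1;
    }
    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                // Copy the value out so the read guard is dropped first.
                let value = *data.read().unwrap();
                value
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Without the mutex, the increments would be a data race and the program would not compile; with it, the total is always `threads * per_thread`.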
Async Programming
Async Rust allows concurrent execution without threads, using an async runtime to manage tasks. This approach is efficient for handling many concurrent tasks with low memory overhead. Async programming provides fine-grained control over task execution, offering features like cancellation and various concurrency constructs.
Async/Await Syntax
In Rust, async is used to define asynchronous functions, and await is used to pause execution until a future is ready. This allows writing asynchronous code in a sequential style.
// fetch_data, process, and Error are placeholders defined elsewhere
async fn process_data() -> Result<(), Error> {
    let data = fetch_data().await?;
    process(data).await?;
    Ok(())
}
Runtimes
Rust's async runtimes manage task execution and scheduling, interacting with the OS for async IO. Unlike many other languages, Rust doesn't ship a built-in runtime; you pick one as a library, with options like:
- Tokio: Feature-rich and production-ready.
- async-std: Similar to the standard library.
- smol: Lightweight alternative.
#[tokio::main]
async fn main() {
    let task = tokio::spawn(async {
        println!("Running async task");
    });
    task.await.unwrap();
}
Concurrency Primitives and Tasks
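Concurrency in async Rust involves composing futures and tasks. A future makes progress only while something polls it. A task is a future handed to the runtime, which schedules it independently, so tasks can run in parallel on a multi-threaded runtime. Rust's async runtimes, like Tokio, provide tools for managing these tasks.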
- Join: Waits for all futures to complete, similar to Promise.all in JavaScript.
use anyhow::Result;
use futures::future;
use std::collections::HashMap;
async fn size_of_page(url: &str) -> Result<usize> {
    let resp = reqwest::get(url).await?;
    Ok(resp.text().await?.len())
}
#[tokio::main]
async fn main() {
    let urls = [
        "https://google.com",
        "https://httpbin.org/ip",
        "https://play.rust-lang.org/",
        "BAD_URL",
    ];
    // into_iter yields &str, which matches size_of_page's parameter type
    let futures_iter = urls.into_iter().map(size_of_page);
    let results = future::join_all(futures_iter).await;
    let page_sizes: HashMap<_, _> = urls.into_iter().zip(results).collect();
    println!("{:?}", page_sizes);
}
- Select: Waits for any future to complete, similar to Promise.race.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let listener = tokio::spawn(async move {
        tokio::select! {
            Some(msg) = rx.recv() => println!("got: {msg}"),
            _ = sleep(Duration::from_millis(50)) => println!("timeout"),
        };
    });
    sleep(Duration::from_millis(10)).await;
    tx.send(String::from("Hello!")).await.expect("Failed to send greeting");
    listener.await.expect("Listener failed");
}
Futures & Tasks
Futures represent deferred computations and are the core of async concurrency. They can be combined to form larger futures. An async task is a future that is executed, often managed by a runtime like Tokio.
- Future Example:
use futures::future;
use tokio::task;
// `futures`, `future1`, `timeout`, and the handlers are placeholders
// Join (like Promise.all)
let results = future::join_all(futures).await;
// Select (like Promise.race)
tokio::select! {
    result = future1 => handle_result(result),
    _ = timeout => handle_timeout(),
}
// Task spawning
let handle = task::spawn(async {
    println!("Running async task");
});
handle.await.unwrap();
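To show what "a future that is executed" means underneath a runtime, here is a std-only sketch of a hand-written future and a toy executor. The names `YieldOnce`, `ThreadWaker`, `block_on`, and `add_after_yield` are assumptions made for illustration; a real runtime such as Tokio is far more elaborate, and this is not how it is implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical future: pending on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again. A real future would hand the waker
            // to a timer or I/O source instead of waking immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls one future until it is ready, parking between polls.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// A larger future composed from a smaller one with `.await`.
async fn add_after_yield(a: u32, b: u32) -> u32 {
    YieldOnce { yielded: false }.await;
    a + b
}
```

`block_on(add_after_yield(2, 3))` polls the composed future twice: the first poll suspends at the `.await`, and the second runs it to completion. Nothing happens until `block_on` polls, which is the sense in which futures are deferred computations.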
Common Pitfalls
- Blocking the Executor: Avoid blocking the executor with CPU-bound work or blocking calls. Use tokio::task::spawn_blocking for such work, and prefer the async version of an API where one exists.
// Bad: Blocks the async executor
std::thread::sleep(Duration::from_secs(1));
// Good: Async sleep
tokio::time::sleep(Duration::from_secs(1)).await;
- Async Cancellation: Futures can be dropped, and therefore cancelled, at any .await point, so code after an .await is not guaranteed to run. Spawned tasks are also cancelled when the runtime shuts down. Keep the JoinHandle and await it to prevent premature cancellation.
use tokio::time;
async fn count_to(count: i32) {
    for i in 0..count {
        println!("Count in task: {i}!");
        time::sleep(time::Duration::from_millis(100)).await;
    }
}
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(count_to(10)); // Hold onto the handle
    time::sleep(time::Duration::from_millis(250)).await;
    println!("Main task finished!");
    handle.await.unwrap(); // Wait for the spawned task to finish
}
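- Async Traits: Since Rust 1.75, traits can declare async fn directly. Use the async_trait crate when the trait must also work as a dyn trait object, but be aware of performance overhead due to heap allocations: it boxes the future returned by each call.
use async_trait::async_trait;
// Error is a placeholder for your own error type
#[async_trait]
trait AsyncProcessor {
    async fn process(&self) -> Result<(), Error>;
}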
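The cancellation pitfall above is easier to reason about once you see it happen. The following std-only sketch polls a future once so that it suspends at an `.await`, then drops it. All names here (`NoopWaker`, `Never`, `CleanupGuard`, `work`, `cancel_mid_await`) are assumptions made for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that never completes, standing in for a slow operation.
struct Never;

impl Future for Never {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Sets a flag when dropped, so cleanup can be observed from outside.
struct CleanupGuard(Arc<AtomicBool>);

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

async fn work(cleaned_up: Arc<AtomicBool>, finished: Arc<AtomicBool>) {
    let _guard = CleanupGuard(cleaned_up);
    Never.await; // the future is suspended here when it gets dropped
    finished.store(true, Ordering::SeqCst); // never reached in this sketch
}

/// Returns (cleanup ran, code after the await ran).
fn cancel_mid_await() -> (bool, bool) {
    let cleaned_up = Arc::new(AtomicBool::new(false));
    let finished = Arc::new(AtomicBool::new(false));
    let mut future = Box::pin(work(Arc::clone(&cleaned_up), Arc::clone(&finished)));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // First poll: runs up to the await and suspends.
    assert!(future.as_mut().poll(&mut cx).is_pending());
    // Dropping the suspended future cancels it.
    drop(future);

    (
        cleaned_up.load(Ordering::SeqCst),
        finished.load(Ordering::SeqCst),
    )
}
```

`cancel_mid_await()` returns `(true, false)`: the guard's `Drop` runs, but the statement after the `.await` does not. Cleanup that must happen on cancellation therefore belongs in `Drop`, not in code placed after an `.await`.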
Further Reading
For more advanced topics and detailed explanations, consider exploring:
- The Rust Async Book: https://rust-lang.github.io/async-book/
- Tokio documentation: https://tokio.rs/