I watched Steve Klabnik's fantastic, highly recommended talk on parallel, concurrent, and async programming, Rust's Journey to Async/Await.
After watching, I decided to condense the main points to solidify my grasp of it and add pieces that might be helpful.
Definitions
- Parallel Computing => Doing multiple tasks at the same time; only possible with multiple cores or CPUs.
- Concurrent Computing => Working on multiple tasks, but not at the same time. Imagine a juggler: always handling one ball, but with several in the air at once.
- Synchronous Code => Code that executes sequentially, one task at a time per core or CPU.
- Asynchronous Code => Code that may execute concurrently and/or in parallel.
Synchronous code
When working with synchronous code, we can think of it as a simple calculation function. Let's call this a task.
```typescript
function add(first: number, second: number): number {
  return first + second;
}
```
It will run on a single thread, and use that thread by itself for the computation.
Similarly, we can do I/O operations in a synchronous fashion. Below are some examples of tasks in TypeScript, Python, and Rust.
```typescript
import fs from "fs";

function readFileSyncExample(filePath: string): string {
  // This will block the event loop until the file is read
  // We'll talk more about the event loop later
  return fs.readFileSync(filePath, "utf8");
}

function writeFileSyncExample(filePath: string, data: string): void {
  // This will block the event loop until the file is written
  // We'll talk more about the event loop later
  fs.writeFileSync(filePath, data);
}
```
```python
def read_file_sync(file_path: str) -> str:
    with open(file_path, 'r') as file:
        return file.read()

def write_file_sync(file_path: str, data: str) -> None:
    with open(file_path, 'w') as file:
        file.write(data)
```
```rust
use std::fs::File;
use std::io::{self, Read, Write};

fn read_file_sync(file_path: &str) -> io::Result<String> {
    let mut file = File::open(file_path)?;
    let mut contents = String::new();
    file.read_to_string(&mut contents)?;
    Ok(contents)
}

fn write_file_sync(file_path: &str, data: &str) -> io::Result<()> {
    let mut file = File::create(file_path)?;
    file.write_all(data.as_bytes())?;
    Ok(())
}
```
In this case, our CPU reads or writes by issuing an I/O request to the disk. The disk, in turn, has its own hardware designed solely for processing commands from the CPU. This means that we don't necessarily need to wait for the I/O request to finish before we move on to the next task. The same goes for network requests, where the command from the CPU gets handled by the network interface card.
When any of these external devices is done with the request, it stores the data in RAM using a feature called DMA (direct memory access), which lets it bypass the CPU and write directly to RAM. When done, it hands control back to the CPU, typically by raising an interrupt, letting it know that the data has been loaded into RAM.
All of this is to say that there is no need to let our CPU sit idle while waiting for the I/O to finish. We could initiate an I/O task and then move on to another task, like running the add function above, before continuing with the I/O.
So let’s dive into concurrency, meaning we can do multiple things, but not at the exact same time.
Asynchronous code
We've looked at a simple case where it would be performant to initiate an I/O operation, hand over responsibility for it, and continue with a CPU-bound task before the I/O finishes. Think of the earlier snippets combined:
```typescript
import fs from "fs";

function add(first: number, second: number): number {
  return first + second;
}

function readFileSyncExample(filePath: string): string {
  // This will block the event loop until the file is read
  // We'll talk more about the event loop later
  return fs.readFileSync(filePath, "utf8");
}

function runTasksSequentially() {
  // Start and wait for I/O task to finish
  const fileContents = readFileSyncExample("path/to/your/file.txt");
  console.log("File contents:", fileContents);
  // Now start the CPU-bound task
  const total = add(1, 2);
  console.log("Total:", total);
}

runTasksSequentially();
// File contents: <content>
// Total: 3
```
```python
def add(first, second):
    return first + second

def read_file_sync_example(file_path):
    with open(file_path, 'r', encoding='utf8') as file:
        return file.read()

def run_tasks_sequentially():
    file_contents = read_file_sync_example('path/to/your/file.txt')
    print("File contents:", file_contents)
    total = add(1, 2)
    print("Total:", total)

run_tasks_sequentially()
# File contents: <content>
# Total: 3
```
```rust
use std::fs;
use std::io;

fn add(first: i32, second: i32) -> i32 {
    first + second
}

fn read_file_sync_example(file_path: &str) -> io::Result<String> {
    fs::read_to_string(file_path)
}

fn run_tasks_sequentially() -> io::Result<()> {
    let file_contents = read_file_sync_example("path/to/your/file.txt")?;
    println!("File contents: {}", file_contents);
    let total = add(1, 2);
    println!("Total: {}", total);
    Ok(())
}

fn main() {
    match run_tasks_sequentially() {
        Ok(()) => (),
        Err(e) => eprintln!("An error occurred: {}", e),
    }
}
// File contents: <content>
// Total: 3
```
Instead of waiting for the file contents to load, we can use different methods to let our thread continue with CPU work while the I/O is in flight.
Two common ways are to either spawn a new virtual thread or to use an event loop. There are also green threads, which often get confused with virtual threads.
These implementations depend on their respective runtimes. For example, Node.js has an event loop, just like the nginx server. And Go uses virtual threads in its goroutines.
Python is an example where we have:
- Process-based parallelism in multiprocessing
- Native OS threads in threading (though the GIL only lets one thread execute Python bytecode at a time)
- An event loop in asyncio
Don’t worry, we’ll explain all in more detail later.
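As a rough preview, here is a sketch of how each of those is spawned. The trivial `work` functions are just placeholders for illustration:

```python
import asyncio
import multiprocessing
import threading

def work():
    return 1 + 2

async def async_work():
    return 1 + 2

if __name__ == "__main__":
    # threading: native OS threads (the GIL limits CPU-bound parallelism)
    t = threading.Thread(target=work)
    t.start()
    t.join()

    # multiprocessing: separate processes that can run truly in parallel
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()

    # asyncio: an event loop scheduling coroutines on a single thread
    asyncio.run(async_work())
```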
Let's get back to our example, and this time make sure we initiate the I/O and then continue with the CPU-bound calculation while the I/O does its work.
```typescript
import fs from "fs/promises"; // this is the async import

function add(first: number, second: number): number {
  return first + second;
}

// A Promise represents a value that arrives in the future. In the case
// below, it will either resolve to a `string` or reject with an error.
function readFileAsyncExample(filePath: string): Promise<string> {
  // fs/promises' readFile returns a Promise instead of blocking
  // We'll talk more about the event loop later
  return fs.readFile(filePath, "utf8");
}

// we mark the function as async
async function runTasksConcurrently() {
  // Initiate and hand over the I/O task
  const fileContentPromise = readFileAsyncExample("path/to/your/file.txt");
  // Now run the CPU-bound task
  const total = add(1, 2);
  console.log("Total:", total);
  // Grab the result of the I/O task
  const fileContents = await fileContentPromise;
  console.log("File contents:", fileContents);
}

runTasksConcurrently();
// Total: 3
// File contents: <content>
```
```python
import asyncio
import aiofiles

def add(first, second):
    return first + second

async def read_file_async_example(file_path):
    # Asynchronously read a file and return its contents
    async with aiofiles.open(file_path, mode='r', encoding='utf8') as file:
        return await file.read()

async def run_tasks_concurrently():
    # Initiate the file reading but don't wait for it yet.
    # create_task schedules the coroutine on the event loop right away.
    file_content_task = asyncio.create_task(
        read_file_async_example('path/to/your/file.txt'))
    # Perform the CPU-bound task
    total = add(1, 2)
    print("Total:", total)
    # Now wait for the I/O task to complete
    file_contents = await file_content_task
    print("File contents:", file_contents)

asyncio.run(run_tasks_concurrently())
# Total: 3
# File contents: <content>
```
```rust
use tokio::fs;
use tokio::io;

fn add(first: i32, second: i32) -> i32 {
    first + second
}

async fn read_file_async_example(file_path: &str) -> io::Result<String> {
    // Asynchronously read a file and return its contents
    fs::read_to_string(file_path).await
}

async fn run_tasks_concurrently() -> io::Result<()> {
    // Initiate the file reading but don't wait for it yet.
    // Rust futures are lazy, so we hand the future to tokio::spawn
    // to start it running in the background.
    let file_content_task = tokio::spawn(read_file_async_example("path/to/your/file.txt"));
    // Perform the CPU-bound task
    let total = add(1, 2);
    println!("Total: {}", total);
    // Now wait for the I/O task to complete
    let file_contents = file_content_task.await.expect("read task panicked")?;
    println!("File contents: {}", file_contents);
    Ok(())
}

#[tokio::main]
async fn main() {
    // Run the async main function and handle errors
    match run_tasks_concurrently().await {
        Ok(()) => (),
        Err(e) => eprintln!("An error occurred: {}", e),
    }
}
```
This is a very simple and fast example, but imagine the I/O being a 500 ms call to an external API, and you can quickly see how much time we'd waste being blocked by it. We've now successfully moved from synchronous, blocking code into asynchronous, concurrent code.
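To make the time savings concrete, here is a minimal sketch in Python, with asyncio.sleep standing in for that hypothetical 500 ms API call:

```python
import asyncio
import time

async def fake_api_call():
    # Stand-in for a 500 ms call to an external API
    await asyncio.sleep(0.5)

async def sequential():
    start = time.perf_counter()
    await fake_api_call()
    await fake_api_call()
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~1.00s

async def concurrent():
    start = time.perf_counter()
    # Both calls wait at the same time, so we only pay for one delay
    await asyncio.gather(fake_api_call(), fake_api_call())
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")  # ~0.50s

asyncio.run(sequential())
asyncio.run(concurrent())
```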
Let’s talk a bit more about the different kinds of concurrency. We’ll look at the event loop and the virtual (or green) threads. There are more ways of doing this, but I will limit it to these two as they are the most prevalent.
Threading
Definitions are difficult, and it seems to me that the community has not settled on exact definitions of virtual threads versus green threads.
Here, we will call a virtual thread a thread that is scheduled by the runtime, where the runtime can also use the underlying OS threads. So we can spawn as many virtual threads (M) as we want on top of the available OS threads (N).
Green threads are run by the runtime only, on a single OS thread, without using any additional underlying OS threads.
Native threads are the threads the OS itself manages; in other words, they are bound to the CPUs and cores at the hardware level.
This gives us the (slightly modified) table below, from user Reg on Stack Overflow:

| Thread Type | Description | Threads (M) : Native Threads (N) |
| --- | --- | --- |
| Native threads | A wrapper around OS threads. | 1:1 |
| Green threads | Runs multiple green threads on a single OS thread. | M:1 |
| Virtual threads | Runs multiple virtual threads on multiple OS threads. | M:N (M > N) |
Using Python's threading module, we get the example below: the file read runs on its own thread, and the CPU-bound add function finishes before the I/O does.
```python
import threading

def add(first, second):
    return first + second

# Read file content in a separate thread, storing the result in a shared list
def read_file(file_path, result):
    with open(file_path, 'r') as file:
        result.append(file.read())

# The main function that will run tasks concurrently
def run_tasks_concurrently(file_path):
    # Create a thread for the file reading
    result = []
    file_thread = threading.Thread(target=read_file, args=(file_path, result))
    file_thread.start()
    # Perform the CPU-bound task
    total = add(1, 2)
    print("Total:", total)
    # Wait for the file reading thread to finish
    file_thread.join()
    print("File contents:", result[0])

# Example file path, replace with your file path
file_path = 'path/to/your/file.txt'
run_tasks_concurrently(file_path)
# Total: 3
# File contents: <content>
```
There are different pros and cons to each threading style. In essence, native threads are very straightforward, and with them you can actually run multiple CPU-bound tasks at once. But they are limited by the number of threads the OS can provide.
Green threads, on the other hand, are limited to a single OS thread and are not suitable for running multiple CPU-bound tasks. But they're fantastic when you want to do lots of I/O operations, since you can spawn huge numbers of them.
Finally, virtual threads handle both cases: the runtime can run multiple CPU-bound tasks given its access to OS threads, and it can also spawn large numbers of its own threads for I/O. The downside is the extra overhead of having its own runtime and scheduler, as with green threads. One can also run into problems with too many threads being spawned at once, and with additional latency when calling into the underlying OS threads.
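To illustrate the I/O point: spawning huge numbers of runtime-managed "threads" is cheap. A sketch using Python's asyncio tasks, which behave like green threads in this respect:

```python
import asyncio
import time

async def tiny_io_task():
    # Stand-in for a short I/O wait
    await asyncio.sleep(0.1)

async def main():
    start = time.perf_counter()
    # 10,000 OS threads would be heavy; 10,000 event-loop tasks are cheap
    await asyncio.gather(*(tiny_io_task() for _ in range(10_000)))
    print(f"10,000 concurrent waits took {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```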
Moving on, let's have a look at the event loop mentioned in the TypeScript code.
Event Loop
There are other ways to work around needing one thread or process per I/O operation: with an event loop, we can coalesce many of them into a single process.
In practice, you can think of it like this: each I/O task is an event that gets pushed onto a queue, and each event has a handler. So the code looks synchronous again, but no operation blocks another.
In Node.js's case, this machinery is provided by libuv "under the hood": the event loop itself runs on the main thread, network I/O uses the operating system's non-blocking notification mechanisms, and blocking work such as file I/O is handed off to a small thread pool. You can read more in the libuv docs, and the Node.js docs have excellent, detailed material on their event loop and why one should not block it.
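To demystify the idea, here is a toy event loop, a deliberately simplified sketch that has little in common with libuv's real implementation: events wait in a queue, and the loop pops them off one at a time and runs each event's handler to completion.

```python
from collections import deque

# Each "event" is just a handler (callback) waiting in a queue
event_queue = deque()

def on_total(total):
    print("Total:", total)

def on_file_read(contents):
    print("File contents:", contents)

# Pretend these were pushed when work was scheduled or I/O completed
event_queue.append(lambda: on_total(1 + 2))
event_queue.append(lambda: on_file_read("<content>"))

# The loop: run one handler at a time, never blocking on another
while event_queue:
    handler = event_queue.popleft()
    handler()
# Total: 3
# File contents: <content>
```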
This way, we can write concurrent asynchronous code where all our I/O is non-blocking.
Parallel programming
As we're starting to wrap up, let's add an example of actual parallel code, where we run the add function on multiple native threads using Rust's standard library.
```rust
use std::thread;

fn add(first: i32, second: i32) -> i32 {
    first + second
}

fn main() {
    // Example: a list of tuples to add
    let number_pairs = vec![(1, 2), (3, 4), (5, 6), (7, 8)];
    // Spawn a thread for each addition operation
    let mut handles = Vec::new();
    for (a, b) in number_pairs {
        handles.push(thread::spawn(move || add(a, b)));
    }
    // Wait for all threads to complete and collect results
    let results: Vec<_> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    println!("Results: {:?}", results);
}
```
If we tried this with green threads as defined earlier, the additions would still not execute in parallel.
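A quick way to see this, sketched with Python's asyncio standing in for a green-thread runtime: two CPU-bound coroutines on one event loop take roughly twice as long as one, because they share a single OS thread.

```python
import asyncio
import time

def burn_cpu():
    # CPU-bound busy work with no await points, so it cannot be interleaved
    total = 0
    for i in range(10_000_000):
        total += i
    return total

async def cpu_task():
    burn_cpu()

async def main():
    start = time.perf_counter()
    # Both tasks run on the same OS thread, one after the other
    await asyncio.gather(cpu_task(), cpu_task())
    print(f"Two CPU-bound tasks took {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```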
Wrap-up
To wrap it all up, let's showcase an example where we run both concurrent and parallel code in one snippet. We'll make use of native threads for the CPU-bound tasks and event loops for the I/O-bound tasks. In TypeScript, we use Node.js worker threads and the Node runtime's event loop. In Python, we use multiprocessing and asyncio. Finally, in the Rust snippet, we use native threads for the CPU-bound tasks and an event loop provided by the tokio runtime for I/O.
```typescript
const {
  Worker: NodeWorker,
  isMainThread,
  parentPort,
  workerData,
} = require("worker_threads");

// Function to perform a simulated asynchronous task
async function fetchData(seconds: number): Promise<[number, number]> {
  const startTime = new Date().getTime();
  await new Promise((resolve) => setTimeout(resolve, seconds * 1000));
  const endTime = new Date().getTime();
  return [startTime, endTime];
}

// Function to calculate the Fibonacci number
function calculateFibonacci(n: number): number {
  if (n <= 1) return n;
  return calculateFibonacci(n - 1) + calculateFibonacci(n - 2);
}

interface WorkerData {
  start: number;
  end: number;
}

// Main thread logic
if (isMainThread) {
  (async () => {
    // Asynchronous tasks
    let promises: Promise<[number, number]>[] = [];
    for (let i = 0; i < 3; i++) {
      promises.push(fetchData(3));
    }
    // Parallel tasks
    const workers = [];
    for (let i = 0; i < 3; i++) {
      const worker = new NodeWorker(__filename, {
        workerData: 30,
      });
      workers.push(worker);
      worker.on("message", (message: WorkerData) => {
        console.log(
          `Parallel: started at: ${message.start} ms, ended at: ${message.end} ms`
        );
      });
    }
    await Promise.all(
      promises.map(async (promise) => {
        const [start, end] = await promise;
        console.log(`Async: started at: ${start} ms, ended at: ${end} ms`);
      })
    );
    await Promise.all(
      workers.map(
        (worker) => new Promise((resolve) => worker.on("exit", resolve))
      )
    );
  })();
} else {
  // Worker thread logic
  if (parentPort) {
    const startTime = new Date().getTime();
    calculateFibonacci(workerData); // use the n passed in from the main thread
    const endTime = new Date().getTime();
    const message: WorkerData = {
      start: startTime,
      end: endTime,
    };
    parentPort.postMessage(message);
  }
}

// Parallel: started at: 1701900169892 ms, ended at: 1701900169903 ms
// Parallel: started at: 1701900169892 ms, ended at: 1701900169903 ms
// Parallel: started at: 1701900169894 ms, ended at: 1701900169905 ms
// Async: started at: 1701900169868 ms, ended at: 1701900172871 ms
// Async: started at: 1701900169869 ms, ended at: 1701900172871 ms
// Async: started at: 1701900169869 ms, ended at: 1701900172871 ms
```
```python
import asyncio
import multiprocessing
import time

# Asynchronous function simulating data fetching with a delay
async def fetch_data(seconds):
    start_time = int(time.time() * 1000)
    await asyncio.sleep(seconds)
    end_time = int(time.time() * 1000)
    return start_time, end_time

# Synchronous function for calculating the Fibonacci number
def calculate_fibonacci(n):
    start_time = int(time.time() * 1000)

    def fib(n):
        if n <= 1:
            return n
        else:
            return fib(n - 1) + fib(n - 2)

    fib(n)  # The result is not used, just the timing
    end_time = int(time.time() * 1000)
    return start_time, end_time

# Function to be executed in a separate process
def process_task(n, queue):
    start, end = calculate_fibonacci(n)
    queue.put((start, end))

# Main function
async def main():
    # Spawn three I/O bound tasks
    async_tasks = [asyncio.create_task(fetch_data(3)) for _ in range(3)]
    # Spawn three CPU bound tasks
    processes = []
    queue = multiprocessing.Queue()
    for _ in range(3):
        p = multiprocessing.Process(target=process_task, args=(30, queue))
        processes.append(p)
        p.start()
    # Await all asynchronous tasks
    for task in async_tasks:
        start, end = await task
        print(f"Async: started at: {start} ms, ended at: {end} ms")
    # Get results from processes
    for p in processes:
        p.join()
    while not queue.empty():
        start, end = queue.get()
        print(f"Parallel: started at: {start} ms, ended at: {end} ms")

if __name__ == '__main__':
    # Run the main function
    asyncio.run(main())

# Async: started at: 1701893167911 ms, ended at: 1701893170914 ms
# Async: started at: 1701893167912 ms, ended at: 1701893170914 ms
# Async: started at: 1701893167912 ms, ended at: 1701893170914 ms
# Parallel: started at: 1701893167958 ms, ended at: 1701893168062 ms
# Parallel: started at: 1701893167958 ms, ended at: 1701893168062 ms
# Parallel: started at: 1701893167960 ms, ended at: 1701893168064 ms
```
```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Asynchronous function simulating data fetching with a delay
async fn fetch_data(seconds: i32) -> (u128, u128) {
    let start_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    // Simulate a delay for each task
    tokio::time::sleep(tokio::time::Duration::from_secs(seconds as u64)).await;
    let end_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    (start_time, end_time)
}

// Synchronous function for calculating the Fibonacci number
fn calculate_fibonacci(n: u32) -> (u32, u128, u128) {
    let start_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    let result = match n {
        0 => 0,
        1 => 1,
        _ => calculate_fibonacci(n - 1).0 + calculate_fibonacci(n - 2).0,
    };
    let end_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    (result, start_time, end_time)
}

#[tokio::main]
async fn main() {
    // Spawn three I/O bound tasks
    let async_tasks = vec![3, 3, 3]
        .into_iter()
        .map(|seconds| tokio::spawn(async move { fetch_data(seconds).await }))
        .collect::<Vec<_>>();
    // Spawn three CPU bound tasks
    let parallel_tasks = vec![30, 30, 30]
        .into_iter()
        .map(|n| std::thread::spawn(move || calculate_fibonacci(n)))
        .collect::<Vec<_>>();
    // Await all asynchronous tasks and join all parallel tasks
    for task in async_tasks {
        let (start, end) = task.await.expect("Failed to complete async task");
        println!("Async: started at: {} ms, ended at: {} ms", start, end);
    }
    for task in parallel_tasks {
        let (_, start, end) = task.join().expect("Failed to complete parallel task");
        println!("Parallel: started at: {} ms, ended at: {} ms", start, end);
    }
}

// Async: started at: 1701890312805 ms, ended at: 1701890315806 ms
// Async: started at: 1701890312805 ms, ended at: 1701890315806 ms
// Async: started at: 1701890312805 ms, ended at: 1701890315806 ms
// Parallel: started at: 1701890312805 ms, ended at: 1701890312997 ms
// Parallel: started at: 1701890312805 ms, ended at: 1701890312997 ms
// Parallel: started at: 1701890312805 ms, ended at: 1701890312997 ms
```
As you can see, in all of the above examples we start and end the concurrent and parallel tasks at around the same time, showing that we can do both at once.
I hope this has helped categorize what's what in the world of asynchronous programming.
There are more topics if you are curious to go deeper, e.g. Cooperative Multitasking and Preemptive Multitasking. But for brevity’s sake, I’ll stop here with this condensed and general overview.
Thanks for reading and please feel free to reach out if you have questions, spot some errors, or simply have any general feedback. I would love to hear from you!