Asynchronous programming can be a bit challenging to grasp, but it’s incredibly useful for improving the performance of programs that deal with data over time, like network responses or file streams.
What Are Asynchronous Streams?
Asynchronous Nature: In Rust, operations are typically synchronous, meaning they block the thread until they complete. Asynchronous operations, on the other hand, allow other tasks to run while waiting for an operation to finish. This is particularly useful for IO-bound tasks, where you don’t want to block execution while waiting for a disk read or a network request to complete.
Streams: A stream in Rust can be thought of as an asynchronous version of an iterator. While an iterator yields a sequence of items synchronously, a stream yields a sequence of items asynchronously. This means that each item in the stream can be processed as it becomes available, without waiting for the entire sequence to be ready.
Under the Hood: Asynchronous streams in Rust are primarily facilitated through the futures and tokio crates. The futures crate provides the foundational traits and functions for asynchronous operations, including streams, while tokio offers a runtime environment where these asynchronous tasks can be executed.
When and Why Use Asynchronous Streams?
- Handling Large or Infinite Data: If you’re dealing with a large amount of data that comes in over time (like logs from a server) or an infinite sequence (like stock market ticks), asynchronous streams are ideal. They allow you to process each piece of data as soon as it arrives, rather than waiting for the entire dataset to be available.
- Improving Throughput in IO-bound Applications: In scenarios where your application spends a lot of time waiting for IO operations (like network requests or disk reads), asynchronous streams can significantly improve throughput. While one part of the stream is waiting for IO, other parts can continue processing, making the application more efficient.
- Real-Time Data Processing: Applications that require real-time processing of data, such as chat applications, real-time analytics, or live data feeds, benefit greatly from asynchronous streams. They can handle incoming data as soon as it’s available without blocking other operations.
- Resource Efficiency: Since asynchronous operations don’t block threads, they allow for more efficient use of system resources. This is particularly important in scenarios where resources are limited, such as in embedded systems or serverless environments.
- Scalability: Applications that need to scale to handle high levels of traffic, such as web servers, can benefit from asynchronous streams. They allow these applications to handle more requests concurrently, improving scalability.
Implementation Example
Let’s build a simple asynchronous web server using Rust with the hyper crate. This server will listen for HTTP requests and respond with a streamed message. The message will be broken into multiple parts and sent asynchronously, showcasing how streams work in Rust.
Step-by-Step Guide
Setting Up Dependencies
First, you need to set up your Rust environment and Cargo.toml. Add the following dependencies:
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
hyper = "0.14"
Here, tokio is our async runtime, futures provides utilities for streams and futures, and hyper is for building HTTP servers and clients.
Writing the Stream Function
Let’s write a function that creates a stream. This stream will asynchronously send a series of strings to the client.
use futures::stream::{self, Stream};

fn create_message_stream() -> impl Stream<Item = Result<&'static str, std::io::Error>> {
    stream::iter(vec![
        Ok("Hello"),
        Ok(", "),
        Ok("world"),
        Ok("!"),
        Ok("\n"),
    ])
}
This function returns a stream of Result<&'static str, std::io::Error>. The stream::iter function turns a vector into a stream, where each vector element is yielded in sequence.
Setting Up the Async Web Server
Now, let’s set up the web server using hyper. We'll define a function to handle incoming HTTP requests and use our create_message_stream function to respond.
use hyper::{Body, Response, Server, Request, service::{make_service_fn, service_fn}};
use std::convert::Infallible;
async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let message_stream = create_message_stream();
    let response_body = Body::wrap_stream(message_stream);
    Ok(Response::new(response_body))
}
#[tokio::main]
async fn main() {
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle_request))
    });

    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr).serve(make_svc);
    println!("Listening on http://{}", addr);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
In this code:
- The handle_request function is an asynchronous function that responds to each HTTP request. It uses create_message_stream to generate a stream of messages, and Body::wrap_stream converts that stream into a hyper body, which can be sent as an HTTP response.
- The main function sets up the server using tokio as the async runtime. We define a service that handles incoming connections using our handle_request function.
Running the Server
To run the server, use cargo run. Once it's running, you can access http://127.0.0.1:3000 from a web browser or use a tool like curl to see the streamed response.
Consuming Streams
To consume items from a stream, you use methods like next, for_each, or collect. Let’s look at another example: a simple function that returns a stream asynchronously yielding the numbers 1 through 5:
use futures::stream::{self, StreamExt};
fn simple_number_stream() -> impl futures::Stream<Item = i32> {
    stream::iter(1..=5)
}
Here’s an example using for_each:
#[tokio::main]
async fn main() {
    let number_stream = simple_number_stream();
    number_stream
        .for_each(|number| {
            println!("Received number: {}", number);
            futures::future::ready(())
        })
        .await;
}
Combining Streams with Other Async Operations
Streams can be combined with other async operations. For instance, you might want to perform an asynchronous operation for each item in the stream:
async fn process_number(number: i32) -> i32 {
    // Simulate an async operation
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    number * 2
}

#[tokio::main]
async fn main() {
    let number_stream = simple_number_stream();
    number_stream
        .map(process_number)
        .buffer_unordered(5)
        .for_each(|processed_number| {
            println!("Processed number: {}", processed_number);
            futures::future::ready(())
        })
        .await;
}
Merging and Combining Streams in Rust
Merging and combining streams is a powerful feature in Rust’s asynchronous programming model, allowing developers to efficiently process multiple sequences of data concurrently. This capability is particularly useful in scenarios where data from different sources needs to be aggregated, processed, or transformed in a unified way.
Understanding the Concept
In Rust, streams can be merged or combined using various combinators provided by the futures crate. These combinators allow for operations like chaining, zipping, selecting, and merging multiple streams. Each combinator has its specific use case:
- Chain: Concatenates two streams end-to-end.
- Zip: Pairs elements from two streams one-by-one.
- Merge / Select: Interleaves elements from two streams, yielding each item as soon as it becomes available. In the futures crate this behavior is provided by the stream::select function (tokio-stream exposes the same operation as StreamExt::merge).
A Working Example: Merging Temperature Readings
Suppose you have two sensors providing temperature readings, and you want to process these readings concurrently. We’ll merge the two streams, using the futures crate’s stream::select combinator, to achieve this.
First, add the necessary dependencies to your Cargo.toml:
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
Now, let’s implement the stream merging: