Working with Any Number of Futures
When we switched from using two futures to three in the previous section, we also had to switch from using join to using join3. It would be annoying to have to call a different function every time we changed the number of futures we wanted to join. Happily, we have a macro form of join to which we can pass an arbitrary number of arguments. It also handles awaiting the futures itself. Thus, we could rewrite the code from Listing 17-13 to use join! instead of join3, as in Listing 17-14.
Listing 17-14: Using join! to wait for multiple futures
This is definitely an improvement over swapping between join and join3 and join4 and so on! However, even this macro form only works when we know the number of futures ahead of time. In real-world Rust, though, pushing futures into a collection and then waiting on some or all of them to complete is a common pattern.
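As a rough, std-only sketch of that pattern (it does not use trpl, and block_on, ThreadWaker, join_all, double, and demo are hypothetical names rather than trpl APIs), the code below pushes several futures into a Vec and then waits for all of them. Every call to the same async fn returns the same anonymous future type, so this particular Vec type-checks without any boxing; futures that come from different async blocks are not so cooperative.

```rust
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Wakes a parked thread; used by the tiny executor below.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future on the current thread,
// parking the thread whenever the future is not ready yet.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for trpl::join_all: polls every future in the Vec
// until all have finished, then returns their outputs in the original order.
fn join_all<F: Future>(futures: Vec<F>) -> impl Future<Output = Vec<F::Output>> {
    let mut pending: Vec<Option<Pin<Box<F>>>> =
        futures.into_iter().map(|fut| Some(Box::pin(fut))).collect();
    let mut outputs: Vec<Option<F::Output>> = pending.iter().map(|_| None).collect();
    future::poll_fn(move |cx: &mut Context<'_>| {
        let mut all_done = true;
        for (slot, output) in pending.iter_mut().zip(outputs.iter_mut()) {
            // A finished future must not be polled again, so skip empty slots.
            let Some(fut) = slot.as_mut() else { continue };
            let poll = fut.as_mut().poll(cx);
            match poll {
                Poll::Ready(value) => {
                    *output = Some(value);
                    *slot = None;
                }
                Poll::Pending => all_done = false,
            }
        }
        if !all_done {
            return Poll::Pending;
        }
        let results: Vec<F::Output> = outputs.iter_mut().map(|o| o.take().unwrap()).collect();
        Poll::Ready(results)
    })
}

// Every call to this async fn returns the same anonymous future type.
async fn double(n: u32) -> u32 {
    n * 2
}

fn demo() -> Vec<u32> {
    let futures: Vec<_> = (1..=3).map(double).collect();
    block_on(join_all(futures))
}
```

Calling demo() from main returns the doubled values in their original order. This join_all boxes each future internally so it can poll them in place; it is a simplified stand-in, not a description of how trpl or the futures crate implement it.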
To check all the futures in some collection, we’ll need to iterate over and join on all of them. The trpl::join_all function accepts any type that implements the IntoIterator trait, a close relative of the Iterator trait you learned about back in “The Iterator Trait and the next Method” in Chapter 13, so it seems like just the ticket. Let’s try putting our futures in a vector and replacing join! with join_all, as shown in Listing 17-15.
Listing 17-15: Putting futures in a vector and calling join_all
Unfortunately, this code doesn’t compile. Instead, we get this error:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
This might be surprising. After all, none of the async blocks returns anything, so each one produces a Future<Output = ()>. Remember that Future is a trait, though, and that the compiler creates a unique enum for each async block. You can’t put two different hand-written structs in a Vec, and the same rule applies to the different enums generated by the compiler.
To make this work, we need to use trait objects, just as we did in “Returning Errors from the run function” in Chapter 12. (We’ll cover trait objects in detail in Chapter 18.) Using trait objects lets us treat each of the anonymous futures produced by these async blocks as the same type, because all of them implement the Future trait.
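The same idea can be sketched in miniature with only the standard library. In this hypothetical example (pick, block_on, and ThreadWaker are illustrative names), two async blocks with nearly identical bodies still have distinct anonymous types, so an if/else could not return either one directly. Converting each into the trait object type Pin<Box<dyn Future<Output = u32>>> gives both branches one common type; the Pin part is there because of the Unpin requirement this section gets to shortly.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future, parking the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Without the boxing, each branch would have its own async block type and
// the `if` would fail with a mismatched-types error. As trait objects, both
// branches share one type.
fn pick(first: bool) -> Pin<Box<dyn Future<Output = u32>>> {
    if first {
        Box::pin(async { 1_u32 })
    } else {
        Box::pin(async { 2_u32 })
    }
}
```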
Note: In the Chapter 8 section “Using an Enum to Store Multiple Values,” we discussed another way to include multiple types in a Vec: using an enum to represent each type that can appear in the vector. We can’t do that here, though. For one thing, we have no way to name the different types, because they are anonymous. For another, the reason we reached for a vector and join_all in the first place was to be able to work with a dynamic collection of futures where we only care that they have the same output type.
We start by wrapping each future in the vec! in a Box::new, as shown in Listing 17-16.
Listing 17-16: Using Box::new to align the types of the futures in a Vec
Unfortunately, this code still doesn’t compile. In fact, we get the same basic error we got before for both the second and third Box::new calls, as well as new errors referring to the Unpin trait. We’ll come back to the Unpin errors in a moment. First, let’s fix the type errors on the Box::new calls by explicitly annotating the type of the futures variable (see Listing 17-17).
This type declaration is a little involved, so let’s walk through it:
- The innermost type is the future itself. We note explicitly that the output of the future is the unit type () by writing Future<Output = ()>.
- Then we annotate the trait with dyn to mark it as dynamic.
- The entire trait reference is wrapped in a Box.
- Finally, we state explicitly that futures is a Vec containing these items.
That already made a big difference. Now when we run the compiler, we get only the errors mentioning Unpin. Although there are three of them, their contents are very similar.
error[E0308]: mismatched types
--> src/main.rs:46:46
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0308]: mismatched types
--> src/main.rs:46:64
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
30 | let tx_fut = async move {
| ---------- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:30:22: 30:32}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:24
|
48 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:9
|
48 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
That is a lot to digest, so let’s pull it apart. The Unpin errors tell us that the first async block ({async block@src/main.rs:10:23: 10:33}) does not implement the Unpin trait, and they suggest using pin! or Box::pin to resolve it. Later in the chapter, we’ll dig into a few more details about Pin and Unpin. For the moment, though, we can just follow the compiler’s advice to get unstuck. In Listing 17-18, we first update the type annotation for futures, with a Pin wrapping each Box. Second, we use Box::pin to pin the futures themselves.
Listing 17-18: Using Pin and Box::pin to make the Vec type check
If we compile and run this, we finally get the output we hoped for:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
Phew!
There’s a bit more to explore here. For one thing, using Pin<Box<T>> adds a small amount of overhead from putting these futures on the heap with Box, and we’re only doing that to get the types to line up. We don’t actually need the heap allocation, after all: these futures are local to this particular function. As noted before, Pin is itself a wrapper type, so we can get the benefit of having a single type in the Vec (the original reason we reached for Box) without doing a heap allocation. We can use Pin directly with each future, using the std::pin::pin macro.
However, we must still be explicit about the type of the pinned reference; otherwise, Rust will still not know to interpret these as dynamic trait objects, which is what we need them to be in the Vec. We therefore use pin! to pin each future when we define it, and we define futures as a Vec containing pinned mutable references to the dynamic future type, as in Listing 17-19.
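The two approaches can be compared side by side in a std-only sketch. It assumes hypothetical helpers (NoopWaker, run_all, and demo) in place of trpl: run_all stands in for join_all but simply re-polls every unfinished future in a loop, which only works because none of these futures waits on an outside event. Pin<Box<dyn Future<Output = ()>>> and Pin<&mut dyn Future<Output = ()>> are both Unpin futures, so one generic driver accepts either Vec.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// A waker that does nothing; run_all re-polls everything anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical round-robin driver: polls each unfinished future in turn and
// drops it once it completes. It busy-polls, so it only suits futures that
// never wait on outside events.
fn run_all<F: Future<Output = ()> + Unpin>(mut futures: Vec<F>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !futures.is_empty() {
        futures.retain_mut(|fut| Pin::new(fut).poll(&mut cx).is_pending());
    }
}

fn demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));

    // Heap version: each distinct async block is boxed and pinned.
    let (log1, log2) = (Rc::clone(&log), Rc::clone(&log));
    let boxed: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
        Box::pin(async move { log1.borrow_mut().push("boxed: first") }),
        Box::pin(async move { log2.borrow_mut().push("boxed: second") }),
    ];
    run_all(boxed);

    // Stack version: pin! pins each future in place, so the futures
    // themselves are not heap-allocated (only the Vec of references is).
    let first = pin!(async { log.borrow_mut().push("pinned: first") });
    let second = pin!(async { log.borrow_mut().push("pinned: second") });
    let pinned: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![first, second];
    run_all(pinned);

    // Bind first so the RefCell borrow ends before `log` is dropped.
    let result = log.borrow().clone();
    result
}
```

demo() returns the log entries in the order the futures ran: both boxed futures, then both pinned ones.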
Listing 17-19: Using Pin directly with the pin! macro to avoid unnecessary heap allocations
We got this far by ignoring the fact that we might have different Output types. For example, in Listing 17-20, the anonymous future for a implements Future<Output = u32>, the anonymous future for b implements Future<Output = &str>, and the anonymous future for c implements Future<Output = bool>.
We can use trpl::join! to await them, because it allows us to pass in multiple future types and produces a tuple of those types. We cannot use trpl::join_all, because it requires all of the futures passed in to have the same type. Remember, that error is what got us started on this adventure with Pin!
This is a fundamental tradeoff: we can either deal with a dynamic number of futures with join_all, as long as they all have the same type, or we can deal with a set number of futures with the join functions or the join! macro, even if they have different types. This is the same scenario we’d face when working with any other types in Rust. Futures are not special, even though we have some nice syntax for working with them, and that’s a good thing.
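To see why a fixed number of futures can have different output types, here is a std-only sketch of a two-future join. The names join2, block_on, ThreadWaker, and demo are hypothetical, and this is not how trpl::join! is implemented. Because the result is a tuple, each position keeps its own type, and nesting two calls joins three futures with u32, &str, and bool outputs like the ones described above.

```rust
use std::future::{self, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future, parking the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical two-future join: polls both until each has finished and
// returns their outputs as a tuple, so the two output types may differ.
fn join2<A: Future, B: Future>(a: A, b: B) -> impl Future<Output = (A::Output, B::Output)> {
    // Box::pin gives each future a fixed address so it can be polled in place.
    let (mut a, mut b) = (Box::pin(a), Box::pin(b));
    let mut out_a: Option<A::Output> = None;
    let mut out_b: Option<B::Output> = None;
    future::poll_fn(move |cx: &mut Context<'_>| {
        // Only poll a future that has not finished yet.
        if out_a.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                out_a = Some(value);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                out_b = Some(value);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
}

fn demo() -> (u32, &'static str, bool) {
    let a = async { 1_u32 };
    let b = async { "Hello!" };
    let c = async { true };
    // Nesting two-way joins combines three futures with three output types.
    let ((x, y), z) = block_on(join2(join2(a, b), c));
    (x, y, z)
}
```

The arity is fixed by the function signature, which is exactly the limitation described above: a Vec of such futures would again need one shared type.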
Racing Futures
When we “join” futures with the join family of functions and macros, we require all of them to finish before we move on. Sometimes, though, we only need some future from a set to finish before we move on, kind of like racing one future against another.
In Listing 17-21, we once again use trpl::race to run two futures, slow and fast, against each other.
Listing 17-21: Using race to get the result of whichever future finishes first
Each future prints a message when it starts running, pauses for some amount of time by calling and awaiting sleep, and then prints another message when it finishes. Then we pass both slow and fast to trpl::race and wait for one of them to finish. (The outcome here isn’t too surprising: fast wins.) Unlike when we used race back in “Our First Async Program,” we just ignore the Either instance it returns here, because all of the interesting behavior happens in the body of the async blocks.
Notice that if you flip the order of the arguments to race, the order of the “started” messages changes, even though the fast future always completes first. That’s because the implementation of this particular race function is not fair. It always runs the futures passed in as arguments in the order in which they’re passed. Other implementations are fair and will randomly choose which future to poll first. Regardless of whether the implementation of race we’re using is fair, though, one of the futures will run up to the first await in its body before another task can start.
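Here is a std-only sketch of an unfair race in the sense just described. Either, race, block_on, ThreadWaker, and demo are hypothetical stand-ins, not trpl’s code. race always polls its first argument first, and because neither async block in demo reaches an await point, the first one runs to completion on its first poll and the second never starts at all.

```rust
use std::cell::RefCell;
use std::future::{self, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future, parking the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for trpl's Either type.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

// Unfair race: `a` is always polled before `b`. Whichever finishes first
// wins, and the other future is dropped unfinished.
fn race<A: Future, B: Future>(a: A, b: B) -> impl Future<Output = Either<A::Output, B::Output>> {
    let (mut a, mut b) = (Box::pin(a), Box::pin(b));
    future::poll_fn(move |cx: &mut Context<'_>| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
}

// Neither async block has an await point, so the first argument runs to
// completion during its first poll and the second is never polled.
fn demo() -> (Either<&'static str, &'static str>, Vec<&'static str>) {
    let log = RefCell::new(Vec::new());
    let winner = block_on(race(
        async {
            log.borrow_mut().push("'first' started");
            "first"
        },
        async {
            log.borrow_mut().push("'second' started");
            "second"
        },
    ));
    (winner, log.into_inner())
}
```

Swapping the two arguments would make the other block win and be the only one to log “started,” which mirrors the ordering effect described above.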
Recall from “Our First Async Program” that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited isn’t ready. The inverse is also true: Rust only pauses async blocks and hands control back to a runtime at an await point. Everything between await points is synchronous.
That means if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress. You may sometimes hear this referred to as one future starving other futures. In some cases, that may not be a big deal. However, if you are doing some kind of expensive setup or long-running work, or if you have a future that will keep doing some particular task indefinitely, you’ll need to think about when and where to hand control back to the runtime.
By the same token, if you have long-running blocking operations, async can be a useful tool for structuring how different parts of the program relate to each other.
But how would you hand control back to the runtime in those cases?
Yielding Control to the Runtime
Let’s simulate a long-running operation. Listing 17-22 introduces a slow function.
Listing 17-22: Using thread::sleep to simulate slow operations
This code uses std::thread::sleep instead of trpl::sleep so that calling slow will block the current thread for some number of milliseconds. We can use slow to stand in for real-world operations that are both long-running and blocking.
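A slow function along these lines could look like the following sketch; the exact signature is an assumption. It blocks with std::thread::sleep and then reports how long it ran. Because it never returns a future, nothing else on the same thread can make progress while it runs.

```rust
use std::thread;
use std::time::Duration;

// Blocks the current thread for `ms` milliseconds to stand in for
// long-running, blocking work, then reports what it did.
fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
```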
In Listing 17-23, we use slow to emulate doing this kind of CPU-bound work in a pair of futures.
Listing 17-23: Using thread::sleep to simulate slow operations
To begin, each future only hands control back to the runtime after carrying out a bunch of slow operations. If you run this code, you will see this output:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
As with our earlier example, race still finishes as soon as a is done. There’s no interleaving between the two futures, though. The a future does all of its work until the trpl::sleep call is awaited, then the b future does all of its work until its own trpl::sleep call is awaited, and finally the a future completes. To allow both futures to make progress between their slow tasks, we need await points so we can hand control back to the runtime. That means we need something we can await!
We can already see this kind of handoff happening in Listing 17-23: if we removed the trpl::sleep at the end of the a future, it would complete without the b future running at all. Let’s try using the sleep function as a starting point for letting operations switch off making progress, as shown in Listing 17-24.
Listing 17-24: Using sleep to let operations switch off making progress
In Listing 17-24, we add trpl::sleep calls with await points between each call to slow. Now the two futures’ work is interleaved:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
The a future still runs for a bit before handing off control to b, because it calls slow before ever calling trpl::sleep, but after that the futures swap back and forth each time one of them hits an await point. In this case, we have done that after every call to slow, but we could break up the work in whatever way makes the most sense to us.
We don’t really want to sleep here, though: we want to make progress as fast as we can. We just need to hand back control to the runtime. We can do that directly, using the yield_now function. In Listing 17-25, we replace all those sleep calls with yield_now.
Listing 17-25: Using yield_now to let operations switch off making progress
This code is both clearer about the actual intent and can be significantly faster than using sleep, because timers such as the one used by sleep often have limits on how granular they can be. The version of sleep we are using, for example, will always sleep for at least a millisecond, even if we pass it a Duration of one nanosecond. Again, modern computers are fast: they can do a lot in one millisecond!
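To make yielding concrete, here is a std-only sketch of a yield_now-style future. YieldNow, yield_now, NoopWaker, run_all, and demo are hypothetical names, and this is not trpl’s implementation. The future returns Pending exactly once after asking to be polled again, so it hands control back without involving a timer at all. A small round-robin loop (instead of race) drives two futures so both run to completion.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for trpl::yield_now: returns Pending exactly once,
// asking to be polled again right away, and completes on the next poll.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Signal that this task can make progress again immediately.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

// A waker that does nothing; run_all re-polls every future anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical round-robin driver: polls each unfinished future in turn
// until all complete. It busy-polls, which is fine only for this demo.
fn run_all(mut futures: Vec<Pin<&mut dyn Future<Output = ()>>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !futures.is_empty() {
        futures.retain_mut(|fut| fut.as_mut().poll(&mut cx).is_pending());
    }
}

// Two futures, each doing two steps of work, optionally yielding in between.
fn demo(yield_between_steps: bool) -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    {
        let a = pin!(async {
            log.borrow_mut().push("a: step 1");
            if yield_between_steps {
                yield_now().await;
            }
            log.borrow_mut().push("a: step 2");
        });
        let b = pin!(async {
            log.borrow_mut().push("b: step 1");
            if yield_between_steps {
                yield_now().await;
            }
            log.borrow_mut().push("b: step 2");
        });
        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![a, b];
        run_all(futures);
    }
    log.into_inner()
}
```

With yields, the log alternates between the two futures; without them, each future finishes all of its steps before the other starts, just like the output without await points shown earlier.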
You can see this for yourself by setting up a little benchmark, such as the one in Listing 17-26. (This isn’t an especially rigorous way to do performance testing, but it suffices to show the difference here.)
Listing 17-26: Comparing the performance of sleep and yield_now
Here, we skip all the status printing, pass a one-nanosecond Duration to trpl::sleep, and let each future run by itself, with no switching between the futures. Then we run for 1,000 iterations and see how long the future using trpl::sleep takes compared to the future using trpl::yield_now.
The version with yield_now is way faster!
This means that async can be useful even for compute-bound tasks, depending on what else your program is doing, because it provides a useful tool for structuring the relationships between different parts of the program. This is a form of cooperative multitasking, where each future has the power to determine when it hands over control via await points. Each future therefore also has the responsibility to avoid blocking for too long. In some Rust-based embedded operating systems, this is the only kind of multitasking!
In real-world code, you won’t usually be alternating function calls with await points on every single line, of course. While yielding control in this way is relatively inexpensive, it’s not free. In many cases, trying to break up a compute-bound task might make it significantly slower, so sometimes it’s better for overall performance to let an operation block briefly. Always measure to see what your code’s actual performance bottlenecks are. The underlying dynamic is important to keep in mind, though, if you are seeing a lot of work happening in serial that you expected to happen concurrently!
Building Our Own Async Abstractions
We can also compose futures together to create new patterns. For example, we can build a timeout function with async building blocks we already have. When we’re done, the result will be another building block we could use to create still more async abstractions.
Listing 17-27 shows how we would expect this timeout to work with a slow future.
Listing 17-27: Using timeout to run a slow operation with a time limit
Let’s implement this! To begin, let’s think about the API for timeout:
- It needs to be an async function itself so we can await it.
- Its first parameter should be a future to run. We can make it generic to allow it to work with any future.
- Its second parameter will be the maximum time to wait. If we use a Duration, that will make it easy to pass along to trpl::sleep.
- It should return a Result. If the future completes successfully, the Result will be Ok with the value produced by the future. If the timeout elapses first, the Result will be Err with the duration that the timeout waited for.
Listing 17-28 shows this declaration.
Listing 17-28: The signature of timeout
That satisfies our goals for the types. Now let’s think about the behavior we need: we want to race the future passed in against the duration. We can use trpl::sleep to make a timer future from the duration, and use trpl::race to run that timer with the future the caller passes in.
We also know that race is not fair, polling arguments in the order in which they are passed. Thus, we pass future_to_try to race first so it gets a chance to complete even if max_time is a very short duration. If future_to_try finishes first, race will return Left with the output from future_to_try. If timer finishes first, race will return Right with the timer’s output of ().
In Listing 17-29, we match on the result of awaiting trpl::race.
Listing 17-29: Defining timeout with race and sleep
If future_to_try succeeds and we get a Left(output), we return Ok(output). If the sleep timer elapses instead and we get a Right(()), we ignore the () with _ and return Err(max_time) instead.
With that, we have a working timeout built out of two other async helpers. If we run our code, it will print the failure mode after the timeout:
Failed after 2 seconds
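For reference, here is one way the pieces could fit together using only the standard library. Only the name and signature of timeout follow the text; everything else is a hypothetical stand-in: Sleep starts a helper thread per timer (a real runtime would use a shared timer), race and Either imitate the unfair trpl::race, and block_on is a minimal single-future executor.

```rust
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: polls one future, parking the thread until it is woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical timer future; state is (finished?, waker to notify).
struct Sleep {
    state: Arc<Mutex<(bool, Option<Waker>)>>,
}

// Starts the timer immediately on a helper thread.
fn sleep(duration: Duration) -> Sleep {
    let state = Arc::new(Mutex::new((false, None::<Waker>)));
    let shared = Arc::clone(&state);
    thread::spawn(move || {
        thread::sleep(duration);
        let mut guard = shared.lock().unwrap();
        guard.0 = true;
        if let Some(waker) = guard.1.take() {
            waker.wake();
        }
    });
    Sleep { state }
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut guard = self.state.lock().unwrap();
        if guard.0 {
            return Poll::Ready(());
        }
        // Remember the latest waker so the helper thread can wake this task.
        guard.1 = Some(cx.waker().clone());
        Poll::Pending
    }
}

enum Either<L, R> {
    Left(L),
    Right(R),
}

// Unfair race: `a` is always polled before `b`.
fn race<A: Future, B: Future>(a: A, b: B) -> impl Future<Output = Either<A::Output, B::Output>> {
    let (mut a, mut b) = (Box::pin(a), Box::pin(b));
    future::poll_fn(move |cx: &mut Context<'_>| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
}

async fn timeout<F: Future>(future_to_try: F, max_time: Duration) -> Result<F::Output, Duration> {
    // future_to_try goes first so it is polled before the timer.
    match race(future_to_try, sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
```

A future that never finishes, such as std::future::pending(), comes back as Err(max_time), while one that finishes within the limit comes back as Ok with its output.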
Because futures compose with other futures, you can build really powerful tools using smaller async building blocks. For example, you can use this same approach to combine timeouts with retries, and in turn use those with operations such as network calls (one of the examples from the beginning of the chapter).
In practice, you’ll usually work directly with async and await, and secondarily with functions and macros such as join, join_all, race, and so on. You’ll only need to reach for pin now and again to use futures with those APIs.
We’ve now seen a number of ways to work with multiple futures at the same time. Up next, we’ll look at how we can work with multiple futures in a sequence over time with streams. Here are a couple more things you might want to consider first, though:
- We used a Vec with join_all to wait for all of the futures in some group to finish. How could you use a Vec to process a group of futures in sequence instead? What are the tradeoffs of doing that?
- Take a look at the futures::stream::FuturesUnordered type from the futures crate. How would using it be different from using a Vec? (Don’t worry about the fact that it’s from the stream part of the crate; it works just fine with any collection of futures.)