10 releases (6 breaking)
new 0.6.0 | Jan 12, 2025 |
---|---|
0.5.0 | May 11, 2022 |
0.4.1 | May 11, 2022 |
0.4.0 | Jan 6, 2022 |
0.2.0 | Jun 24, 2020 |
#95 in Concurrency
172 downloads per month
Used in 2 crates
41KB
530 lines
async_nursery
Primitive for structured concurrency.
The nursery allows writing concurrent programs adhering to structured concurrency. If you are new to the concept, there are some excellent resources on the dedicated structured concurrency forum. The name of the library is inspired by the excellent python Trio library.
Table of Contents
Description
async_nursery brings a structured concurrency primitive to Rust. There are three main goals in structured concurrency:
1. A sane control flow for concurrent programs.
Notes on structured concurrency, or: Go statement considered harmful by Nathaniel J. Smith explains this exquisitely. To summarize, if a function wants to split off and do some work concurrently, make sure that all its child tasks are finished when the function returns. That way it functions as the black box we are used to from synchronous code. A function has inputs and a return value, and when it is done, no code it created is running anymore.
You could already do this by stuffing JoinHandle
s from async_executors in a FuturesUnordered
, but as we will see below, async_nursery is a bit more flexible and convenient. As opposed to the JoinHandle
s from tokio or async-std directly, the ones from async_executors do not detach on drop by default.
2. Prevent resource leakage.
Orphaned tasks, spawned without a JoinHandle
can potentially stay alive forever, either running in loops, or deadlocking. Structured concurrency makes sure there are no leaks, putting all resources neatly in a call tree, very similar to a call stack. In a call tree, a stack frame can be several stack frames sitting side by side doing things concurrently, but when we return to the previous stack frame, all of them are done.
3. Propagate errors
In Rust it is common to propagate errors up the call stack. If you spawn a task and let it run off into the void, you need out-of-band error handling like channels. In structured concurrency, since all tasks get joined before their parent returns, you can return the errors just like in sync code. It is also possible to cancel all sibling tasks if one task runs into an error.
Properties of async_nursery:
Nursery
acts as spawner.NurseryStream
implementsStream<Out>
of the results of all the futures it nurses.NurseryStream
implementsFuture<Output=()>
if you just want to wait for everything to finish, but don't care for returned values.NurseryStream
basically managesJoinHandle
s for you.- Can be backed by any executor that implements
SpawnHandle
orLocalSpawnHandle
. - Cancels all running futures on dropping
NurseryStream
. Nursery
implements Sink forFutureObj
and/orLocalFutureObj
as well asNurse
andNurseExt
.Nursery
forwards async_executor traits from the wrapped executor. This works forTimer
,TokioIo
,YieldNow
andSpawnBlocking
. Note that when usingSpawnBlocking
like this, the nursery does not manage the tasks, it just let's you use the wrapped executor.
Missing features
-
No API provided for cooperative cancellation. Since there is no support for that in
std::task::Context
, you must basically pass some cancellation token into a task that needs to do cleanup and doesn't support being dropped at all await points. Since it requires specific support of the spawned task, I leave this to the user. An example using anAtomicBool
is included in the examples directory. The advantage is flexibility. You could cancel just certain tasks in the nursery and leave others running, or let the others be canceled by drop if they support it, etc. Async drop will most likely alleviate this pain one day, but it's not there yet. -
No API is provided for running non-
'static
futures. This is not possible in safe rust becausestd::mem::forget
could be used to leak the nursery and trick it to outlive it's parent stack frame, at which point it would hold an invalid reference. If you really want to go there, I suggest you look at the async-scoped crate which allows it by requiring you to use unsafe.
Install
With cargo add:
cargo add async_nursery
With cargo yaml:
dependencies:
async_nursery: ^0.6
With Cargo.toml
[dependencies]
async_nursery = "0.6"
Upgrade
Please check out the changelog when upgrading.
Dependencies
This crate has few dependencies (futures and async_executors). Cargo will automatically handle it's dependencies for you. You will have to choose executors from the async_executors crate and set the correct feature on that crate to enable it.
There are no optional features.
Security
The crate uses forbid(unsafe)
, but depends on futures
which has quite some unsafe. There are no security issues I'm aware of specific to using this crate.
Performance
Currently the implementation is simple. Nursery
just sends the JoinHandle
to NurseryStream
over an unbounded channel. This is convenient, because it means NurseExt::nurse
doesn't have to be async, but it has some overhead compared to using the underlying executor directly. In the future I hope to optimize the implementation.
Usage
Warning: If ever you wait on the stream to finish, remember it will only finish if there are no Nursery
's alive anymore. You must drop the Nursery before awaiting the NurseryStream
. If your program deadlocks, this should be the first place to look.
All tasks spawned on a nursery must have the same Future::Output
type.
Basic example
There is an extensive list of examples for all kinds of patterns of using async_nursery in the examples directory. Please have a look at them.
use
{
async_nursery :: { Nursery, NurseExt } ,
async_executors :: { AsyncStd } ,
};
pub type DynResult<T> = Result<T, Box< dyn std::error::Error + Send + Sync + 'static >>;
async fn self_contained() -> DynResult<()>
{
let (nursery, output) = Nursery::new( AsyncStd );
for _ in 0..5
{
nursery.nurse( async { /* do something useful */ } )?;
}
// This is necessary. Since we could keep spawning tasks even after starting to poll
// the output, it can't know that we are done, unless we drop all senders or call
// `close_nursery`. If we don't, the await below deadlocks.
//
drop(nursery);
// Resolves when all spawned tasks are done.
//
output.await;
Ok(())
}
Returning errors
The functionality of TryStreamExt::try_next
can be used to bail early if all concurrent tasks need to complete successfully. You can now drop the NurseryStream
and cancel all running sibling tasks.
Recover other return types
It's possible to return useful data sometimes from spawned tasks. You can effectively see them as function calls or closures that can run concurrently. The nursery let's you recover these as you go. It could be used to implement a progress bar for example.
Another possibility is using collect
to gain a collection of all returned values when everything is done.
Panics
Nursery has no special handling of panics. If your task panics, it depends on the executor what happens. Currently tokio is different from other executors in that it will catch_unwind
your spawned tasks. Other executors propagate the panic to the thread that awaits the JoinHandle
s (eg. that awaits the NurseryStream
). If you want a resilient application that works on all executors, use the catch_unwind
combinator from the futures library. Again using TryStreamExt::try_next
you can bail early if one task panics.
Differences with FuturesUnordered
Nursery
and NurseryStream
wrap a FuturesUnordered internally. The main feature this gives us is that it allows to us to start polling the stream of outputs and still continue to spawn more subtasks. FuturesUnordered
has a very strict two phase API. First spawn, then get output. This allows us to use NuseryStream
as a long-lived container. Eg. if you are going to spawn network requests, you can continuously listen to NurseryStream
for errors that happened during processing while continuing to spawn further requests. Then when the connection closes, we want to stop processing outstanding requests for this connection. By dropping the NurseryStream
, we can do that.
Further a few conveniences are added:
Nursery
does the spawning for you, never need to handleJoinHandle
s manually.NurseryStream
not only implementsStream
, but alsoFuture
, if you just want to wait for everything to finish and don't care for the Outputs.Nursery
can be cloned and send around, into function calls and spawned subtasks. You don't have to send back theJoinHandle
s through a channel manually to push them into theFuturesUnordered
.
API
API documentation can be found on docs.rs.
Contributing
Please check out the contribution guidelines.
Testing
cargo test
and rustup run nightly wasm-pack test --firefox --headless -- --features "implementation" --no-default-features
.
Code of conduct
Any of the behaviors described in point 4 "Unacceptable Behavior" of the Citizens Code of Conduct are not welcome here and might get you banned. If anyone, including maintainers and moderators of the project, fail to respect these/your limits, you are entitled to call them out.
License
Dependencies
~80–450KB