1 unstable release
0.1.0 | Apr 11, 2020 |
---|
#13 in #bounded-channel
17KB
200 lines
async-local-bounded-channel
A same-producer, same-consumer bounded channel, for a single async task. See the docs for an explanation and examples.
lib.rs
:
A same-producer, same-consumer channel, bounded to a single async task.
Implementation details
Internally, this uses the generic-array
crate, which utilizes types from
typenum
to specify the capacity at compile time, allowing the space for
the queue to be allocated inline. Thus, this channel also requires
specifying the capacity upfront at compile time.
Examples
Used together with futures::future::select
, this can implement something
like a coroutine, where two asynchronous generators cooperate producing
and consuming values.
futures::executor::block_on(async move {
// create a new channel with a capacity of 8 items
let mut channel = channel::<_, U8>();
let (mut tx, mut rx) = channel.split();
let producer = async move {
for i in 0..100 {
tx.send(i).await.expect("consumer still alive");
}
};
let consumer = async move {
let mut expected = 0;
loop {
if let Ok(v) = rx.receive().await {
assert_eq!(v, expected);
expected += 1;
} else {
break;
}
}
};
pin_mut!(producer, consumer);
let remaining = select(producer, consumer).await.factor_first().1;
match remaining {
Either::Left(f) => f.await,
Either::Right(f) => f.await,
}
});
This can be useful, for example, when implementing a server. One task can handle each client, where the producer waits for incoming requests and writes responses; and the consumer waits for requests, handles them, and then generates a response.
Usage notes
Once the transmission endpoints have been acquired via split()
, the
channel cannot be moved. This is required for safety, since each endpoint
contains a reference back to the channel; thus, if the channel were to move,
those references would become dangling.
let mut channel = channel::<isize, U8>();
let (tx, rx) = channel.split();
std::thread::spawn(move || {
// nope!
let channel = channel;
let tx = tx;
let rx = rx;
});
Further, endpoints must remain anchored to a single thread, since access to the underlying data structures is not thread-safe. Unfortunately, this isn't enforced by the compiler, and scoped thread libraries can allow unsafe usage. For example:
// shouldn't compile, but unfortunately does.
let mut channel = channel::<isize, U8>();
crossbeam::thread::scope(|s| {
let (tx, rx) = channel.split();
// don't do this!
s.spawn(move |_| {
let tx = tx;
});
s.spawn(move |_| {
let rx = rx;
});
});
If there are no open endpoints, though, a channel can be safely moved and sent. A channel can even be re-used after the endpoints are dropped.
type C = async_local_bounded_channel::Channel<isize, U8>;
async fn test_channel(mut channel: C) -> C {
// run the producer-consumer example above.
# {
# let (mut tx, mut rx) = channel.split();
# let producer = async move {
# for i in 0..100 {
# tx.send(i).await.expect("consumer still alive");
# }
# };
# let consumer = async move {
# let mut expected = 0;
# loop {
# if let Ok(v) = rx.receive().await {
# assert_eq!(v, expected);
# expected += 1;
# } else {
# break;
# }
# }
# };
# pin_mut!(producer, consumer);
# let remaining = select(producer, consumer).await.factor_first().1;
# match remaining {
# Either::Left(f) => f.await,
# Either::Right(f) => f.await,
# }
# }
channel
}
let channel = channel();
let t = std::thread::spawn(move || {
let channel = block_on(async move {
test_channel(channel).await
});
block_on(async move {
test_channel(channel).await
});
});
t.join().expect("test to pass");
Dependencies
~1MB
~22K SLoC