8 releases
| Version | Released |
|---|---|
| 0.1.7 | Apr 9, 2024 |
| 0.1.6 | Apr 1, 2024 |
| 0.1.3 | Mar 18, 2024 |
#298 in Concurrency
76KB
2K SLoC
mpmc-async
A multi-producer, multi-consumer async channel with reservations for Rust.
For more information see:
LICENSE
MIT.
lib.rs:
A multi-producer, multi-consumer async channel with reservations.
Example usage:
// Basic example: two senders and two receivers sharing one channel, with one
// value sent through a reserved permit.
tokio_test::block_on(async {
// Create the channel; the argument 2 is presumably its capacity — confirm
// against the crate documentation.
let (tx1, rx1) = mpmc_async::channel(2);
// Receiving side runs in its own task and owns the original receiver.
let task = tokio::spawn(async move {
// A cloned receiver is used alongside the original one.
let rx2 = rx1.clone();
// The example expects 2 to be received first and 1 second.
// NOTE(review): this implies the slot reserved below keeps its place ahead
// of the later `send(1)` — confirm the ordering guarantee in the crate docs.
assert_eq!(rx1.recv().await.unwrap(), 2);
assert_eq!(rx2.recv().await.unwrap(), 1);
});
// A cloned sender is used alongside the original one.
let tx2 = tx1.clone();
// Obtain a permit from tx1 before anything is sent.
let permit = tx1.reserve().await.unwrap();
// Send 1 through the cloned sender while the permit is still outstanding.
tx2.send(1).await.unwrap();
// Send 2 through the permit obtained earlier (not awaited, unlike `send`).
permit.send(2);
// Wait for the receiving task; `unwrap` surfaces a join error, e.g. if one of
// the assertions inside the task panicked.
task.await.unwrap();
});
A more complex example with multiple sender and receiver tasks:
use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
// Larger example: `num_workers` sender tasks and `num_workers` receiver tasks
// exchange `num_workers * count` distinct values over a single channel, and
// the test checks that every value was received.
tokio_test::block_on(async {
// The argument 1 is presumably the channel capacity — confirm against the
// crate documentation.
let (tx, rx) = mpmc_async::channel(1);
let num_workers = 10;
let count = 10;
// Holds the join handles of the sender tasks and, later, the receiver tasks.
let mut tasks = Vec::with_capacity(num_workers);
for i in 0..num_workers {
// Each sender task gets its own clone of the sender.
// NOTE(review): `mut` may be unnecessary if `reserve` takes `&self` — confirm.
let mut tx = tx.clone();
let task = tokio::spawn(async move {
for j in 0..count {
// Worker `i` sends the values i*count .. i*count + count - 1, so the
// values are distinct across workers.
let val = i * count + j;
// Reserve a permit, then send the value through it.
tx.reserve().await.expect("no error").send(val);
}
});
tasks.push(task);
}
let total = count * num_workers;
// Shared ordered set that collects every received value.
let values = Arc::new(Mutex::new(BTreeSet::new()));
for _ in 0..num_workers {
// Each receiver task gets its own handle to the set and its own receiver clone.
let values = values.clone();
let rx = rx.clone();
let task = tokio::spawn(async move {
// Each receiver task takes exactly `count` values.
for _ in 0..count {
let val = rx.recv().await.expect("Failed to recv");
// The lock guard is a temporary dropped at the end of this statement, so
// it is not held across the next `.await`.
values.lock().unwrap().insert(val);
}
});
tasks.push(task);
}
// Wait for all sender and receiver tasks to finish.
for task in tasks {
task.await.expect("failed to join task");
}
// Every value in 0..total is expected exactly once.
let exp = (0..total).collect::<Vec<_>>();
// Move the set out of the mutex (leaving an empty set behind); a BTreeSet
// iterates in ascending order, so `got` is sorted.
let got = std::mem::take(values.lock().unwrap().deref_mut())
.into_iter()
.collect::<Vec<_>>();
assert_eq!(exp, got);
});