3 unstable releases
0.6.1 | Nov 21, 2022 |
---|---|
0.6.0 | Dec 9, 2019 |
0.5.0 | Dec 6, 2019 |
#348 in Asynchronous
11,529 downloads per month
Used in 2 crates
14KB
206 lines
futures-batch
An adaptor that chunks up completed futures in a stream and flushes them after a timeout or when the buffer is full.
It is based on the Chunks
adaptor of futures-util, to which we added a timeout.
(The project was initially called tokio-batch
, but was renamed as it has no dependency on Tokio anymore.)
Usage
Either as a standalone stream operator or directly as a combinator:
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;
#[tokio::main]
async fn main() {
let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
let results = stream::iter(iter)
.chunks_timeout(5, Duration::new(10, 0))
.collect::<Vec<_>>();
assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results.await);
}
The above code iterates over a stream and creates chunks of size 5 with a timeout of 10 seconds.
Note: This is using the futures 0.3
crate.
Performance
futures-batch
imposes very low overhead on your application. For example, it is even used to batch syscalls.
Under the hood, we are using futures-timer
, which allows for a microsecond timer resolution.
If you find a use-case which is not covered, don't be reluctant to open an issue.
Credits
Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!
Dependencies
~1MB
~16K SLoC