#future #async-stream #non-blocking #async #output-stream

stream-map-any

Allows merging async Streams of different output type

4 releases

0.2.2 May 2, 2020
0.2.1 May 1, 2020
0.2.0 May 1, 2020
0.1.0 May 1, 2020

#2064 in Asynchronous

MIT license

12KB
141 lines

stream-map-any

Allows merging async Streams of different output type.

It's very similar to Tokio's StreamMap, except that it doesn't require the streams to have the same output type. This can be useful when you don't know what type of streams should be combined, acting as a runtime dynamic select.

Not a zero-cost-abstraction

Since we don't know what types of outputs the streams will generate, the generated output will be a StreamMapAnyVariant, a newtype around Box<dyn Any>. As a result, we rely on dynamic dispatching to transform it back into the desired output. Benching shows that it's 2x as slow as a StreamMap or Tokio's select macro.

Example

To get started, add the following to Cargo.toml.

stream-map-any = "0.2"

Merging of 2 streams:

use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use stream_map_any::StreamMapAny;

fn main() {
    let int_stream = stream::iter(vec![1; 10]);
    let (mut tx, rx) = channel::<String>(100);

    let mut merge = StreamMapAny::new();
    merge.insert(0, int_stream);
    merge.insert(1, rx);

    std::thread::spawn(move || {
        tx.try_send("hello world".into()).unwrap();
    });

    block_on(async move {
        loop {
            match merge.next().await {
                Some((0, val)) => {
                    let _val: i32 = val.value().unwrap();
                }
                Some((1, val)) => {
                    let _val: String = val.value().unwrap();
                }
                Some(_) => panic!("unexpected key"),
                None => break,
            }
        }
    });
}

Further info in the API Docs.

License

Licensed under MIT license.

Dependencies

~1MB
~15K SLoC