21 releases (breaking)

0.16.1 Aug 6, 2020
0.15.0 Jul 14, 2020
0.14.0 Mar 28, 2020

#1963 in Asynchronous

48 downloads per month

MIT/Apache

28KB
439 lines

stream_multiplexer

Build Status Latest Version Rust Documentation

This crate provides natural backpressure to classes of streams.

Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other and have their own backpressure.


lib.rs:

This crate provides natural backpressure to classes of streams.

Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other and have their own backpressure.

Example

With a TCP server you may have two different classes of connections: Authenticated and Unauthenticated. By grouping each class of connection into its own channel, you can favor the Authenticated connections over the Unauthenticated. This would provide a better experience for those that have been able to authenticate.

Code Example

use futures_util::stream::StreamExt;
use tokio_util::compat::*;

// Runs the whole example on the smol executor: a multiplexer with two
// channels is fed by a TCP accept loop, and an echo task serves CHANNEL_ONE,
// moving each stream to CHANNEL_TWO after its first message.
smol::block_on(async move {
    // Identifiers for the two channels registered below.
    const CHANNEL_ONE: usize = 1;
    const CHANNEL_TWO: usize = 2;

    // Initialize a multiplexer
    let mut multiplexer = stream_multiplexer::Multiplexer::new();

    // Set up the recognized channels
    multiplexer.add_channel(CHANNEL_ONE);
    multiplexer.add_channel(CHANNEL_TWO);

    // Bind to a random port on localhost (port 0) and remember the address
    // that was actually assigned so the test clients can connect to it.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let local_addr = listener.local_addr().unwrap();

    // Set up a task to add incoming connections into multiplexer.
    // The task gets its own clone, which is moved into the `async move` block.
    let mut incoming_multiplexer = multiplexer.clone();
    smol::Task::spawn(async move {
        // NOTE(review): `listener` is a plain `std::net::TcpListener`, so
        // `incoming()` waits synchronously for each connection inside this
        // task. Acceptable for a short example; confirm before reusing.
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    // Wrap the std stream for async use, then frame it with a
                    // line codec. `compat()` comes from `tokio_util::compat`,
                    // imported above.
                    let stream = async_io::Async::new(stream).unwrap();
                    let codec = tokio_util::codec::LinesCodec::new();
                    let framed = tokio_util::codec::Framed::new(stream.compat(), codec);
                    // Split into write and read halves and register both with
                    // the multiplexer under CHANNEL_ONE. The returned id is
                    // not used in this example.
                    let (sink, stream) = framed.split();
                    let _stream_id = incoming_multiplexer.add_stream_pair(sink, stream, CHANNEL_ONE);
                }
                // Accept errors are not handled in this example; this panics.
                Err(_) => unimplemented!()
            }
        }
    }).detach();

    // test clients to put into channels
    // (they only connect here; nothing is written to them in this snippet)
    let mut client_1 = std::net::TcpStream::connect(local_addr).unwrap();
    let mut client_2 = std::net::TcpStream::connect(local_addr).unwrap();

    // Separate clone for the task that services CHANNEL_ONE.
    let mut multiplexer_ch_1 = multiplexer.clone();

    // Simple server that echoes the data back to the stream and moves the stream to channel 2.
    smol::Task::spawn(async move {
        // Loops for as long as `recv` yields `Ok`; the first `Err` ends the task.
        while let Ok((stream_id, message)) = multiplexer_ch_1.recv(CHANNEL_ONE).await {
            match message {
                Some(Ok(data)) => {
                    // echo the data back and move it to channel 2
                    // NOTE(review): the values returned by `send` and
                    // `change_stream_channel` are discarded here; if either
                    // returns a future or a `Result`, it needs `.await` or
                    // error handling. Confirm against the crate API.
                    multiplexer_ch_1.send(vec![stream_id], data);
                    multiplexer_ch_1.change_stream_channel(stream_id, CHANNEL_TWO);
                }
                Some(Err(err)) => {
                    // the stream had an error
                    // (`err` is bound but intentionally left unhandled here)
                }
                None => {
                    // stream_id has been dropped
                }
            }
        }
    }).detach();
});

Dependencies

~2.1–3MB
~59K SLoC