21 releases (breaking)
0.16.1 | Aug 6, 2020 |
---|---|
0.15.0 | Jul 14, 2020 |
0.14.0 | Mar 28, 2020 |
#1963 in Asynchronous
48 downloads per month
28KB
439 lines
stream_multiplexer
This crate provides natural backpressure to classes of streams.
Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other and have their own backpressure.
lib.rs
:
This crate provides natural backpressure to classes of streams.
Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other and have their own backpressure.
Example
With a TCP server you may have two different classes of connections: Authenticated and Unauthenticated. By grouping each class of connection into its own channel, you can favor the Authenticated connections over the Unauthenticated. This would provide a better experience for those that have been able to authenticate.
Code Example
use futures_util::stream::StreamExt;
use tokio_util::compat::*;
// Run the whole example inside `smol::block_on`.
smol::block_on(async move {
    // Identifiers for the two channels this example uses.
    const CHANNEL_ONE: usize = 1;
    const CHANNEL_TWO: usize = 2;

    // Create the multiplexer and register both channels with it.
    let mut multiplexer = stream_multiplexer::Multiplexer::new();
    multiplexer.add_channel(CHANNEL_ONE);
    multiplexer.add_channel(CHANNEL_TWO);

    // Listen on localhost; port 0 leaves the port choice to the OS, so the
    // bound address is read back for the test clients below.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let server_addr = listener.local_addr().unwrap();

    // Background task: hand every accepted connection to the multiplexer.
    let mut acceptor = multiplexer.clone();
    smol::Task::spawn(async move {
        for incoming in listener.incoming() {
            // Accept errors are deliberately left unhandled in this example.
            let socket = match incoming {
                Ok(socket) => socket,
                Err(_) => unimplemented!(),
            };

            // Wrap the std socket for async use, frame it as lines of text,
            // then split it into its write half and read half.
            let socket = async_io::Async::new(socket).unwrap();
            let framed = tokio_util::codec::Framed::new(
                socket.compat(),
                tokio_util::codec::LinesCodec::new(),
            );
            let (sink, source) = framed.split();

            // Every new connection starts out in channel one.
            let _stream_id = acceptor.add_stream_pair(sink, source, CHANNEL_ONE);
        }
    })
    .detach();

    // Two test clients whose connections end up in the channels.
    let mut client_1 = std::net::TcpStream::connect(server_addr).unwrap();
    let mut client_2 = std::net::TcpStream::connect(server_addr).unwrap();

    // Background task: a simple echo server for channel one. Each message is
    // sent back to the stream it came from, and that stream is then moved to
    // channel two.
    let mut echo_mux = multiplexer.clone();
    smol::Task::spawn(async move {
        while let Ok((stream_id, message)) = echo_mux.recv(CHANNEL_ONE).await {
            match message {
                Some(Ok(data)) => {
                    // Echo first, then re-home the stream.
                    echo_mux.send(vec![stream_id], data);
                    echo_mux.change_stream_channel(stream_id, CHANNEL_TWO);
                }
                Some(Err(_err)) => {
                    // The stream reported an error; the example ignores it.
                }
                None => {
                    // The stream identified by `stream_id` has been dropped.
                }
            }
        }
    })
    .detach();
});
Dependencies
~2.1–3MB
~59K SLoC