tokio-simplified

A simplified API to interact with tokio sinks and streams

20 releases

0.2.2 Dec 10, 2019
0.2.1 Jun 5, 2019
0.2.0 May 22, 2019
0.1.15 Apr 25, 2019

Apache-2.0

17KB
269 lines

A Simplified API to work with Tokio's SplitSink and SplitStream

Motivation

Although Tokio is extremely powerful, some of its features have been less than intuitive to me. So I built this crate to simplify interacting with Tokio in the ways that I usually do:

  • Writing to an IO without caring much about what happens afterwards.
  • Subscribing one or several callbacks to an IO.

Usage

This API should only be used from inside a Tokio runtime: it spawns Tokio tasks, and will therefore panic if no runtime is running.

Standard Usage: Multiple Callbacks

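fn tokio_main() {
    // `LineCodec` and `tcp_stream` are assumed to be defined elsewhere.
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).build();
    let writer = io.get_writer();
    // First callback: echo every received frame back to the sink.
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    // Second callback: print every received frame.
    io.subscribe(move |frame| {
        println!("{}", frame);
    });
}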

Filtering

You can use filters to have your callbacks only be called when the frame matches some criterion.

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).with_filter(|frame, writer| {
        if frame.to_ascii_lowercase().contains("hello there") {
            writer.write("General Kenobi!");
            return None;
        }
        Some(frame)
    }).build();
    let writer = io.get_writer();
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    io.subscribe(move |frame| {
        println!("{}", frame);
    });
}
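
To make the filter contract concrete, here is a minimal std-only sketch of the idea: a filter returns None to consume a frame, or Some(frame) to let the subscribed callbacks see it. It is a model of the behaviour described above, not the crate's implementation. The names dispatch and kenobi_filter are hypothetical, and a plain Vec<String> stands in for the writer.

```rust
/// Hypothetical filter mirroring the example above: it answers greetings
/// itself and lets every other frame through.
/// `replies` stands in for the writer that the filter receives.
fn kenobi_filter(frame: String, replies: &mut Vec<String>) -> Option<String> {
    if frame.to_ascii_lowercase().contains("hello there") {
        replies.push("General Kenobi!".to_string());
        return None; // consumed: callbacks never see this frame
    }
    Some(frame)
}

/// Hypothetical dispatcher: runs each frame through the filter and returns
/// only the frames that the subscribed callbacks would receive.
fn dispatch<F>(filter: F, frames: Vec<String>, replies: &mut Vec<String>) -> Vec<String>
where
    F: Fn(String, &mut Vec<String>) -> Option<String>,
{
    frames
        .into_iter()
        .filter_map(|frame| filter(frame, &mut *replies))
        .collect()
}
```

Calling dispatch(kenobi_filter, frames, &mut replies) with a greeting and one other line leaves only the other line for the callbacks, while the reply is recorded in replies.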

Single Callback Tip

Every call to subscribe(callback) costs one more futures::sync::mpsc::channel, plus one frame.clone() each time the callback is called. The cost is not high, but if you only have one callback, you can avoid it by passing that callback as a filter that always returns None.

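fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).with_filter(|frame, writer| {
        // Do the callback's work inside the filter...
        writer.write(frame);
        // ...and return None so the frame is never forwarded to subscribers.
        None
    }).build();
}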
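
To see where the per-subscription cost comes from, here is a rough std-only sketch of a fan-out with one channel per subscriber and one clone per delivery. It is an illustration under assumptions, not the crate's code: FanOut is a hypothetical type, and std::sync::mpsc stands in for futures::sync::mpsc.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a registry of subscriptions.
struct FanOut {
    subscribers: Vec<Sender<String>>,
}

impl FanOut {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Every subscription allocates its own channel.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Every publish copies the frame once per subscriber.
    /// Returns how many subscribers received it.
    fn publish(&self, frame: &str) -> usize {
        let mut delivered = 0;
        for tx in &self.subscribers {
            // A send error only means that this subscriber was dropped.
            if tx.send(frame.to_owned()).is_ok() {
                delivered += 1;
            }
        }
        delivered
    }
}
```

With a single consumer, handling the frame directly in a filter skips both the channel and the copy, which is the saving the tip above is after.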

Error Handling

By default, IoManager ignores errors on its Stream. To handle them, pass an error-handling function to the builder with with_error_handler().

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream)
        .with_filter(|frame, writer| {
            if frame.to_ascii_lowercase().contains("hello there") {
                writer.write("General Kenobi!");
                return None;
            }
            Some(frame)
        })
        .with_error_handler(move |error| {
            println!("{}", error);
        })
        .build();
    let writer = io.get_writer();
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    io.subscribe(move |frame| {
        println!("{}", frame);
    });
}
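
The difference between the default (errors ignored) and an explicit handler can be sketched with the standard library alone. This is only a model of the behaviour described above, not the crate's implementation: drain is a hypothetical function, and a Vec of Result values stands in for the Stream.

```rust
/// Hypothetical helper: walks a sequence of results, collecting the frames
/// and routing each error to the handler if one was supplied.
fn drain<E, H>(items: Vec<Result<String, E>>, mut on_error: Option<H>) -> Vec<String>
where
    H: FnMut(E),
{
    let mut frames = Vec::new();
    for item in items {
        match item {
            Ok(frame) => frames.push(frame),
            Err(error) => {
                // Without a handler the error is silently dropped,
                // which models the default behaviour.
                if let Some(handler) = on_error.as_mut() {
                    handler(error);
                }
            }
        }
    }
    frames
}
```

Passing Some(closure) lets the caller log or collect errors. Passing None::<fn(String)> models the default, where valid frames keep flowing and errors vanish.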

Dependencies

~7.5MB
~54K SLoC