#signal #async #completion-token

sync-tokens

sync-tokens provides ways to coordinate with running tasks. It provides a way to cleanly cancel a running task, and a way for a running task to communicate when it's ready

1 unstable release

0.1.0 Dec 3, 2020

#195 in #signal

MIT/Apache

22KB
354 lines

sync-tokens

Rust structs that assist with synchronization between tasks: Allows canceling long-running tasks, and allows communicating that something is ready

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.


lib.rs:

sync-tokens provides ways to coordinate with running tasks. It provides a way to cleanly cancel a running task, and a way for a running task to communicate when it's ready

# Example, use the version numbers you need
sync-tokens = "0.1.0"
async-std = { version = "1.7.0", features = ["attributes"] }

Examples

Accepts incoming sockets on a background task. Communicates when the listener is actively listening, and allows canceling the loop for incoming sockets

See on github

use std::io::{ Error, ErrorKind };

use async_std::io::Result;
use async_std::net::{IpAddr, Ipv4Addr, TcpListener, TcpStream, SocketAddr};
use async_std::task;
use async_std::task::JoinHandle;

use sync_tokens::cancelation_token::{ Cancelable, CancelationToken };
use sync_tokens::completion_token::{ Completable, CompletionToken };

// Starts running a server on a background task
pub fn run_server() -> (JoinHandle<Result<()>>, CompletionToken<Result<SocketAddr>>, CancelationToken) {
    // This CompletionToken allows the caller to wait until the server is actually listening
    // The caller gets completion_token, which it can await on
    // completable is used to signal to completion_token
    let (completion_token, completable) = CompletionToken::new();

    // This CancelationToken allows the caller to stop the server
    // The caller gets cancelation_token
    // cancelable is used to allow canceling a call to await
    let (cancelation_token, cancelable) = CancelationToken::new();

    // The server is started on a background task, and the future returned
    let server_future = task::spawn(run_server_int(completable, cancelable));

    (server_future, completion_token, cancelation_token)
}

async fn run_server_int(completable: Completable<Result<SocketAddr>>, cancelable: Cancelable) -> Result<()> {

    let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
    let listener = TcpListener::bind(socket_addr).await?;

    // Inform that the server is listening
    let local_addr = listener.local_addr();
    completable.complete(local_addr);

    // Create a future that waits for an incoming socket
    let mut incoming_future = task::spawn(accept(listener));
    
    loop {
        // Wait for either the incoming socket (via incoming_future) or for the CancelationToken
        // to be canceled.
        // When the CancelationToken is canceled, the error is returned
        let (listener, _) = cancelable.allow_cancel(
            incoming_future, 
            Err(Error::new(ErrorKind::Interrupted, "Server terminated")))
            .await?;

        incoming_future = task::spawn(accept(listener));
    }
}

async fn accept(listener: TcpListener) -> Result<(TcpListener, TcpStream)> {
    let (stream, _) = listener.accept().await?;
    Ok((listener, stream))
}

#[async_std::main]
async fn main() {
    let (server_future, completion_token, cancelation_token) = run_server();

    println!("Server is starting");

    // Wait for the server to start
    let local_addr = completion_token.await.unwrap();

    println!("Server is listening at {}", local_addr);
    println!("Push Return to stop the server");

    let _ = std::io::stdin().read_line(&mut String::new()).unwrap();

    // Stop the server
    cancelation_token.cancel();

    // Wait for the server to shut down
    let err = server_future.await.unwrap_err();

    println!("Server ended: {}", err);
}

Dependencies

~1MB
~15K SLoC