#channel #data-channel #requests #chan

reqchan

This is a channel for requesting and receiving data. Each channel has only one requesting end, but it can have multiple responding ends. It is useful for implementing work sharing. The two ends of the channel are asynchronous with respect to each other, so it is kinda nonblocking. However, if multiple responding ends try to respond to the same request, only one will succeed; the rest will return errors.

7 releases

Uses old Rust 2015

0.5.8 Oct 10, 2017
0.5.7 Sep 29, 2017

#993 in Asynchronous

MIT/Apache

43KB
448 lines

reqchan-rs defines a channel for requesting and receiving data.

Documentation

Linux Status Build status

Introduction

Each channel has only one requesting end, but it can have multiple responding ends. It is useful for implementing work sharing.

The two ends of the channel are asynchronous with respect to each other, so it is kinda nonblocking. However, if multiple responding ends try to respond to the same request, only one will succeed; the rest will return errors.

Design

Overview

reqchan is built around the two halves of the channel: Requester and Responder. Both implement methods, Requester::try_request() and Responder::try_respond(), that, when succesful, lock their corresponding side of the channel and return contracts. RequestContract requires the user to either successfully receive a datum or cancel the request. ResponseContract requires the user to send a datum. These requirements prevent the system from losing data sent through the channel.

Locking

Responder::try_response() locks the responding side to prevent other potential responders from responding to the same request. However, Requester::try_request() locks the requesting side of the channel to prevent the user from trying to issue multiple outstanding requests. Both locks are dropped when their corresponding contract object is dropped.

Contracts

Requester::try_request() has to issue a RequestContract so the thread of execution does not block waiting for a response. However, that reason does not apply to Responder::try_response(). I originally made Responder::try_response() send the datum. However, that required the user to have the datum available to send even if it could not be sent, and it required the user to handle the returned datum if it could not be sent. If the datum was, say, half the contents of a Vec, this might entail lots of expensive memory allocation. Therefore, I made Responder::try_response() return a ResponseContract indicating that the responder could and would respond to the request. This way the user only has to perform the necessary steps to send the datum if the datum must be sent.

Examples

Simple Example

This simple, single-threaded example demonstrates most of the API. The only thing left out is RequestContract::try_cancel().

extern crate reqchan;

fn main() {
    // Create channel.
    let (requester, responder) = reqchan::channel::<u32>(); 
    
    // Issue request.
    let mut request_contract = requester.try_request().unwrap();
    
    // Respond with number.
    responder.try_respond().unwrap().send(5);
    
    // Receive and print number.
    println!("Number is {}", request_contract.try_receive().unwrap());
}

More Complex Example

This more complex example demonstrates more "real-world" usage. One thread requests a 'task' (i.e. a closure to run), and the other two threads fight over who gets to respond with their own personal task. Meanwhile, the requesting thread is polling for a task, and if it gets one in time, it runs it. Regardless of whether or not the receiver got a task or timed out, the receiver notifies other threads to stop running, and stops itself.

extern crate reqchan as chan;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Stuff to make it easier to pass around closures.
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Task = Box<FnBox + Send + 'static>;

fn main() {
    // Variable used to test calling a `Task` sent between threads.
    let test_var = Arc::new(AtomicUsize::new(0));
    let test_var2 = test_var.clone();
    let test_var3 = test_var.clone();
    
    // Variable needed to stop `responder` thread if `requester` times out
    let should_exit = Arc::new(AtomicBool::new(false));
    let should_exit_copy_1 = should_exit.clone();
    let should_exit_copy_2 = should_exit.clone();
    
    let (requester, responder) = chan::channel::<Task>();
    let responder2 = responder.clone();
    
    // requesting thread
    let requester_handle = thread::spawn(move || {
        let start_time = Instant::now();
        let timeout = Duration::new(0, 1000000);
        
        let mut contract = requester.try_request().unwrap();
    
        loop {
            // Try to cancel request and stop threads if runtime
            // has exceeded `timeout`.
            if start_time.elapsed() >= timeout {
                // Try to cancel request.
                // This should only fail if `responder` has started responding.
                if contract.try_cancel() {
                    // Notify other threads to stop.
                    should_exit.store(true, Ordering::SeqCst);
                    break;
                }
            }
    
            // Try getting `task` from `responder`.
            match contract.try_receive() {
                // `contract` received `task`.
                Ok(task) => {
                    task.call_box();
                    // Notify other threads to stop.
                    should_exit.store(true, Ordering::SeqCst);
                    break;
                },
                // Continue looping if `responder` has not yet sent `task`.
                Err(chan::TryReceiveError::Empty) => {},
                // The only other error is `chan::TryReceiveError::Done`.
                // This only happens if we call `contract.try_receive()`
                // after either receiving data or cancelling the request.
                _ => unreachable!(),
            }
        }
    });
    
    // responding thread 1
    let responder_1_handle = thread::spawn(move || {
        let mut tasks = vec![Box::new(move || {
            test_var2.fetch_add(1, Ordering::SeqCst);
        }) as Task];
        
        loop {
            // Exit loop if `receiver` has timed out.
            if should_exit_copy_1.load(Ordering::SeqCst) {
                break;
            }
            
            // Send `task` to `receiver` if it has issued a request.
            match responder2.try_respond() {
                // `responder2` can respond to request.
                Ok(contract) => {
                    contract.send(tasks.pop().unwrap());
                    break;
                },
                // Either `requester` has not yet made a request,
                // or `responder2` already handled the request.
                Err(chan::TryRespondError::NoRequest) => {},
                // `responder2` is processing request..
                Err(chan::TryRespondError::Locked) => { break; },
            }
        }
    });
    
    // responding thread 2
    let responder_2_handle = thread::spawn(move || {
        let mut tasks = vec![Box::new(move || {
            test_var3.fetch_add(2, Ordering::SeqCst);
        }) as Task];
        
        loop {
            // Exit loop if `receiver` has timed out.
            if should_exit_copy_2.load(Ordering::SeqCst) {
                break;
            }
            
            // Send `task` to `receiver` if it has issued a request.
            match responder.try_respond() {
                // `responder2` can respond to request.
                Ok(contract) => {
                    contract.send(tasks.pop().unwrap());
                    break;
                },
                // Either `requester` has not yet made a request,
                // or `responder` already handled the request.
                Err(chan::TryRespondError::NoRequest) => {},
                // `responder` is processing request.
                Err(chan::TryRespondError::Locked) => { break; },
            }
        }
    });
    
    requester_handle.join().unwrap();
    responder_1_handle.join().unwrap();
    responder_2_handle.join().unwrap();
    
    // `num` can be 0, 1 or 2.
    let num = test_var.load(Ordering::SeqCst);
    println!("Number is {}", num);
}

Platforms

reqchan-rs should Work on Windows and any POSIX compatible system (Linux, Mac OSX, etc.).

reqchan-rs is continuously tested on:

  • x86_64-unknown-linux-gnu (Linux)
  • i686-unknown-linux-gnu
  • x86_64-unknown-linux-musl (Linux w/ MUSL)
  • i686-unknown-linux-musl
  • x86_64-apple-darwin (Mac OSX)
  • i686-apple-darwin
  • x86_64-pc-windows-msvc (Windows)
  • i686-pc-windows-msvc
  • x86_64-pc-windows-gnu
  • i686-pc-windows-gnu

reqchan-rs is continuously cross-compiled for:

  • arm-unknown-linux-gnueabihf
  • aarch64-unknown-linux-gnu
  • mips-unknown-linux-gnu
  • aarch64-unknown-linux-musl
  • i686-linux-android
  • x86_64-linux-android
  • arm-linux-androideabi
  • aarch64-linux-android
  • i386-apple-ios
  • x86_64-apple-ios
  • i686-unknown-freebsd
  • x86_64-unknown-freebsd
  • x86_64-unknown-netbsd
  • asmjs-unknown-emscripten

No runtime deps