svc-rumq-client

An efficient and robust mqtt client for your connected devices

1 unstable release

0.1.0-alpha.11 Jul 25, 2020

MIT license

155KB
3K SLoC

A pure Rust MQTT client which strives to be robust, efficient and easy to use.

  • The eventloop is just an async Stream which can be polled by tokio
  • Requests to the eventloop are also a Stream, which covers both bounded and unbounded use cases
  • Robustness is just a loop away
  • Flexible access to the state of the eventloop to control its behaviour

Accepts any stream of Requests

Build a bounded, unbounded, interruptible or any other stream that fits your needs to feed the eventloop.

A few of our real-world use cases

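  • A stream which orchestrates data between disk and memory by detecting backpressure, so that (practically) no data is ever lost
  • A stream which juggles data between several channels based on the priority of the data

use tokio::stream::StreamExt; // provides `next()` on streams
use tokio::sync::mpsc::channel;
// Crate path assumed from the package name; adjust if the library name differs.
use svc_rumq_client::{eventloop, MqttOptions, Request};

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // Any stream of requests works; a bounded channel is used here.
    // The sender is kept alive so the request stream stays open.
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}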
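
The priority-based use case listed above can be illustrated without any MQTT machinery. The following is a minimal std-only sketch, not the crate's implementation: `Req` and `Prioritized` are hypothetical names, and a synchronous `Iterator` over `std::sync::mpsc` channels stands in for an async `Stream`. Unlike a real request stream, this iterator ends as soon as both channels are empty instead of waiting for more data.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical request type, standing in for the client's `Request`.
#[derive(Debug, PartialEq)]
struct Req(&'static str);

/// Drains two channels, always preferring `high` when it has data.
struct Prioritized {
    high: Receiver<Req>,
    low: Receiver<Req>,
}

impl Iterator for Prioritized {
    type Item = Req;

    fn next(&mut self) -> Option<Req> {
        // Try the high-priority channel first, then fall back to the low one.
        // Returns None (ending iteration) when neither has data ready.
        self.high
            .try_recv()
            .or_else(|_| self.low.try_recv())
            .ok()
    }
}

/// Queues interleaved requests, then drains them in priority order.
fn drain_in_priority_order() -> Vec<Req> {
    let (high_tx, high) = channel();
    let (low_tx, low) = channel();
    low_tx.send(Req("telemetry-1")).unwrap();
    high_tx.send(Req("alert-1")).unwrap();
    low_tx.send(Req("telemetry-2")).unwrap();
    high_tx.send(Req("alert-2")).unwrap();
    Prioritized { high, low }.collect()
}

fn main() {
    for req in drain_in_priority_order() {
        println!("{:?}", req);
    }
}
```

Both alerts come out before either telemetry item, even though they were queued later. An async version would wait on both channels rather than stop when they are momentarily empty.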

Robustness a loop away

Networks are unreliable, but robustness is easy.

  • Just create a new stream from the existing eventloop
  • It resumes from where it left off
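  • Access the state of the eventloop to customize the behaviour of the next connection

use std::time::Duration;
use tokio::stream::StreamExt; // provides `next()` on streams
use tokio::sync::mpsc::channel;
use tokio::time;
// Crate path assumed from the package name; adjust if the library name differs.
use svc_rumq_client::{eventloop, MqttOptions, Request};

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // The sender is kept alive so the request stream stays open.
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(item) = stream.next().await {
            println!("Received = {:?}", item);
        }

        // the stream ended (connection lost); wait before reconnecting
        time::delay_for(Duration::from_secs(1)).await;
    }
}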
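
Why a plain loop is enough to resume can be shown with a small std-only model. This is a sketch under stated assumptions, not the crate's internals: `FakeEventLoop` and `Connection` are hypothetical types, each simulated connection drops after a fixed number of items, and the pending queue lives in the loop state rather than in the connection, so nothing is lost between connections.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the eventloop: it owns the pending requests,
/// so they survive across connections.
struct FakeEventLoop {
    pending: VecDeque<u32>,
    /// Items each simulated connection delivers before it drops (must be > 0).
    budget_per_connection: usize,
}

/// One simulated connection; it borrows the loop state mutably.
struct Connection<'a> {
    state: &'a mut FakeEventLoop,
    remaining: usize,
}

impl FakeEventLoop {
    fn connect(&mut self) -> Connection<'_> {
        let remaining = self.budget_per_connection;
        Connection { state: self, remaining }
    }
}

impl<'a> Iterator for Connection<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None; // simulated network drop
        }
        self.remaining -= 1;
        self.state.pending.pop_front()
    }
}

/// Reconnects until nothing is pending.
/// Returns the delivered items and the number of connections used.
fn run_with_reconnects(items: Vec<u32>, budget: usize) -> (Vec<u32>, usize) {
    let mut eventloop = FakeEventLoop {
        pending: items.into(),
        budget_per_connection: budget,
    };
    let mut delivered = Vec::new();
    let mut connections = 0;
    while !eventloop.pending.is_empty() {
        connections += 1;
        for item in eventloop.connect() {
            delivered.push(item);
        }
        // A real client would sleep here before reconnecting.
    }
    (delivered, connections)
}

fn main() {
    let (delivered, connections) = run_with_reconnects(vec![1, 2, 3, 4, 5], 2);
    println!("delivered {:?} over {} connections", delivered, connections);
}
```

With five items and a budget of two per connection, the model needs three connections and still delivers every item once, in order.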

Eventloop is just a stream which can be polled with tokio

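  • Plug it into select! or join! to interleave it with other streams on the same thread

use tokio::sync::mpsc::channel;
// Crate path assumed from the package name; adjust if the library name differs.
use svc_rumq_client::{eventloop, MqttOptions, Request};

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // plug it into the tokio ecosystem: `_stream` can now be polled
    // alongside other streams
    let _stream = eventloop.connect().await.unwrap();
}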

Powerful notification system to control the runtime

The eventloop stream yields all interesting events, ranging from data on the network to disconnections and reconnections. Use it the way you see fit.

  • Resubscribe after reconnection
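  • Stop after receiving the Nth puback

use std::time::Duration;
use tokio::stream::StreamExt; // provides `next()` on streams
use tokio::sync::mpsc::channel;
use tokio::time;
// Crate path assumed from the package name; adjust if the library name differs.
use svc_rumq_client::{eventloop, MqttOptions, Notification};

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let (mut requests_tx, requests_rx) = channel(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(notification) = stream.next().await {
            println!("Received = {:?}", notification);
            match notification {
                // `subscribe` is a placeholder for a subscribe request built by the caller
                Notification::Connect => requests_tx.send(subscribe).await.unwrap(),
                // ignore every other notification
                _ => {}
            }
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}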
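
The two ideas listed above (resubscribe after reconnection, stop after the Nth puback) reduce to a small decision function over the notification sequence. The sketch below is std-only and hypothetical: `Event` and `Action` are illustrative types, not the crate's `Notification` enum, and the events arrive as a slice rather than from a live stream.

```rust
/// Hypothetical notification type; the real enum has more variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Connected,
    Puback(u16),
    Disconnected,
}

/// What the application decides to do in response to an event.
#[derive(Debug, PartialEq)]
enum Action {
    Resubscribe,
    /// Stop, carrying the packet id of the last acknowledged publish.
    Stop(u16),
}

/// Walks the events in order, resubscribing on every (re)connection and
/// stopping once `stop_after` pubacks have been seen.
fn handle(events: &[Event], stop_after: usize) -> Vec<Action> {
    let mut actions = Vec::new();
    let mut acks = 0;
    for event in events {
        match event {
            Event::Connected => actions.push(Action::Resubscribe),
            Event::Puback(pkid) => {
                acks += 1;
                if acks == stop_after {
                    actions.push(Action::Stop(*pkid));
                    break;
                }
            }
            // The outer reconnect loop would create a new stream here.
            Event::Disconnected => {}
        }
    }
    actions
}

fn main() {
    let events = [
        Event::Connected,
        Event::Puback(1),
        Event::Disconnected,
        Event::Connected,
        Event::Puback(2),
        Event::Puback(3),
    ];
    println!("{:?}", handle(&events, 2));
}
```

The puback counter lives outside any single connection, so it keeps counting across the disconnection in the middle of the sequence.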
