6 releases

0.0.7 Apr 14, 2020
0.0.6 Apr 14, 2020
0.0.1 Nov 3, 2018

#753 in Asynchronous

Download history 4/week @ 2024-06-29 8/week @ 2024-07-27 73/week @ 2024-09-28

73 downloads per month

MIT license

16KB
175 lines

streamline

Reversible Stream-based state machine library for Rust

Example

Here's an example of how to use a Streamline to create a GitHub repo, Tweet about it, and reverse the whole process if something goes wrong.

// recommended, but not required
use async_trait::async_trait;
// some example clients for communicating through third-party APIs
use clients::{Github, Twitter};
use futures::StreamExt;
use streamline::{State, Streamline};

const MY_USERNAME: &'static str = "my-github-username";

// clients are stored in context for shared access throughout the life of the streamline
#[derive(Debug)]
struct Context {
  github: Github,
  twitter: Twitter,
}

#[derive(Debug, PartialEq)]
// you should create better errors than this
type MyError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Clone, Debug, PartialEq)]
enum MyState {
  Github { repo_name: String },
  Twitter { repo_id: i32, repo_name: String }
  Done { repo_id: i32, repo_name: String, tweet_id: i32 }
}

#[async_trait(?Send)]
impl State for MyState {
  type Context = Context;
  type Error = MyError;

  // every state needs to be mapped to the next
  async fn next(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
    let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;

    let next_state = match self {
        MyState::Github { repo_name } => {
          context
            .github
            .add_repo(&repo_name)
            .await?
            .map(|response| Some(MyState::Twitter { repo_id: &response.repo_id, repo_url: repo_name }))
        },
        MyState::Twitter { repo_name, .. } => {
          context
            .twitter
            .tweet(&format("Look at my new Github repo at https://github.com/{}/{}!", &repo_name))
            .await?
            .map(|response| Some(MyState::Done { tweet_id: response.tweet_id }))
        },
        MyState::Done { .. } => None // returning Ok(None) stops the stream!
      };

      Ok(next_state)
  }

  // optionally, old states can be cleaned up if something goes wrong
  async fn revert(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
    let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;

    let next_state = match self {
      MyState::Done { tweet_id, repo_id, repo_name } => {
        context
          .twitter
          .delete_tweet(tweet_id)
          .await?;

        Some(MyState::Twitter { repo_id, repo_name })
      },
      MyState::Twitter { repo_id, repo_name } => {
        context
          .github
          .delete_repo(repo_id)
          .await?;

        Some(MyState::Github { repo_id, repo_name })
      },
      MyState::Github { .. } => None
    };

    Ok(next_state)
  }
}

async fn handle_tweeting_repo(repo_name: String) {
  let context = Context {
    github: Github::new(),
    twitter: Twitter::new(),
  };

  Streamline::build(MyState { repo_name })
    .context(context)
    .run()
    .for_each(|state| println!("Next state {:?}", &state))
    .await;
}

Motivation

If one wants to move from one state to the next within a process, it makes sense in Rust to look towards some of the many state machine patterns available through the type system. enums, in particular, are a great way of modeling the progress of a process in a way that excludes impossible states along the way. But there's less certainty around handling state for the following scenarios:

  1. Non-blocking state updates: it's often the case that we care most about the final state of a state machine, but we would also like to be updated when the state changes along the way to that terminal state. In state machines, this is often implemented as a side effect (e.g. through channels or an event broker).
  2. Super-statefulness: while it's common to carry state around inside individual variants of an enum, it's much trickier to know when to handle updates to state that is not directly attached to a single variant of the state machine. How does one, for example, handle updating a value in a database as a state machine progresses? What about interacting with third-party services? When should these parts of state be handled?
  3. Reversibility: most processes need to know how to clean up after themselves. Modeling these fallible processes and their path towards full reversion to the original state (or failure to do so) is a complex and boilerplate-heavy process.
  4. Cancellation: interrupting the execution of a Stream is easy... just drop the Stream! But cleaning up after a stream that represents some in-progress state is much more difficult.

streamline solves addresses those problems in the following ways:

  1. futures::Stream-compatibility: rather than using side effects during state machine execution, this library models every update to a state machine as an Item in a std::futures::Stream.
  2. Consistent Context: all super-variant state can be accessed with a consistent Context.
  3. Automatic Reversion: whenever a process returns an Err, streamline will (optionally) revert all the states up to that point, returning the original error.
  4. Manual Cancellation: the run_preemptible method returns a Stream and a Cancel handler that can be used to trigger the revert process of a working stream.

Dependencies

~0.9–1.5MB
~32K SLoC