Crates.io | streamline |
lib.rs | streamline |
version | 0.0.7 |
source | src |
created_at | 2018-11-03 00:39:05.096901 |
updated_at | 2020-04-14 20:30:53.252219 |
description | Reversible futures::Stream-based state machines |
homepage | |
repository | https://github.com/NAlexPear/streamline |
max_upload_size | |
id | 94436 |
size | 24,494 |
streamline
Reversible Stream-based state machine library for Rust
Here's an example of how to use a Streamline
to create a GitHub repo, Tweet about it, and reverse the whole process if something goes wrong.
// recommended, but not required
use async_trait::async_trait;
// some example clients for communicating through third-party APIs
use clients::{Github, Twitter};
use futures::StreamExt;
use streamline::{State, Streamline};
const MY_USERNAME: &'static str = "my-github-username";
// clients are stored in context for shared access throughout the life of the streamline
#[derive(Debug)]
struct Context {
github: Github,
twitter: Twitter,
}
#[derive(Debug, PartialEq)]
// you should create better errors than this
type MyError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Clone, Debug, PartialEq)]
enum MyState {
Github { repo_name: String },
Twitter { repo_id: i32, repo_name: String }
Done { repo_id: i32, repo_name: String, tweet_id: i32 }
}
#[async_trait(?Send)]
impl State for MyState {
type Context = Context;
type Error = MyError;
// every state needs to be mapped to the next
async fn next(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;
let next_state = match self {
MyState::Github { repo_name } => {
context
.github
.add_repo(&repo_name)
.await?
.map(|response| Some(MyState::Twitter { repo_id: &response.repo_id, repo_url: repo_name }))
},
MyState::Twitter { repo_name, .. } => {
context
.twitter
.tweet(&format("Look at my new Github repo at https://github.com/{}/{}!", &repo_name))
.await?
.map(|response| Some(MyState::Done { tweet_id: response.tweet_id }))
},
MyState::Done { .. } => None // returning Ok(None) stops the stream!
};
Ok(next_state)
}
// optionally, old states can be cleaned up if something goes wrong
async fn revert(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;
let next_state = match self {
MyState::Done { tweet_id, repo_id, repo_name } => {
context
.twitter
.delete_tweet(tweet_id)
.await?;
Some(MyState::Twitter { repo_id, repo_name })
},
MyState::Twitter { repo_id, repo_name } => {
context
.github
.delete_repo(repo_id)
.await?;
Some(MyState::Github { repo_id, repo_name })
},
MyState::Github { .. } => None
};
Ok(next_state)
}
}
async fn handle_tweeting_repo(repo_name: String) {
let context = Context {
github: Github::new(),
twitter: Twitter::new(),
};
Streamline::build(MyState { repo_name })
.context(context)
.run()
.for_each(|state| println!("Next state {:?}", &state))
.await;
}
If one wants to move from one state to the next within a process, it makes sense in Rust to look towards some of the many state machine patterns available through the type system. enum
s, in particular, are a great way of modeling the progress of a process in a way that excludes impossible states along the way. But there's less certainty around handling state for the following scenarios:
enum
, it's much trickier to know when to handle updates to state that is not directly attached to a single variant of the state machine. How does one, for example, handle updating a value in a database as a state machine progresses? What about interacting with third-party services? When should these parts of state be handled?Stream
is easy... just drop the Stream
! But cleaning up after a stream that represents some in-progress state is much more difficult.streamline
solves addresses those problems in the following ways:
futures::Stream
-compatibility: rather than using side effects during state machine execution, this library models every update to a state machine as an Item
in a std::futures::Stream
.Context
: all super-variant state can be accessed with a consistent Context
.Err
, streamline
will (optionally) revert all the states up to that point, returning the original error.run_preemptible
method returns a Stream
and a Cancel
handler that can be used to trigger the revert process of a working stream.