//! A transition in a CPN. pub mod guard; use crate::arc::{Arc, PlaceToTransition, TransitionToPlace}; use crate::error::{Error, Result}; use crate::marking::Marking; use crate::place::Capacity; use crate::token::Token; pub use guard::Guard; use itertools::Itertools; use std::any::Any; use std::collections::HashMap; /// A transition in a CPN. /// /// Transitions have a name, and optionally a guard. #[derive(Debug)] pub struct Transition { pub(crate) name: String, pub(crate) guard: Option, pub(crate) in_arcs: Vec>, pub(crate) out_arcs: Vec>, meta: Option>, } impl Transition { /// Create a new, unguarded transition with the given `name`. pub fn new(name: S) -> Self where S: Into, { Self { name: name.into(), guard: None, in_arcs: Vec::new(), out_arcs: Vec::new(), meta: None, } } /// Add a guard expression that must evaluate to `True` for every input token. /// /// If the guard evaluates to `False`, the transition does not fire. pub fn with_guard(mut self, guard: Guard) -> Self { self.guard = Some(guard); self } /// Attach the metadata object `meta` to this transition. pub fn with_metadata(mut self, meta: Box) -> Self { self.meta = Some(meta); self } /// Get the name of this transition. pub fn name(&self) -> &str { &self.name } /// Get the metadata attached to this transition, if any. pub fn metadata(&self) -> Option<&dyn Any> { self.meta.as_ref().map(AsRef::as_ref) } pub(crate) fn add_in_arc(&mut self, arc: Arc) -> Result<()> { for existing_arc in self.in_arcs.iter() { if existing_arc.binding() == arc.binding() { return Err(Error::InvalidArcExpression); } if existing_arc.place == arc.place { return Err(Error::DuplicateArc); } } self.in_arcs.push(arc); Ok(()) } pub(crate) fn add_out_arc(&mut self, arc: Arc) -> Result<()> { for existing_arc in self.out_arcs.iter() { if existing_arc.place == arc.place { return Err(Error::DuplicateArc); } } self.out_arcs.push(arc); Ok(()) } /// Check if a sufficient number of tokens satsify the guard expression, if there is a guard. pub(crate) fn check_guard(&self, marking: &Marking) -> Result { let guard = match self.guard.as_ref() { Some(guard) => guard, None => return Ok(true), }; // this construction is a bit clumsy, because Guard::evaluate needs an iterator over values // of type (&str, &PyObject) let mut token_candidates: Vec> = Vec::new(); for arc in self.in_arcs.iter() { let name = arc.binding(); let tokens = marking .get(&arc.place)? .iter() .map(|token| (name, token)) .collect(); token_candidates.push(tokens); } // check all combinations of tokens for token_combination in token_candidates.iter().multi_cartesian_product() { if guard.evaluate( token_combination .iter() .map(|(name, token)| (*name, &token.object)), )? { return Ok(true); } } Ok(false) } /// Check if the transition can fire, but **does not** check the transition guard! pub(crate) fn can_fire(&self, marking: &Marking) -> Result { if self.in_arcs.is_empty() && self.out_arcs.is_empty() { // transitions with no arcs can never fire return Ok(false); } for arc in self.in_arcs.iter() { if marking.get(&arc.place)?.is_empty() { return Ok(false); } } for arc in self.out_arcs.iter() { let mut free = marking.free_capacity(&arc.place)?; // handle edge-case where a place has both an input- and an output-arc to/from a // transition if self.in_arcs.iter().any(|a| a.place == arc.place) { free += 1; } if let Capacity::Limited(cap) = free { if cap < arc.weight() { return Ok(false); } } } Ok(true) } pub(crate) fn fire(&self, marking: &mut Marking) -> Result<()> { let mut out_tokens: HashMap = HashMap::new(); let mut token_candidates = Vec::new(); for arc in self.in_arcs.iter() { let name = arc.binding(); let tokens = marking .get(&arc.place)? .iter() .enumerate() .map(|(idx, _)| (name.to_owned(), arc, idx)) .collect::>(); token_candidates.push(tokens); } let mut guard_check_succeeded = self.guard.is_none(); for token_combination in token_candidates.iter().multi_cartesian_product() { if let Some(guard) = self.guard.as_ref() { if !guard.evaluate(token_combination.iter().map(|(name, arc, idx)| { ( name.as_ref(), &marking.get(&arc.place).unwrap()[*idx].object, ) }))? { continue; } } for (name, arc, idx) in token_combination { let place_tokens = marking.get_mut(&arc.place)?; let token = place_tokens.remove(*idx); out_tokens.insert(name.to_owned(), token); } // one working combination was found, done guard_check_succeeded = true; break; } if !guard_check_succeeded { return Err(Error::GuardCheckFailed); } for arc in self.out_arcs.iter() { let mut tokens = Vec::new(); for name in arc.input_variables() { match out_tokens.get(name.as_str()) { Some(token) => tokens.push(token.clone()), None => return Err(Error::TokenNotFound(name.to_owned())), } } let token = arc.evaluate(tokens)?; let place_tokens = marking.get_mut(&arc.place)?; place_tokens.push(token); } Ok(()) } }