//! Check multiport capabilities on Outputs. //! //! Ported from LF https://github.com/lf-lang/lingua-franca/blob/master/test/C/src/concurrent/ThreadedMultiport.lf use boomerang::builder::prelude::*; use boomerang::{runtime, Reaction, Reactor}; pub struct State { s: i32, } mod source { use super::*; #[derive(Reactor)] #[reactor(state = "State", reaction = "ReactionT")] pub struct Source { #[reactor(timer(period = "200 msec"))] t: TimerActionKey, pub out: [TypedPortKey; WIDTH], } #[derive(Reaction)] #[reaction(reactor = "Source", triggers(action = "t"))] struct ReactionT<'a, const WIDTH: usize> { out: [runtime::OutputRef<'a, i32>; WIDTH], } impl Trigger> for ReactionT<'_, WIDTH> { fn trigger(mut self, _ctx: &mut runtime::Context, state: &mut State) { for o in self.out.iter_mut() { **o = Some(state.s); } state.s += 1; } } } mod computation { use super::*; #[derive(Reactor, Debug)] #[reactor(state = "()", reaction = "ReactionIn")] pub struct Computation { pub in_: TypedPortKey, pub out: TypedPortKey, } #[derive(Reaction)] #[reaction(reactor = "Computation", bound = "const ITERS: usize")] struct ReactionIn<'a> { in_: runtime::InputRef<'a, i32>, out: runtime::OutputRef<'a, i32>, } impl Trigger> for ReactionIn<'_> { fn trigger(mut self, _ctx: &mut runtime::Context, _state: &mut ()) { let mut offset = 0; for _ in 0..ITERS { offset += 1; //std::thread::sleep(std::time::Duration::from_nanos(1)); } *self.out = self.in_.map(|x| x + offset); } } } mod destination { use super::*; #[derive(Reactor)] #[reactor( state = "State", reaction = "ReactionIn", reaction = "ReactionShutdown" )] pub struct Destination { pub in_: [TypedPortKey; WIDTH], } #[derive(Reaction)] #[reaction(reactor = "Destination")] struct ReactionIn<'a, const WIDTH: usize, const ITERS: usize> { in_: [runtime::InputRef<'a, i32>; WIDTH], } impl Trigger> for ReactionIn<'_, WIDTH, ITERS> { fn trigger(self, _ctx: &mut runtime::Context, state: &mut State) { let expected = ITERS as i32 * WIDTH as i32 + state.s; let sum = self.in_.iter().filter_map(|x| x.as_ref()).sum::(); println!("Sum of received: {}.", sum); assert_eq!(sum, expected, "Expected {}.", expected); state.s += WIDTH as i32; } } #[derive(Reaction)] #[reaction( reactor = "Destination", bound = "const WIDTH: usize", bound = "const ITERS: usize", triggers(shutdown) )] struct ReactionShutdown; impl Trigger> for ReactionShutdown { fn trigger(self, _ctx: &mut runtime::Context, state: &mut State) { assert!(state.s > 0, "ERROR: Destination received no input!"); println!("Success."); } } } #[derive(Reactor)] #[reactor( state = "()", connection(from = "a.out", to = "t.in_"), connection(from = "t.out", to = "b.in_") )] struct ThreadedMultiport { #[reactor(child = "State{s: 0}")] a: source::Source, #[reactor(child = "()")] t: [computation::Computation; WIDTH], #[reactor(child = "State{s: 0}")] b: destination::Destination, #[reactor(child = "runtime::Duration::from_secs(2)")] _timeout: boomerang_util::timeout::Timeout, } #[test] fn threading_multiport() { tracing_subscriber::fmt::init(); let _ = boomerang_util::runner::build_and_test_reactor::>( "threaded_multiport", (), true, false, ) .unwrap(); }