#![allow( clippy::type_complexity, clippy::too_many_arguments, clippy::large_enum_variant )] use mpstthree::binary::struct_trait::end::End; use mpstthree::binary::struct_trait::session::Session; use mpstthree::binary_atmp::struct_trait::{recv::RecvTimed, send::SendTimed}; use mpstthree::generate_atmp; use mpstthree::role::broadcast::RoleBroadcast; use mpstthree::role::end::RoleEnd; use rand::{thread_rng, Rng}; use std::collections::HashMap; use std::error::Error; use std::thread::sleep; use std::time::{Duration, Instant}; // Create the new MeshedChannels for three participants and the close and fork functions generate_atmp!(MeshedChannels, Calculator1, Calculator2, Server); // Labels struct Data1 {} struct Data2 {} struct ProcesData2 {} struct ProcesData3 {} struct Complete {} struct Stop {} // Types // Calculator1 type OfferFromServertoCalculator1 = RecvTimed; type Calculator1toCalculator2Data = RecvTimed< ProcesData2, 'b', 3, true, 4, true, ' ', SendTimed, >; type Calculator1toServerData = RecvTimed< Data1, 'a', 0, true, 1, true, ' ', SendTimed, >; type Calculator1toCalculator2Stop = End; type Calculator1toServerStop = RecvTimed; enum BranchingFromServertoCalculator1 { Data( MeshedChannels< Calculator1toCalculator2Data, Calculator1toServerData, RoleServer>>>>, NameCalculator1, >, ), Stop( MeshedChannels< Calculator1toCalculator2Stop, Calculator1toServerStop, RoleServer, NameCalculator1, >, ), } // Calculator2 type OfferFromServertoCalculator2 = RecvTimed; type Calculator2toCalculator1Data = ::Dual; type Calculator2toServerData = RecvTimed< Data2, 'c', 0, true, 1, true, ' ', RecvTimed, >; type Calculator2toCalculator1Stop = ::Dual; type Calculator2toServerStop = RecvTimed; enum BranchingFromServertoCalculator2 { Data( MeshedChannels< Calculator2toCalculator1Data, Calculator2toServerData, RoleServer>>>>, NameCalculator2, >, ), Stop( MeshedChannels< Calculator2toCalculator1Stop, Calculator2toServerStop, RoleServer, NameCalculator2, >, ), } // Server type ChooseFromServertoCalculator1 = ::Dual; type ChooseFromServertoCalculator2 = ::Dual; type ServertoCalculator1Data = ::Dual; type ServertoCalculator2Data = ::Dual; type ServertoCalculator1Stop = ::Dual; type ServertoCalculator2Stop = ::Dual; // Endpoints // Calculator1 type Endpoint0Calculator1 = MeshedChannels, NameCalculator1>; // Calculator2 type Endpoint0Calculator2 = MeshedChannels, NameCalculator2>; // Server type Endpoint0Server = MeshedChannels< ChooseFromServertoCalculator1, ChooseFromServertoCalculator2, RoleBroadcast, NameServer, >; type Endpoint1ServerData = MeshedChannels< ServertoCalculator1Data, ServertoCalculator2Data, RoleCalculator1>>>, NameServer, >; type Endpoint1ServerStop = MeshedChannels< ServertoCalculator1Stop, ServertoCalculator2Stop, RoleCalculator1>, NameServer, >; // Functions ///////////////////////// // Calculator1 fn endpoint_calculator_1( s: Endpoint0Calculator1, all_clocks: &mut HashMap, ) -> Result<(), Box> { all_clocks.insert('a', Instant::now()); all_clocks.insert('b', Instant::now()); recurs_calculator_1(s, all_clocks) } fn recurs_calculator_1( s: Endpoint0Calculator1, all_clocks: &mut HashMap, ) -> Result<(), Box> { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); offer_mpst!(s, all_clocks, { BranchingFromServertoCalculator1::Stop(s) => { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let (_stop, s) = s.recv(all_clocks)?; s.close() }, BranchingFromServertoCalculator1::Data(s) => { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let (_data_1, s) = s.recv(all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(3000..=3200))); let (_processed_data_2, s) = s.recv(all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(2000..=2200))); let s = s.send(ProcesData3 {}, all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Complete {}, all_clocks)?; recurs_calculator_1(s, all_clocks) }, }) } ///////////////////////// // Calculator2 fn endpoint_calculator_2( s: Endpoint0Calculator2, all_clocks: &mut HashMap, ) -> Result<(), Box> { all_clocks.insert('b', Instant::now()); all_clocks.insert('c', Instant::now()); recurs_calculator_2(s, all_clocks) } fn recurs_calculator_2( s: Endpoint0Calculator2, all_clocks: &mut HashMap, ) -> Result<(), Box> { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); offer_mpst!(s, all_clocks, { BranchingFromServertoCalculator2::Stop(s) => { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let (_stop, s) = s.recv(all_clocks)?; s.close() }, BranchingFromServertoCalculator2::Data(s) => { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let (_data_2, s) = s.recv(all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(3000..=3200))); let s = s.send(ProcesData2 {}, all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(2000..=2200))); let (_complete, s) = s.recv(all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let (_complete, s) = s.recv(all_clocks)?; recurs_calculator_2(s, all_clocks) }, }) } ///////////////////////// // Server fn endpoint_server( s: Endpoint0Server, all_clocks: &mut HashMap, ) -> Result<(), Box> { all_clocks.insert('a', Instant::now()); all_clocks.insert('c', Instant::now()); recurs_server(s, all_clocks, LOOPS) } fn recurs_server( s: Endpoint0Server, all_clocks: &mut HashMap, loops: i64, ) -> Result<(), Box> { sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); match loops { 0 => { let s: Endpoint1ServerStop = choose_mpst_server_to_all!( s, all_clocks, BranchingFromServertoCalculator1::Stop, BranchingFromServertoCalculator2::Stop, ); sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Stop {}, all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Stop {}, all_clocks)?; s.close() } i => { let s: Endpoint1ServerData = choose_mpst_server_to_all!( s, all_clocks, BranchingFromServertoCalculator1::Data, BranchingFromServertoCalculator2::Data, ); sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Data1 {}, all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Data2 {}, all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(5000..=5200))); let (_processed_data_3, s) = s.recv(all_clocks)?; sleep(Duration::from_millis(thread_rng().gen_range(0..=200))); let s = s.send(Complete {}, all_clocks)?; recurs_server(s, all_clocks, i - 1) } } } ///////////////////////// fn main() { let (thread_calculator_1, thread_calculator_2, thread_server) = fork_mpst( endpoint_calculator_1, endpoint_calculator_2, endpoint_server, ); thread_calculator_1.join().unwrap(); thread_calculator_2.join().unwrap(); thread_server.join().unwrap(); } static LOOPS: i64 = 100;