/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air_test_utils::prelude::*; use futures::FutureExt; use futures::StreamExt; use std::cell::RefCell; use std::collections::HashSet; use std::rc::Rc; fn parse_peers() -> Vec { use csv::ReaderBuilder; let data = include_str!("dashboard/peers"); let mut rdr = ReaderBuilder::new() .delimiter(b',') .has_headers(false) .from_reader(data.as_bytes()); let mut result = Vec::new(); while let Some(record) = rdr.records().next() { let record = record.unwrap(); result.push(record.as_slice().to_string()); } result } fn into_hashset(peers: Vec) -> HashSet { peers.into_iter().collect() } fn client_host_function( known_peers: Vec, client_id: String, relay_id: String, ) -> (CallServiceClosure<'static>, Rc>) { let all_info = Rc::new(RefCell::new(String::new())); let known_peers = serde_json::Value::Array(known_peers.iter().cloned().map(Into::into).collect()); let client_id = serde_json::Value::String(client_id); let relay_id = serde_json::Value::String(relay_id); let to_ret_value = Box::new( move |service_name: &str, function_name: &str, arguments: Vec| -> serde_json::Value { if !service_name.is_empty() || function_name != "load" || arguments.len() != 1 { return serde_json::Value::Null; } match arguments[0].as_str() { "relayId" => relay_id.clone(), "knownPeers" => known_peers.clone(), "clientId" => client_id.clone(), _ => serde_json::Value::Null, } }, ); let all_info_inner = all_info.clone(); let host_function: CallServiceClosure = Box::new(move |params| { let ret_value = match serde_json::from_value(serde_json::Value::Array(params.arguments.clone())) { Ok(args) => to_ret_value(params.service_id.as_str(), params.function_name.as_str(), args), Err(_) => { *all_info_inner.borrow_mut() = serde_json::Value::Array(params.arguments).to_string(); serde_json::Value::Null } }; let result = CallServiceResult::ok(ret_value); async move { result }.boxed_local() }); (host_function, all_info) } fn peer_host_function( known_peers: Vec, blueprints: Vec, modules: Vec, interfaces: Vec, ident: String, ) -> CallServiceClosure<'static> { let known_peers = serde_json::Value::Array(known_peers.into_iter().map(serde_json::Value::String).collect()); let blueprints = serde_json::Value::Array(blueprints.into_iter().map(serde_json::Value::String).collect()); let modules = serde_json::Value::Array(modules.into_iter().map(serde_json::Value::String).collect()); let interfaces = serde_json::Value::Array(interfaces.into_iter().map(serde_json::Value::String).collect()); let identify = serde_json::Value::String(ident.clone()); let ident = serde_json::Value::String(ident); let to_ret_value = Box::new( move |service_name: &str, function_name: &str, arguments: Vec<&str>| -> serde_json::Value { match (service_name, function_name, arguments.as_slice()) { ("op", "identity", _) => ident.clone(), ("op", "identify", _) => identify.clone(), ("dist", "get_blueprints", _) => blueprints.clone(), ("dist", "get_modules", _) => modules.clone(), ("srv", "get_interfaces", _) => interfaces.clone(), ("dht", "neighborhood", _) => known_peers.clone(), _ => serde_json::Value::Null, } }, ); Box::new(move |params| { let args: Vec = serde_json::from_value(serde_json::Value::Array(params.arguments)).unwrap(); let t_args = args.iter().map(|s| s.as_str()).collect::>(); let ret_value = to_ret_value(params.service_id.as_str(), params.function_name.as_str(), t_args); let result = CallServiceResult::ok(ret_value); async move { result }.boxed_local() }) } #[rustfmt::skip] fn create_peer_host_function(peer_id: String, known_peer_ids: Vec) -> CallServiceClosure<'static> { let relay_blueprints = (0..=2).map(|id| format!("{peer_id}_blueprint_{id}")).collect::>(); let relay_modules = (0..=2).map(|id| format!("{peer_id}_module_{id}")).collect::>(); let relay_interfaces = (0..=2).map(|id| format!("{peer_id}_interface_{id}")).collect::>(); let relay_ident = format!("{peer_id}_ident"); peer_host_function( known_peer_ids, relay_blueprints, relay_modules, relay_interfaces, relay_ident, ) } struct AVMState { vm: TestRunner, peer_id: String, prev_result: Vec, } #[tokio::test] async fn dashboard() { let script = include_str!("./scripts/dashboard.air"); let known_peer_ids = parse_peers(); let client_id = "client_id".to_string(); let relay_id = "relay_id".to_string(); let (host_function, all_info) = client_host_function(known_peer_ids.clone(), client_id.clone(), relay_id.clone()); let mut client = create_avm(host_function, client_id.clone()).await; let mut relay = create_avm( create_peer_host_function(relay_id.clone(), known_peer_ids.clone()), relay_id.clone(), ) .await; let mut known_peers = futures::stream::iter(known_peer_ids.iter().cloned()) .then(|peer_id| async { let vm = create_avm( create_peer_host_function(peer_id.clone(), known_peer_ids.clone()), peer_id.clone(), ) .await; AVMState { vm, peer_id, prev_result: vec![], } }) .collect::>() .await; let test_params = TestRunParameters::from_init_peer_id(client_id.clone()); // -> client 1 let client_1_result = checked_call_vm!(client, test_params.clone(), script, "", ""); let next_peer_pks = into_hashset(client_1_result.next_peer_pks); let mut all_peer_pks = into_hashset(known_peer_ids); all_peer_pks.insert(relay_id.clone()); assert_eq!(next_peer_pks, all_peer_pks); // client 1 -> relay 1 let relay_1_result = checked_call_vm!(relay, test_params.clone(), script, client_1_result.data.clone(), ""); let next_peer_pks = into_hashset(relay_1_result.next_peer_pks.clone()); all_peer_pks.remove(&relay_id); all_peer_pks.insert(client_id.clone()); assert_eq!(next_peer_pks, all_peer_pks); // relay 1 -> client 2 let client_2_result = checked_call_vm!( client, test_params.clone(), script, client_1_result.data.clone(), relay_1_result.data.clone() ); assert!(client_2_result.next_peer_pks.is_empty()); assert_eq!( *all_info.borrow(), String::from( r#"["relay_id","relay_id_ident",["relay_id_interface_0","relay_id_interface_1","relay_id_interface_2"],["relay_id_blueprint_0","relay_id_blueprint_1","relay_id_blueprint_2"],["relay_id_module_0","relay_id_module_1","relay_id_module_2"]]"# ) ); let mut relay_2_result = relay_1_result.clone(); let mut client_3_result = client_2_result; // peers 1 -> relay 2 -> client 3 for avm in known_peers.iter_mut() { let prev_result = std::mem::take(&mut avm.prev_result); let known_peer_result = checked_call_vm!( avm.vm, test_params.clone(), script, prev_result, client_1_result.data.clone() ); assert_eq!(known_peer_result.next_peer_pks, vec![relay_id.clone()]); avm.prev_result = known_peer_result.data; relay_2_result = checked_call_vm!( relay, test_params.clone(), script, relay_2_result.data.clone(), avm.prev_result.clone() ); assert_eq!(relay_2_result.next_peer_pks, vec![client_id.clone()]); client_3_result = checked_call_vm!( client, test_params.clone(), script, client_3_result.data.clone(), relay_2_result.data.clone() ); assert!(client_3_result.next_peer_pks.is_empty()); assert_eq!( *all_info.borrow(), format!( r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#, peer_id = avm.peer_id ) ) } all_peer_pks.remove(&client_id); all_peer_pks.insert(relay_id.to_string()); let mut relay_3_result = relay_2_result; let mut client_4_result = client_3_result; // peers 2 -> relay 3 -> client 4 for avm in known_peers.iter_mut() { let prev_result = std::mem::take(&mut avm.prev_result); let known_peer_result = checked_call_vm!( avm.vm, test_params.clone(), script, prev_result, relay_1_result.data.clone() ); all_peer_pks.remove(&avm.peer_id); let next_peer_pks = into_hashset(known_peer_result.next_peer_pks.clone()); assert_eq!(next_peer_pks, all_peer_pks); all_peer_pks.insert(avm.peer_id.clone()); avm.prev_result = known_peer_result.data; relay_3_result = checked_call_vm!( relay, test_params.clone(), script, relay_3_result.data.clone(), avm.prev_result.clone() ); assert_eq!(relay_3_result.next_peer_pks, vec![client_id.clone()]); // client -> peers -> relay -> client client_4_result = checked_call_vm!( client, test_params.clone(), script, client_4_result.data.clone(), relay_3_result.data.clone() ); assert!(client_4_result.next_peer_pks.is_empty()); assert_eq!( *all_info.borrow(), format!( r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#, peer_id = avm.peer_id ) ) } let mut relay_4_result = relay_3_result; let mut client_5_result = client_4_result; // peers 2 -> peers 3 -> relay 4 -> client 5 for i in 0..known_peers.len() { for j in 0..known_peers.len() { if known_peers[i].peer_id == known_peers[j].peer_id { continue; } let prev_data = known_peers[j].prev_result.clone(); let data = known_peers[i].prev_result.clone(); let known_peer_i_j_result = checked_call_vm!(known_peers[j].vm, test_params.clone(), script, prev_data, data); assert_eq!(known_peer_i_j_result.next_peer_pks, vec![relay_id.clone()]); known_peers[j].prev_result = known_peer_i_j_result.data; relay_4_result = checked_call_vm!( relay, test_params.clone(), script, relay_4_result.data.clone(), known_peers[j].prev_result.clone() ); assert_eq!(relay_4_result.next_peer_pks, vec![client_id.clone()]); // client -> peers -> relay -> client client_5_result = checked_call_vm!( client, test_params.clone(), script, client_5_result.data.clone(), relay_4_result.data.clone() ); assert!(client_5_result.next_peer_pks.is_empty()); assert_eq!( *all_info.borrow(), format!( r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#, peer_id = known_peers[j].peer_id ) ); } } }