/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air_test_utils::key_utils::at; use air_test_utils::prelude::*; use futures::FutureExt; use polyplets::SecurityTetraplet; use pretty_assertions::assert_eq; use std::cell::RefCell; use std::rc::Rc; type ArgTetraplets = Vec>; fn arg_host_function() -> (CallServiceClosure<'static>, Rc>) { let arg_tetraplets = Rc::new(RefCell::new(ArgTetraplets::new())); let arg_tetraplets_inner = arg_tetraplets.clone(); let host_function: CallServiceClosure = Box::new(move |params| { let result = json!(params.tetraplets); *arg_tetraplets_inner.borrow_mut() = params.tetraplets; let result = CallServiceResult::ok(result); async move { result }.boxed_local() }); (host_function, arg_tetraplets) } #[tokio::test] async fn fold_with_inner_call() { let return_numbers_call_service: CallServiceClosure = Box::new(|_| { async move { CallServiceResult::ok(json!(["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"])) }.boxed_local() }); let set_variable_vm_peer_id = String::from("some_peer_id_1"); let mut set_variable_vm = create_avm(return_numbers_call_service, set_variable_vm_peer_id.clone()).await; let mut client_vms = Vec::new(); for i in 1..=10 { let (arg_host_func, arg_tetraplets) = arg_host_function(); let vm = create_avm(arg_host_func, i.to_string()).await; client_vms.push((vm, arg_tetraplets)) } let service_id = String::from("some_service_id"); let function_name = String::from("some_function_name"); let script = format!( r#" (seq (call "{set_variable_vm_peer_id}" ("{service_id}" "{function_name}") [] IterableResultPeer1) (fold IterableResultPeer1 i (par (call i ("local_service_id" "local_fn_name") [i "some_text_literal"] $acc) (next i) ) ) ) "# ); let test_params = TestRunParameters::from_init_peer_id("init_peer_id"); let result = checked_call_vm!(set_variable_vm, test_params.clone(), script.clone(), "", ""); let mut data = result.data; let second_arg_tetraplet = SecurityTetraplet { peer_pk: test_params.init_peer_id.clone(), ..Default::default() }; for i in 0..10 { let result = checked_call_vm!(client_vms[i].0, test_params.clone(), script.clone(), "", data); data = result.data; let first_arg_tetraplet = SecurityTetraplet { peer_pk: set_variable_vm_peer_id.clone(), service_id: service_id.clone(), function_name: function_name.clone(), lens: format!(".$.[{}]", i), }; let expected_tetraplets = vec![vec![first_arg_tetraplet], vec![second_arg_tetraplet.clone()]]; let expected_tetraplets = Rc::new(RefCell::new(expected_tetraplets)); assert_eq!(client_vms[i].1, expected_tetraplets); } } #[tokio::test] async fn fold_stream_with_inner_call() { let init_peer_name = "init_peer_id"; let air_script = r#" (seq (seq (call "init_peer_id" ("" "") [] $stream) ; ok = 42 (seq (call "init_peer_id" ("" "") [] var) ; ok = {"field": 43} (ap var.$.field $stream))) (fold $stream i (seq (call "init_peer_id" ("" "") [i] $s2) ; behaviour = tetraplet (next i)))) "#; let executor = air_test_framework::AirScriptExecutor::from_annotated( TestRunParameters::from_init_peer_id(init_peer_name), &air_script, ) .await .unwrap(); let result = executor.execute_one(init_peer_name).await.unwrap(); assert_eq!(result.ret_code, 0, "{}", result.error_message); let data = data_from_result(&result); let init_peer_id = at(init_peer_name); let expected_trace = vec![ stream!( json!([[{"peer_pk": init_peer_id, "service_id": "..0", "function_name": "", "lens": ""}]]), 0, peer = &init_peer_id, service = "..2", args = [42] ), stream!( json!([[{"peer_pk": init_peer_id, "service_id": "..1", "function_name": "", "lens": ".$.field"}]]), 0, peer = init_peer_id, service = "..2", args = [43] ), ]; assert_eq!(&(*data.trace)[4..], &expected_trace, "{:?}", data.cid_info); } #[tokio::test] async fn fold_canon_with_inner_call() { let init_peer_name = "init_peer_id"; let air_script = r#" (seq (seq (seq (call "init_peer_id" ("" "") [] $stream) ; ok = 42 (call "init_peer_id" ("" "") [] var)) ; ok = {"field": 43} (ap var.$.field $stream)) (seq (canon "init_peer_id" $stream #can) (fold #can x (seq (call "init_peer_id" ("" "") [x] $s2) ; behaviour=tetraplet (next x))))) "#; let executor = air_test_framework::AirScriptExecutor::from_annotated( TestRunParameters::from_init_peer_id(init_peer_name), &air_script, ) .await .unwrap(); let result = executor.execute_one(init_peer_name).await.unwrap(); assert_eq!(result.ret_code, 0, "{}", result.error_message); let data = data_from_result(&result); let init_peer_id = at(init_peer_name); let expected_trace = vec![ stream!( json!([[{"peer_pk": init_peer_id, "service_id": "..0", "function_name": "", "lens": ""}]]), 0, peer = &init_peer_id, service = "..2", args = [42] ), stream!( json!([[{"peer_pk": init_peer_id, "service_id": "..1", "function_name": "", "lens": ".$.field"}]]), 1, peer = init_peer_id, service = "..2", args = [43] ), ]; assert_eq!(&(*data.trace)[4..], &expected_trace, "{:?}", data.cid_info); } #[tokio::test] async fn fold_lens() { let variable_numbers = json!({"args": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]}); let set_variable_vm_peer_id = String::from("some_peer_id_1"); let mut set_variable_vm = create_avm( set_variable_call_service(variable_numbers), set_variable_vm_peer_id.clone(), ) .await; let (arg_host_func, arg_tetraplets) = arg_host_function(); let client_peer_id = String::from("client_id"); let mut client_vm = create_avm(arg_host_func, client_peer_id.clone()).await; let service_id = String::from("some_service_id"); let function_name = String::from("some_function_name"); let script = format!( r#" (seq (call "{set_variable_vm_peer_id}" ("{service_id}" "{function_name}") [] IterableResultPeer1) (fold IterableResultPeer1.$.args i (seq (fold IterableResultPeer1.$.args j (seq (call "{client_peer_id}" ("local_service_id" "local_fn_name") [i "some_text_literal"] $acc) (next j) ) ) (next i) ) ) ) "# ); let test_params = TestRunParameters::from_init_peer_id("some_init_peer_id"); let result = checked_call_vm!(set_variable_vm, test_params.clone(), script.clone(), "", ""); let first_arg_tetraplet = SecurityTetraplet { peer_pk: set_variable_vm_peer_id, service_id, function_name, lens: String::from(".$.args.$.[9]"), }; let second_arg_tetraplet = SecurityTetraplet { peer_pk: test_params.init_peer_id.clone(), service_id: String::new(), function_name: String::new(), lens: String::new(), }; let expected_tetraplets = vec![vec![first_arg_tetraplet], vec![second_arg_tetraplet]]; let expected_tetraplets = Rc::new(RefCell::new(expected_tetraplets)); checked_call_vm!(client_vm, test_params, script, "", result.data); assert_eq!(arg_tetraplets, expected_tetraplets); } #[tokio::test] async fn check_tetraplet_works_correctly() { let return_numbers_call_service: CallServiceClosure = Box::new(|_| { async move { CallServiceResult::ok(json!({"args": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]})) } .boxed_local() }); let set_variable_vm_peer_id = String::from("some_peer_id_1"); let mut set_variable_vm = create_avm(return_numbers_call_service, set_variable_vm_peer_id.clone()).await; let (arg_host_func, arg_tetraplets) = arg_host_function(); let client_peer_id = String::from("client_id"); let mut client_vm = create_avm(arg_host_func, client_peer_id.clone()).await; let service_id = String::from("some_service_id"); let function_name = String::from("some_function_name"); let script = format!( r#" (seq (call "{set_variable_vm_peer_id}" ("{service_id}" "{function_name}") [] value) (seq (call "{client_peer_id}" ("local_service_id" "local_fn_name") [value.$.args value.$.args.[0]]) (call "{client_peer_id}" ("local_service_id" "local_fn_name") [value.$.args value.$.args.[0]]) ) )"# ); let result = checked_call_vm!(set_variable_vm, <_>::default(), script.clone(), "", ""); let first_arg_tetraplet = SecurityTetraplet { peer_pk: set_variable_vm_peer_id.clone(), service_id: service_id.clone(), function_name: function_name.clone(), lens: String::from(".$.args"), }; let second_arg_tetraplet = SecurityTetraplet { peer_pk: set_variable_vm_peer_id, service_id, function_name, lens: String::from(".$.args.[0]"), }; let expected_tetraplets = vec![vec![first_arg_tetraplet], vec![second_arg_tetraplet]]; let expected_tetraplets = Rc::new(RefCell::new(expected_tetraplets)); checked_call_vm!(client_vm, <_>::default(), script, "", result.data); assert_eq!(arg_tetraplets, expected_tetraplets); } use fluence_app_service::AppServiceConfig; use fluence_app_service::MarineConfig; use fluence_app_service::ModuleDescriptor; use fluence_app_service::{AppService, MarineModuleConfig}; use air_test_utils::trace_from_result; use std::path::PathBuf; fn construct_service_config(module_name: impl Into) -> AppServiceConfig { let module_name = module_name.into(); let module_path = format!("./tests/security_tetraplets/{module_name}/target/wasm32-wasi/debug/"); let module_descriptor = ModuleDescriptor { load_from: None, file_name: module_name.clone() + ".wasm", import_name: module_name, config: MarineModuleConfig { logger_enabled: Default::default(), host_imports: Default::default(), wasi: Default::default(), logging_mask: Default::default(), }, }; let marine_config = MarineConfig { modules_dir: Some(PathBuf::from(module_path)), total_memory_limit: None, modules_config: vec![module_descriptor], default_modules_config: None, }; let service_working_dir = std::env::temp_dir(); AppServiceConfig { service_working_dir, marine_config, } } #[tokio::test] #[ignore] async fn tetraplet_with_wasm_modules() { use marine_rs_sdk::CallParameters; use marine_rs_sdk::SecurityTetraplet as SDKTetraplet; let auth_module_name = String::from("auth_module"); let auth_service_config = construct_service_config(auth_module_name.clone()); let auth_service = AppService::new(auth_service_config, auth_module_name, <_>::default()) .await .unwrap(); let log_module_name = String::from("log_storage"); let log_service_config = construct_service_config(log_module_name.clone()); let log_service = AppService::new(log_service_config, log_module_name, <_>::default()) .await .unwrap(); let services = maplit::hashmap!( "auth" => auth_service, "log_storage" => log_service, ); let services = Rc::new(RefCell::new(services)); const ADMIN_PEER_PK: &str = "12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE1"; let host_func: CallServiceClosure = Box::new(move |params| { let services_inner = services.clone(); let tetraplets = serde_json::to_vec(¶ms.tetraplets).expect("default serializer shouldn't fail"); let tetraplets: Vec> = serde_json::from_slice(&tetraplets).expect("default deserializer shouldn't fail"); let mut call_parameters = CallParameters::default(); call_parameters.particle.init_peer_id = ADMIN_PEER_PK.to_string(); call_parameters.tetraplets = tetraplets; async move { let mut service = services_inner.borrow_mut(); let service = service.get_mut(params.service_id.as_str()).unwrap(); let result = service .call_async( params.function_name, json!(params.arguments), to_app_service_call_parameters(call_parameters), ) .await .unwrap(); CallServiceResult::ok(result) } .boxed_local() }); let local_peer_id = "local_peer_id"; let script = format!( r#" (seq (call "{local_peer_id}" ("auth" "is_authorized") [] auth_result) (call "{local_peer_id}" ("log_storage" "delete") [auth_result.$.is_authorized "1"]) ) "# ); let mut vm = create_avm(host_func, local_peer_id).await; let test_params = TestRunParameters::from_init_peer_id(ADMIN_PEER_PK); let result = checked_call_vm!(vm, test_params, script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = scalar!("Ok"); assert_eq!(actual_trace[1.into()], expected_state) } fn to_app_service_call_parameters( call_parameters: marine_rs_sdk::CallParameters, ) -> fluence_app_service::CallParameters { fluence_app_service::CallParameters { particle: to_app_service_particle_parameters(call_parameters.particle), service_id: call_parameters.service_id, service_creator_peer_id: call_parameters.service_creator_peer_id, host_id: call_parameters.host_id, worker_id: call_parameters.worker_id, tetraplets: call_parameters .tetraplets .into_iter() .map(to_app_service_tetraplets) .collect(), } } fn to_app_service_particle_parameters( particle: marine_rs_sdk::ParticleParameters, ) -> fluence_app_service::ParticleParameters { fluence_app_service::ParticleParameters { id: particle.id, init_peer_id: particle.init_peer_id, timestamp: particle.timestamp, ttl: particle.ttl, script: particle.script, signature: particle.signature, token: particle.token, } } fn to_app_service_tetraplets( tetraplets: Vec, ) -> Vec { tetraplets.into_iter().map(to_app_service_tetraplet).collect() } fn to_app_service_tetraplet(tetraplet: marine_rs_sdk::SecurityTetraplet) -> fluence_app_service::SecurityTetraplet { fluence_app_service::SecurityTetraplet { peer_pk: tetraplet.peer_pk, service_id: tetraplet.service_id, function_name: tetraplet.function_name, lens: tetraplet.lens, } }