/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air::ExecutionCidState; use air::PreparationError; use air::ToErrorCode; use air_interpreter_data::ExecutionTrace; use air_test_framework::AirScriptExecutor; use air_test_utils::key_utils::at; use air_test_utils::prelude::*; use futures::FutureExt; use pretty_assertions::assert_eq; use std::cell::RefCell; use std::rc::Rc; #[tokio::test] async fn lfold() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm( set_variable_call_service(json!(["1", "2", "3", "4", "5"])), "set_variable", ) .await; let lfold = r#" (seq (call "set_variable" ("" "") [] Iterable) (fold Iterable i (seq (call "A" ("" "") [i] $acc) (next i) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), lfold, "", ""); let result = checked_call_vm!(vm, <_>::default(), lfold, "", result.data); let actual_trace = trace_from_result(&result); let expected_state = scalar!(json!(["1", "2", "3", "4", "5"]), peer = "set_variable"); assert_eq!(actual_trace.len(), 6); assert_eq!(actual_trace[0.into()], expected_state); for i in 1..=5 { let val = format!("{i}"); let expected_state = stream!(val.as_str(), i as u32 - 1, peer = "A", args = [val.as_str()]); assert_eq!(actual_trace[i.into()], expected_state); } } #[tokio::test] async fn rfold() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm( set_variable_call_service(json!(["1", "2", "3", "4", "5"])), "set_variable", ) .await; let rfold = r#" (seq (call "set_variable" ("" "") [] Iterable) (fold Iterable i (seq (next i) (call "A" ("" "") [i] $acc) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), rfold, "", ""); let result = checked_call_vm!(vm, <_>::default(), rfold, "", result.data); let actual_trace = trace_from_result(&result); assert_eq!(actual_trace.len(), 6); let expected_state = scalar!(json!(["1", "2", "3", "4", "5"]), peer = "set_variable"); assert_eq!(actual_trace[0.into()], expected_state); for i in 1..=5 { let val = format!("{}", 6 - i); let expected_state = stream!(val.as_str(), i as u32 - 1, peer = "A", args = [val.as_str()]); assert_eq!(actual_trace[i.into()], expected_state); } } #[tokio::test] async fn inner_fold() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm( set_variable_call_service(json!(["1", "2", "3", "4", "5"])), "set_variable", ) .await; let script = r#" (seq (seq (call "set_variable" ("" "") [] Iterable1) (call "set_variable" ("" "") [] Iterable2) ) (fold Iterable1 i (seq (fold Iterable2 j (seq (call "A" ("" "") [i] $acc) (next j) ) ) (next i) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), script, "", ""); let result = checked_call_vm!(vm, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); assert_eq!(actual_trace.len(), 27); let expected_state = scalar!(json!(["1", "2", "3", "4", "5"]), peer = "set_variable"); assert_eq!(actual_trace[0.into()], expected_state); assert_eq!(actual_trace[1.into()], expected_state); for i in 1..=5 { for j in 1..=5 { let state_id = 1 + 5 * (i - 1) + j; let val = i.to_string(); let expected_state = stream!(val.as_str(), state_id as u32 - 2, peer = "A", args = [val]); assert_eq!(actual_trace[state_id.into()], expected_state); } } } #[tokio::test] async fn inner_fold_with_same_iterator() { let mut vm = create_avm( set_variable_call_service(json!(["1", "2", "3", "4", "5"])), "set_variable", ) .await; let script = r#" (seq (seq (call "set_variable" ("" "") [] Iterable1) (call "set_variable" ("" "") [] Iterable2) ) (fold Iterable1 i (seq (fold Iterable2 i (seq (call "A" ("" "") [i] $acc) (next i) ) ) (next i) ) ) )"#; let result = call_vm!(vm, <_>::default(), script, "", ""); let expected_error = PreparationError::AIRParseError("".to_string()); assert_eq!(result.ret_code, expected_error.to_error_code()); } #[tokio::test] async fn empty_iterable_fold() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm(set_variable_call_service(json!([])), "set_variable").await; let empty_fold = r#" (seq (call "set_variable" ("" "") [] Iterable) (fold Iterable i (seq (call "A" ("" "") [i] $acc) (next i) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), empty_fold, "", ""); let result = checked_call_vm!(vm, <_>::default(), empty_fold, "", result.data); let actual_trace = trace_from_result(&result); let expected_state = scalar!(json!([]), peer = "set_variable"); assert_eq!(actual_trace.len(), 1); assert_eq!(actual_trace[0.into()], expected_state); } #[tokio::test] async fn empty_literal_array_fold() { let mut vm = create_avm(echo_call_service(), "A").await; let empty_fold = r#" (fold [] i (seq (call "A" ("" "") [i] $acc) (next i) ) )"#; let result = checked_call_vm!(vm, <_>::default(), empty_fold, "", ""); let actual_trace = trace_from_result(&result); assert!(actual_trace.is_empty()); } #[tokio::test] async fn empty_fold_lens() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm(set_variable_call_service(json!({ "messages": [] })), "set_variable").await; let empty_fold = r#" (seq (call "set_variable" ("" "") [] messages) (fold messages.$.messages! i (seq (call "A" ("" "") [i] $acc) (next i) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), empty_fold, "", ""); let result = checked_call_vm!(vm, <_>::default(), empty_fold, "", result.data); let actual_trace = trace_from_result(&result); let expected_trace = vec![scalar!(json!({ "messages": [] }), peer = "set_variable")]; assert_eq!(actual_trace, expected_trace); } // Check that fold works with the join behaviour without hanging up. #[tokio::test] async fn fold_with_join() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm(set_variable_call_service(json!(["1", "2"])), "set_variable").await; let fold_with_join = r#" (seq (call "set_variable" ("" "") [] iterable) (par (call "unknown_peer" ("" "") [] lazy_def_variable) (fold iterable i (seq (call "A" ("" "") [lazy_def_variable.$.hash!] $acc) (next i) ) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), fold_with_join, "", ""); let result = checked_call_vm!(vm, <_>::default(), fold_with_join, "", result.data); let actual_trace = trace_from_result(&result); assert_eq!(actual_trace.len(), 4); } #[tokio::test] async fn lambda() { let mut vm = create_avm(echo_call_service(), "A").await; let mut set_variable_vm = create_avm( set_variable_call_service(json!({ "array": ["1","2","3","4","5"] })), "set_variable", ) .await; let script = r#" (seq (call "set_variable" ("" "") [] iterable) (fold iterable.$.array! i (seq (call "A" ("" "") [i] $acc) (next i) ) ) )"#; let result = checked_call_vm!(set_variable_vm, <_>::default(), script, "", ""); let result = checked_call_vm!(vm, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); let expected_state = scalar!(json!({ "array": ["1", "2", "3", "4", "5"] }), peer = "set_variable"); assert_eq!(actual_trace.len(), 6); assert_eq!(actual_trace[0.into()], expected_state); for i in 1..=5 { let val = format!("{i}"); let expected_state = stream!(val.as_str(), i as u32 - 1, peer = "A", args = [val]); assert_eq!(actual_trace[i.into()], expected_state); } } #[tokio::test] async fn shadowing() { use executed_state::*; let mut set_variables_vm = create_avm(set_variable_call_service(json!(["1", "2"])), "set_variable").await; let mut vm_a = create_avm(echo_call_service(), "A").await; let mut vm_b = create_avm(echo_call_service(), "B").await; let script = r#" (seq (seq (call "set_variable" ("" "") [] iterable1) (call "set_variable" ("" "") [] iterable2) ) (fold iterable1 i (seq (seq (fold iterable2 j (seq (seq (call "A" ("" "") [i] local_j) (call "B" ("" "") [local_j]) ) (next j) ) ) (par (call "A" ("" "") [i] local_i) (call "B" ("" "") [i]) ) ) (next i) ) ) )"#; let result = checked_call_vm!(set_variables_vm, <_>::default(), script, "", ""); let result = checked_call_vm!(vm_a, <_>::default(), script, "", result.data); let result = checked_call_vm!(vm_b, <_>::default(), script, "", result.data); let result = checked_call_vm!(vm_a, <_>::default(), script, "", result.data); let result = checked_call_vm!(vm_b, <_>::default(), script, "", result.data); let result = checked_call_vm!(vm_a, <_>::default(), script, "", result.data); let result = checked_call_vm!(vm_b, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); let expected_trace = ExecutionTrace::from(vec![ scalar!(json!(["1", "2"]), peer = "set_variable"), scalar!(json!(["1", "2"]), peer = "set_variable"), scalar!("1", peer = "A", args = ["1"]), unused!("1", peer = "B", args = ["1"]), scalar!("1", peer = "A", args = ["1"]), unused!("1", peer = "B", args = ["1"]), par(1, 1), scalar!("1", peer = "A", args = ["1"]), unused!("1", peer = "B", args = ["1"]), scalar!("2", peer = "A", args = ["2"]), unused!("2", peer = "B", args = ["2"]), request_sent_by("B"), ]); assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn shadowing_scope() { use executed_state::*; async fn execute_script(script: String) -> Result { let mut set_variables_vm = create_avm(set_variable_call_service(json!(["1", "2"])), "set_variable").await; let mut vm_a = create_avm(echo_call_service(), "A").await; let mut vm_b = create_avm(echo_call_service(), "B").await; let result = checked_call_vm!(set_variables_vm, <_>::default(), script.clone(), "", ""); let result = checked_call_vm!(vm_a, <_>::default(), script.clone(), "", result.data); let result = checked_call_vm!(vm_b, <_>::default(), script.clone(), "", result.data); let result = checked_call_vm!(vm_a, <_>::default(), script.clone(), "", result.data); let result = checked_call_vm!(vm_b, <_>::default(), script.clone(), "", result.data); vm_a.call(script, "", result.data, <_>::default()).await } let variable_shadowing_script = r#" (seq (seq (call "set_variable" ("" "") [] iterable1) (call "set_variable" ("" "") [] iterable2) ) (fold iterable1 i (seq (seq (call "A" ("" "") ["value"] local_j) (seq (fold iterable2 j (seq (seq (call "A" ("" "") [i] local_j) (call "B" ("" "") [local_j]) ) (next j) ) ) (call "A" ("" "") [local_j]) ) ) (next i) ) ) )"#; let result = execute_script(String::from(variable_shadowing_script)).await.unwrap(); let actual_trace = trace_from_result(&result); let expected_trace = ExecutionTrace::from(vec![ scalar!(vec!["1", "2"], peer = "set_variable"), scalar!(vec!["1", "2"], peer = "set_variable"), scalar!("value", peer = "A", args = ["value"]), scalar!("1", peer = "A", args = ["1"]), unused!("1", peer = "B", args = ["1"]), scalar!("1", peer = "A", args = ["1"]), unused!("1", peer = "B", args = ["1"]), unused!("value", peer = "A", args = ["value"]), scalar!("value", peer = "A", args = ["value"]), scalar!("2", peer = "A", args = ["2"]), request_sent_by("A"), ]); assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_waits_on_empty_stream() { let vm_peer_id = "vm_peer_id"; let mut vm = create_avm(echo_call_service(), vm_peer_id).await; let script = format!( r#" (par (call "" ("" "") [] $stream) (fold $stream iterator (seq (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator)))) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![executed_state::par(1, 0), executed_state::request_sent_by(vm_peer_id)]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_stream_seq_next_never_completes() { let vm_peer_id = "vm_peer_id"; let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id).await; let script = format!( r#" (seq (call "{vm_peer_id}" ("" "") [] $stream) (seq (fold $stream iterator (seq (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator))) (call "{vm_peer_id}" ("" "") []))) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ stream!(1, 0, peer = vm_peer_id), executed_state::fold(vec![subtrace_lore( 0, SubTraceDesc::new(2.into(), 1), SubTraceDesc::new(3.into(), 0), )]), stream!(1, 0, peer = vm_peer_id, args = [1]), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_stream_seq_next_never_completes_with_never() { let vm_peer_id = "vm_peer_id"; let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id).await; let script = format!( r#" (seq (call "{vm_peer_id}" ("" "") [] $stream) (seq (fold $stream iterator (seq (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator) ) (never) ) (call "{vm_peer_id}" ("" "") []) ) ) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ stream!(1, 0, peer = vm_peer_id), executed_state::fold(vec![subtrace_lore( 0, SubTraceDesc::new(2.into(), 1), SubTraceDesc::new(3.into(), 0), )]), stream!(1, 0, peer = vm_peer_id, args = [1]), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_stream_seq_next_completes_with_null() { let vm_peer_id = "vm_peer_id"; let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id).await; let script = format!( r#" (seq (call "{vm_peer_id}" ("" "") [] $stream) (seq (fold $stream iterator (seq (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator) ) (null) ) (call "{vm_peer_id}" ("" "") []) ) ) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ stream!(1, 0, peer = vm_peer_id), executed_state::fold(vec![subtrace_lore( 0, SubTraceDesc::new(2.into(), 1), SubTraceDesc::new(3.into(), 0), )]), stream!(1, 0, peer = vm_peer_id, args = [1]), unused!(1, peer = vm_peer_id), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_scalar_seq_next_completes_with_null() { let vm_peer_id = "vm_peer_id"; let service_result = json!([1, 2]); let mut vm = create_avm(set_variable_call_service(service_result.clone()), vm_peer_id).await; let script = format!( r#" (seq (call "{vm_peer_id}" ("" "") [] iterable) (seq (fold iterable iterator (par (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator) ) (null) ) (seq (canon "{vm_peer_id}" $new_stream #canon_stream) (call "{vm_peer_id}" ("" "") [#canon_stream]) ) ) ) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = ExecutionTrace::from(vec![ scalar!(service_result.clone(), peer = vm_peer_id), executed_state::par(1, 2), stream!(service_result.clone(), 0, peer = vm_peer_id, args = [1]), executed_state::par(1, 0), stream!(service_result.clone(), 0, peer = vm_peer_id, args = [2]), executed_state::canon( json!({"tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_peer_id", "service_id": ""}, "values": []}), ), unused!(service_result, peer = vm_peer_id, args = [json!([])]), ]); assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_scalar_seq_next_not_completes_with_never() { let vm_peer_id = "vm_peer_id"; let service_result = json!([1, 2]); let mut vm = create_avm(set_variable_call_service(service_result.clone()), vm_peer_id).await; let script = format!( r#" (seq (call "{vm_peer_id}" ("" "") [] iterable) (seq (fold iterable iterator (par (call "unknwon_peer_id" ("" "") [iterator] $new_stream) (next iterator) ) (never) ) (seq (canon "{vm_peer_id}" $new_stream #canon_stream) (call "{vm_peer_id}" ("" "") [#canon_stream]) ) ) ) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ scalar!(service_result, peer = vm_peer_id), executed_state::par(1, 2), executed_state::request_sent_by(vm_peer_id), executed_state::par(1, 0), executed_state::request_sent_by(vm_peer_id), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_stream_seq_next_saves_call_result() { let vm_peer_id = "vm_peer_id"; let mut vm = create_avm(echo_call_service(), vm_peer_id).await; let script = format!( r#" (seq (seq (ap 1 $stream) (ap 2 $stream) ) (seq (fold $stream iterator (seq (call "{vm_peer_id}" ("" "") [iterator] $new_stream) (next iterator) ) (call "{vm_peer_id}" ("" "") [iterator] $new_stream) ) (call "{vm_peer_id}" ("" "") [0]) ) ) "# ); let result = checked_call_vm!(vm, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ executed_state::ap(0), executed_state::ap(0), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(3.into(), 1), SubTraceDesc::new(6.into(), 0)), subtrace_lore(1, SubTraceDesc::new(4.into(), 1), SubTraceDesc::new(5.into(), 1)), ]), stream!(1, 0, peer = vm_peer_id, args = [1]), stream!(2, 1, peer = vm_peer_id, args = [2]), stream!(2, 2, peer = vm_peer_id, args = [2]), unused!(0, peer = vm_peer_id, args = [0]), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_par_next_completes() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(set_variable_call_service(json!(1)), vm_1_peer_id).await; let vm_2_peer_id = "vm_2_peer_id"; let mut vm_2 = create_avm(set_variable_call_service(json!(1)), vm_2_peer_id).await; let vm_3_peer_id = "vm_3_peer_id"; let mut vm_3 = create_avm(set_variable_call_service(json!(1)), vm_3_peer_id).await; let vm_4_peer_id = "vm_4_peer_id"; let mut vm_4 = create_avm(set_variable_call_service(json!(1)), vm_4_peer_id).await; let script = format!( r#" (seq (seq (seq (ap "{vm_2_peer_id}" $stream) (ap "{vm_3_peer_id}" $stream)) (ap "{vm_4_peer_id}" $stream)) (seq (fold $stream peer_id (par (call peer_id ("" "") [] $new_stream) (next peer_id))) (call "{vm_1_peer_id}" ("" "") []) ; this call should be executed if any of these three peers is reached ) ) "# ); let result_1 = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let result_2 = checked_call_vm!(vm_2, <_>::default(), &script, "", result_1.data.clone()); let actual_trace = trace_from_result(&result_2); let expected_trace = vec![ executed_state::ap(0), executed_state::ap(0), executed_state::ap(0), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)), ]), executed_state::par(1, 4), stream!(1, 0, peer = vm_2_peer_id), executed_state::par(1, 2), executed_state::request_sent_by(vm_1_peer_id), executed_state::par(1, 0), executed_state::request_sent_by(vm_1_peer_id), executed_state::request_sent_by(vm_2_peer_id), ]; assert_eq!(actual_trace, expected_trace); let result_3 = checked_call_vm!(vm_3, <_>::default(), &script, "", result_1.data.clone()); let actual_trace = trace_from_result(&result_3); let expected_trace = vec![ executed_state::ap(0), executed_state::ap(0), executed_state::ap(0), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)), ]), executed_state::par(1, 4), executed_state::request_sent_by(vm_1_peer_id), executed_state::par(1, 2), stream!(1, 0, peer = vm_3_peer_id), executed_state::par(1, 0), executed_state::request_sent_by(vm_1_peer_id), executed_state::request_sent_by(vm_3_peer_id), ]; assert_eq!(actual_trace, expected_trace); let result_4 = checked_call_vm!(vm_4, <_>::default(), &script, "", result_1.data); let actual_trace = trace_from_result(&result_4); let expected_trace = vec![ executed_state::ap(0), executed_state::ap(0), executed_state::ap(0), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)), ]), executed_state::par(1, 4), executed_state::request_sent_by(vm_1_peer_id), executed_state::par(1, 2), executed_state::request_sent_by(vm_1_peer_id), executed_state::par(1, 0), stream!(1, 0, peer = vm_4_peer_id), executed_state::request_sent_by(vm_4_peer_id), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_stream_map() { let vm_1_peer_id = "vm_1_peer_id"; let k1 = 42; let k2 = "some"; let arg_tetraplets = Rc::new(RefCell::new(vec![])); let arg_tetraplets_inner = arg_tetraplets.clone(); let set_variable_call_service: CallServiceClosure = Box::new(move |params| { arg_tetraplets_inner.borrow_mut().push(params.tetraplets.clone()); let result = CallServiceResult::ok(json!({"keyo": k1, "keyu": k2})); async move { result }.boxed_local() }); let mut vm_1 = create_avm(set_variable_call_service, vm_1_peer_id).await; let script = format!( r#" (seq (seq (seq (seq (call "{vm_1_peer_id}" ("m1" "f1") [1] scalar) (call "{vm_1_peer_id}" ("m2" "f2") [1] $stream) ) (canon "{vm_1_peer_id}" $stream #canon) ) (seq (ap (#canon.$.[0].keyo #canon.$.[0].keyo) %map) (ap (scalar.$.keyu scalar.$.keyu) %map) ) ) (fold %map i (seq (seq (call "{vm_1_peer_id}" ("m3" "f3") [i.$.key] u) (call "{vm_1_peer_id}" ("m4" "f4") [i.$.value] un) ) (next i) ) ) ) "# ); let test_params = TestRunParameters::from_init_peer_id(vm_1_peer_id); let result = checked_call_vm!(vm_1, test_params, &script, "", ""); let actual_trace = trace_from_result(&result); let generation_idx = 0; let mut cid_tracker = ExecutionCidState::new(); let service_result = json!({"keyo": k1, "keyu": k2}); let stream_1 = stream_tracked!( service_result.clone(), 0, cid_tracker, peer = vm_1_peer_id, service = "m2", function = "f2", args = [1] ); let cid_1 = extract_service_result_cid(&stream_1); let expected_state = ExecutionTrace::from(vec![ scalar_tracked!( service_result.clone(), cid_tracker, peer = vm_1_peer_id, service = "m1", function = "f1", args = [1] ), stream_1, executed_state::canon_tracked( json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": vm_1_peer_id, "service_id": ""}, "values": [{ "result": service_result.clone(), "tetraplet": {"function_name": "f2", "lens": "", "peer_pk": vm_1_peer_id, "service_id": "m2"}, "provenance": Provenance::service_result(cid_1), }] }), &mut cid_tracker, ), executed_state::ap(generation_idx), executed_state::ap(generation_idx), executed_state::fold(vec![ subtrace_lore(3, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(4, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)), ]), scalar_tracked!( service_result.clone(), cid_tracker, peer = vm_1_peer_id, service = "m3", function = "f3", args = [k1] ), scalar_tracked!( service_result.clone(), cid_tracker, peer = vm_1_peer_id, service = "m4", function = "f4", args = [k1] ), scalar_tracked!( service_result.clone(), cid_tracker, peer = vm_1_peer_id, service = "m3", function = "f3", args = [k2] ), scalar_tracked!( service_result, cid_tracker, peer = vm_1_peer_id, service = "m4", function = "f4", args = [k2] ), ]); assert_eq!(actual_trace, expected_state); let expected_tetraplets = vec![ vec![vec![SecurityTetraplet::new(vm_1_peer_id, "m2", "f2", ".$.key")]], vec![vec![SecurityTetraplet::new(vm_1_peer_id, "m2", "f2", ".$.value")]], vec![vec![SecurityTetraplet::new(vm_1_peer_id, "m1", "f1", ".$.keyu.$.key")]], vec![vec![SecurityTetraplet::new( vm_1_peer_id, "m1", "f1", ".$.keyu.$.value", )]], ]; let tetraplates_len = arg_tetraplets.borrow().len(); assert_eq!(&arg_tetraplets.borrow()[tetraplates_len - 4..], &expected_tetraplets); } #[tokio::test] async fn fold_canon_stream_map() { let vm_1_peer_name = "vm_1_peer_id"; let vm_1_peer_id = at(vm_1_peer_name); let script = format!( r#" (seq (seq (seq (ap ("key" "value1") %map) (ap (-42 "value2") %map) ) (seq (ap (-42 "value3") %map) (ap ("key" "value4") %map) ) ) (seq (canon "{vm_1_peer_name}" %map #%canon_map) (fold #%canon_map iter (seq (call "{vm_1_peer_name}" ("m" "f") [iter] scalar) ; behaviour = echo (next iter) ) ) ) ) "# ); let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_name), &script) .await .expect("invalid test AIR script"); let result = executor.execute_all(vm_1_peer_name).await.unwrap(); let actual_trace = trace_from_result(&result.last().unwrap()); let mut cid_tracker: ExecutionCidState = ExecutionCidState::new(); let tetraplet = json!({"function_name": "", "lens": "", "peer_pk": vm_1_peer_id, "service_id": ""}); let map_value_1 = json!({"key": "key", "value": "value1"}); let map_value_2 = json!({"key": -42, "value": "value2"}); let map_value_3 = json!({"key": -42, "value": "value3"}); let map_value_4 = json!({"key": "key", "value": "value4"}); let expected_trace: Vec = vec![ executed_state::ap(0), executed_state::ap(0), executed_state::ap(0), executed_state::ap(0), canon_tracked( json!({"tetraplet": tetraplet, "values": [ { "result": map_value_1, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, { "result": map_value_2, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, { "result": map_value_3, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, { "result": map_value_4, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, ]}), &mut cid_tracker, ), scalar_tracked!( map_value_3.clone(), cid_tracker, peer = &vm_1_peer_id, service = "m..0", function = "f", args = [map_value_3] ), scalar_tracked!( map_value_4.clone(), cid_tracker, peer = vm_1_peer_id, service = "m..0", function = "f", args = [map_value_4] ), ]; assert_eq!(&*actual_trace, expected_trace,); }