/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air::UncatchableError::StreamSizeLimitExceeded; use air_interpreter_data::ExecutionTrace; use air_test_framework::AirScriptExecutor; use air_test_utils::prelude::*; use futures::FutureExt; use pretty_assertions::assert_eq; #[tokio::test] async fn recursive_stream_with_early_exit() { let vm_peer_id = "vm_peer_id"; let variable_mappings = maplit::hashmap! { "stream_value".to_string() => json!(1), "stop".to_string() => json!("stop"), }; let mut vm = create_avm( set_variables_call_service(variable_mappings, VariableOptionSource::FunctionName), vm_peer_id, ) .await; let script = format!( r#" (seq (seq (call "{vm_peer_id}" ("" "stream_value") [] $stream) (call "{vm_peer_id}" ("" "stream_value") [] $stream) ) (fold $stream iterator (seq (call "{vm_peer_id}" ("" "stop") [] value) (xor (match value "stop" (null) ) (seq (ap value $stream) (next iterator) ) ) ) ) )"# ); let result = checked_call_vm!(vm, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ stream!(1, 0, peer = vm_peer_id, function = "stream_value"), stream!(1, 1, peer = vm_peer_id, function = "stream_value"), executed_state::fold(vec![ executed_state::subtrace_lore(0, subtrace_desc(3, 1), subtrace_desc(4, 0)), executed_state::subtrace_lore(1, subtrace_desc(4, 1), subtrace_desc(5, 0)), ]), scalar!("stop", peer = vm_peer_id, function = "stop"), scalar!("stop", peer = vm_peer_id, function = "stop"), ]; assert_eq!(actual_trace, expected_state); } #[tokio::test] async fn recursive_stream_many_iterations() { let vm_peer_id_1 = "vm_peer_id_1"; let request_id = std::cell::Cell::new(0); let stop_request_id = 10; let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| { let uncelled_request_id = request_id.get(); let result = if uncelled_request_id >= stop_request_id { CallServiceResult::ok(json!("stop")) } else { CallServiceResult::ok(json!("non_stop")) }; request_id.set(uncelled_request_id + 1); async move { result }.boxed_local() }); let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1).await; let vm_peer_id_2 = "vm_peer_id_2"; let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2).await; let result_value = "result_value"; let script = format!( r#" (seq (seq (seq (call "{vm_peer_id_1}" ("" "stream_value") [] $stream) (call "{vm_peer_id_1}" ("" "stream_value") [] $stream) ) (fold $stream iterator (seq (call "{vm_peer_id_1}" ("" "stop") [] value) (xor (match value "stop" (null) ) (seq (ap value $stream) (next iterator) ) ) ) ) ) (call "{vm_peer_id_2}" ("" "") ["{result_value}"]) )"# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let actual_fold = &actual_trace[2.into()]; let expected_fold_v1 = executed_state::fold(vec![ executed_state::subtrace_lore(0, subtrace_desc(3, 2), subtrace_desc(5, 0)), executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)), executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)), executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)), executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)), executed_state::subtrace_lore(14, subtrace_desc(17, 2), subtrace_desc(19, 0)), executed_state::subtrace_lore(16, subtrace_desc(19, 1), subtrace_desc(20, 0)), executed_state::subtrace_lore(18, subtrace_desc(20, 1), subtrace_desc(21, 0)), ]); let expected_fold_v2 = executed_state::fold(vec![ executed_state::subtrace_lore(0, subtrace_desc(3, 2), subtrace_desc(5, 0)), executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)), executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)), executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)), executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)), executed_state::subtrace_lore(14, subtrace_desc(17, 1), subtrace_desc(18, 0)), executed_state::subtrace_lore(16, subtrace_desc(18, 2), subtrace_desc(20, 0)), executed_state::subtrace_lore(19, subtrace_desc(20, 1), subtrace_desc(21, 0)), ]); let test_passed = (actual_fold == &expected_fold_v1) || (actual_fold == &expected_fold_v2); if !test_passed { print_trace(&result, ""); } assert!(test_passed); let actual_last_state = actual_trace.last().unwrap(); let expected_last_state = executed_state::request_sent_by(vm_peer_id_1); assert_eq!(actual_last_state, &expected_last_state); let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); let actual_last_state = actual_trace.last().unwrap(); let expected_last_state = unused!(result_value, peer = vm_peer_id_2, args = [result_value]); assert_eq!(actual_last_state, &expected_last_state); } #[tokio::test] async fn recursive_stream_join() { let vm_peer_id_1 = "vm_peer_id_1"; let request_id = std::cell::Cell::new(0); let stop_request_id = 5; let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| { let uncelled_request_id = request_id.get(); let result = if uncelled_request_id >= stop_request_id { CallServiceResult::ok(json!("join")) } else { CallServiceResult::ok(json!("non_join")) }; request_id.set(uncelled_request_id + 1); async move { result }.boxed_local() }); let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1).await; let vm_peer_id_2 = "vm_peer_id_2"; let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2).await; let vm_peer_id_3 = "vm_peer_id_3"; let mut vm_3 = create_avm(echo_call_service(), vm_peer_id_3).await; let result_value = "result_value"; let script = format!( r#" (seq (seq (par (call "{vm_peer_id_1}" ("" "stream_value") [] $stream) (call "{vm_peer_id_3}" ("" "stream_value") [""] join_variable) ) (fold $stream iterator (seq (call "{vm_peer_id_1}" ("" "") [""] value) (xor (match value "join" (call "{vm_peer_id_2}" ("" "") [join_variable]) ) (seq (ap value $stream) (next iterator) ) ) ) ) ) (call "{vm_peer_id_2}" ("" "") ["{result_value}"]) )"# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_3, <_>::default(), &script, "", result.data); let result = checked_call_vm!(vm_2, <_>::default(), &script, "", result.data); let actual_trace = trace_from_result(&result); let expected_trace = ExecutionTrace::from(vec![ executed_state::par(1, 1), stream!("non_join", 0, peer = vm_peer_id_1, function = "stream_value"), scalar!("", peer = vm_peer_id_3, function = "stream_value", args = [""]), executed_state::fold(vec![ executed_state::subtrace_lore(1, subtrace_desc(4, 2), subtrace_desc(6, 0)), executed_state::subtrace_lore(5, subtrace_desc(6, 2), subtrace_desc(8, 0)), executed_state::subtrace_lore(7, subtrace_desc(8, 2), subtrace_desc(10, 0)), executed_state::subtrace_lore(9, subtrace_desc(10, 2), subtrace_desc(12, 0)), executed_state::subtrace_lore(11, subtrace_desc(12, 2), subtrace_desc(14, 0)), ]), scalar!("non_join", peer = vm_peer_id_1, args = [""]), executed_state::ap(1), scalar!("non_join", peer = vm_peer_id_1, args = [""]), executed_state::ap(2), scalar!("non_join", peer = vm_peer_id_1, args = [""]), executed_state::ap(3), scalar!("non_join", peer = vm_peer_id_1, args = [""]), executed_state::ap(4), scalar!("join", peer = vm_peer_id_1, args = [""]), unused!("", peer = vm_peer_id_2, args = [""]), unused!(result_value, peer = vm_peer_id_2, args = [result_value]), ]); assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn recursive_stream_error_handling() { let vm_peer_id_1 = "vm_peer_id_1"; let request_id = std::cell::Cell::new(0); let stop_request_id = 5; let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| { let uncelled_request_id = request_id.get(); let result = if uncelled_request_id >= stop_request_id { CallServiceResult::err(1, json!("error")) } else { CallServiceResult::ok(json!("non_stop")) }; request_id.set(uncelled_request_id + 1); async move { result }.boxed_local() }); let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1).await; let result_value = "result_value"; let vm_peer_id_2 = "vm_peer_id_2"; let script = format!( r#" (xor (seq (seq (call "{vm_peer_id_1}" ("" "stream_value") [] $stream) (call "{vm_peer_id_1}" ("" "stream_value") [] $stream)) (fold $stream iterator (seq (call "{vm_peer_id_1}" ("" "stop") [] value) (xor (match value "stop" (null)) (seq (ap value $stream) (next iterator) ) ) ) ) ) (call "{vm_peer_id_2}" ("" "") ["{result_value}"])) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let expected_trace = vec![ stream!("non_stop", 0, peer = vm_peer_id_1, function = "stream_value"), stream!("non_stop", 1, peer = vm_peer_id_1, function = "stream_value"), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(3.into(), 2), SubTraceDesc::new(5.into(), 0)), subtrace_lore(1, SubTraceDesc::new(5.into(), 2), SubTraceDesc::new(7.into(), 0)), subtrace_lore(4, SubTraceDesc::new(7.into(), 2), SubTraceDesc::new(10.into(), 0)), subtrace_lore(6, SubTraceDesc::new(9.into(), 1), SubTraceDesc::new(10.into(), 0)), subtrace_lore(8, SubTraceDesc::new(10.into(), 1), SubTraceDesc::new(11.into(), 0)), ]), scalar!("non_stop", peer = vm_peer_id_1, function = "stop"), executed_state::ap(2), scalar!("non_stop", peer = vm_peer_id_1, function = "stop"), executed_state::ap(2), scalar!("non_stop", peer = vm_peer_id_1, function = "stop"), executed_state::ap(3), failed!(1, "error", peer = vm_peer_id_1, function = "stop"), failed!(1, "error", peer = vm_peer_id_1, function = "stop"), ]; assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn recursive_stream_inner_fold() { let vm_peer_id_1 = "vm_peer_id_1"; let request_id = std::cell::Cell::new(0); let stop_request_id = 10; let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| { let uncelled_request_id = request_id.get(); let result = if uncelled_request_id >= stop_request_id { CallServiceResult::ok(json!("stop")) } else { CallServiceResult::ok(json!("non_stop")) }; request_id.set(uncelled_request_id + 1); async move { result }.boxed_local() }); let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1).await; let vm_peer_id_2 = "vm_peer_id_2"; let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2).await; let result_value = "result_value"; let script = format!( r#" (seq (seq (seq (call "{vm_peer_id_1}" ("" "stream_value") [] $stream_1) (call "{vm_peer_id_1}" ("" "stream_value") [] $stream_2)) (fold $stream_1 iterator_1 (seq (call "{vm_peer_id_1}" ("" "stop") [] value) (xor (match value "stop" (null)) (seq (seq (ap value $stream_1) (fold $stream_2 iterator_2 (seq (call "{vm_peer_id_1}" ("" "stop") [] value) (xor (match value "stop" (null)) (seq (ap value $stream_2) (next iterator_2)))))) (next iterator_1)))))) (call "{vm_peer_id_2}" ("" "") ["{result_value}"])) "# ); let result = call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); let actual_last_state = actual_trace.last().unwrap(); let expected_last_state = unused!(result_value, peer = vm_peer_id_2, args = [result_value]); assert_eq!(actual_last_state, &expected_last_state); } #[tokio::test] async fn recursive_stream_fold_with_n_service_call() { let vm_peer_id = "vm_peer_id_1"; let request_id = std::cell::Cell::new(0); let stop_request_id = 10; let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| { let uncelled_request_id = request_id.get(); let result = if uncelled_request_id >= stop_request_id { CallServiceResult::ok(json!("no")) } else { CallServiceResult::ok(json!("yes")) }; request_id.set(uncelled_request_id + 1); async move { result }.boxed_local() }); let mut vm = create_avm(give_n_results_and_then_stop, vm_peer_id).await; let script = format!( r#" (xor (seq (seq (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) (new $loop (new $result (seq (seq (ap "yes" $loop) (fold $loop l (seq (seq (xor (match l "yes" (xor (call %init_peer_id% ("yesno" "get") [] $loop) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) ) ) (null) ) (ap "success" $result) ) (next l) ) ) ) (seq (canon %init_peer_id% $result #canon_stream) (call %init_peer_id% ("op" "identity") [#canon_stream] result-fix) ) ) ) ) ) (xor (call %init_peer_id% ("callbackSrv" "response") [result-fix]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) ) ) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) ) "# ); let test_params = TestRunParameters::from_init_peer_id(vm_peer_id); let result = checked_call_vm!(vm, test_params, &script, "", ""); let actual_trace = trace_from_result(&result); let actual_fold_state = match &actual_trace[2.into()] { ExecutedState::Fold(fold_result) => fold_result, _ => panic!("2nd state should be fold"), }; let expected_fold_lores = stop_request_id + 1; assert_eq!(actual_fold_state.lore.len(), expected_fold_lores); } #[tokio::test] async fn recursive_stream_size_limit() { let vm_peer_id_1 = "vm_peer_id_1"; let script = format!( r#" (seq (ap 42 $stream) (fold $stream i (seq (ap i $stream) (next i) ) ) )"# ); let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_peer_id_1), &script) .await .expect("invalid test AIR script"); let result = executor.execute_all(vm_peer_id_1).await.unwrap(); let result = result.last().unwrap(); let expected_error = StreamSizeLimitExceeded; assert!(check_error(&result, expected_error)); }