/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air::ExecutionCidState; use air::UncatchableError; use air_interpreter_data::ExecutionTrace; use air_interpreter_data::ValueRef; use air_test_utils::prelude::*; use air_trace_handler::merger::CallResultError; use air_trace_handler::merger::MergeError; use air_trace_handler::TraceHandlerError; use pretty_assertions::assert_eq; use std::convert::TryInto; #[tokio::test] async fn par_early_exit() { let init_peer_id = "init_peer_id"; let setter_1_id = "setter_1"; let setter_2_id = "setter_2"; let setter_3_id = "setter_3"; let mut init = create_avm(unit_call_service(), init_peer_id).await; let mut setter_1 = create_avm(set_variable_call_service(json!("1")), setter_1_id).await; let mut setter_2 = create_avm(set_variable_call_service(json!("2")), setter_2_id).await; let mut setter_3 = create_avm(fallible_call_service("error"), setter_3_id).await; let script = format!( include_str!("scripts/par_early_exit.air"), init_peer_id, setter_1_id, setter_2_id, setter_3_id ); let init_result_1 = checked_call_vm!(init, <_>::default(), &script, "", ""); let setter_1_res = checked_call_vm!(setter_1, <_>::default(), &script, "", init_result_1.data.clone()); let setter_2_res = checked_call_vm!(setter_2, <_>::default(), &script, "", init_result_1.data.clone()); let setter_3_res_1 = checked_call_vm!(setter_3, <_>::default(), &script, "", init_result_1.data.clone()); let actual_trace_1 = trace_from_result(&setter_3_res_1); let expected_trace = ExecutionTrace::from(vec![ unused!("result from unit_call_service", peer = init_peer_id), executed_state::par(12, 1), executed_state::par(9, 1), executed_state::par(7, 1), executed_state::par(5, 1), executed_state::par(3, 1), executed_state::par(1, 1), executed_state::request_sent_by(init_peer_id), executed_state::request_sent_by(init_peer_id), executed_state::request_sent_by(init_peer_id), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), executed_state::request_sent_by(setter_3_id), ]); assert_eq!(actual_trace_1, expected_trace); let setter_3_res_2 = checked_call_vm!( setter_3, <_>::default(), &script, setter_3_res_1.data, setter_1_res.data ); let setter_3_res_3 = checked_call_vm!( setter_3, <_>::default(), &script, setter_3_res_2.data, setter_2_res.data ); let init_result_2 = checked_call_vm!( init, <_>::default(), &script, init_result_1.data, setter_3_res_3.data.clone() ); let actual_trace_2 = trace_from_result(&setter_3_res_3); let actual_trace_3 = trace_from_result(&init_result_2); let expected_trace = ExecutionTrace::from(vec![ unused!("result from unit_call_service", peer = init_peer_id), executed_state::par(12, 1), executed_state::par(9, 1), executed_state::par(7, 1), executed_state::par(5, 1), executed_state::par(3, 1), executed_state::par(1, 1), stream!("1", 1, peer = setter_1_id), stream!("2", 2, peer = setter_2_id), stream!("1", 1, peer = setter_1_id), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), executed_state::request_sent_by("setter_3"), ]); assert_eq!(actual_trace_2, expected_trace); let expected_trace = ExecutionTrace::from(vec![ unused!("result from unit_call_service", peer = init_peer_id), executed_state::par(12, 1), executed_state::par(9, 1), executed_state::par(7, 1), executed_state::par(5, 1), executed_state::par(3, 1), executed_state::par(1, 1), stream!("1", 1, peer = setter_1_id), stream!("2", 2, peer = setter_2_id), stream!("1", 1, peer = setter_1_id), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), stream!("success result from fallible_call_service", 0, peer = setter_3_id), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), unused!("result from unit_call_service", peer = init_peer_id), ]); assert_eq!(actual_trace_3, expected_trace); let mut setter_3_cid_state = ExecutionCidState::new(); let setter_3_malicious_trace = ExecutionTrace::from(vec![ unused!("result from unit_call_service", peer = init_peer_id), executed_state::par(10, 0), executed_state::par(9, 0), executed_state::par(7, 1), executed_state::par(5, 1), executed_state::par(3, 1), executed_state::par(1, 1), executed_state::request_sent_by(init_peer_id), executed_state::request_sent_by(init_peer_id), stream_tracked!("non_exist_value", 0, setter_3_cid_state, peer = setter_1_id), stream_tracked!( "success result from fallible_call_service", 0, setter_3_cid_state, peer = setter_1_id ), failed!( 1, "failed result from fallible_call_service", peer = setter_3_id, service = "error" ), executed_state::request_sent_by(setter_3_id), ]); let setter_3_malicious_data = raw_data_from_trace(setter_3_malicious_trace, setter_3_cid_state); let init_result_3 = call_vm!( init, <_>::default(), &script, init_result_2.data.clone(), setter_3_malicious_data ); let mut cid_state = ExecutionCidState::new(); let prev_value = ValueRef::Stream { cid: value_aggregate_cid( json!("1"), SecurityTetraplet::new(setter_1_id, "", "", ""), vec![], &mut cid_state, ), generation: 1.into(), }; let current_value = ValueRef::Stream { cid: value_aggregate_cid( json!("non_exist_value"), SecurityTetraplet::new(setter_1_id, "", "", ""), vec![], &mut cid_state, ), generation: 0.into(), }; let expected_error = UncatchableError::TraceError { trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual { prev_value, current_value, })), instruction: r#"call "setter_1" ("" "") [] $stream"#.to_string(), }; assert!(check_error(&init_result_3, expected_error)); let actual_trace = trace_from_result(&init_result_3); let expected_trace = trace_from_result(&init_result_2); assert_eq!(actual_trace, expected_trace); } #[tokio::test] async fn fold_early_exit() { let fold_executor_id = "fold_executor_id"; let error_trigger_id = "error_trigger_id"; let last_error_receiver_id = "last_error_receiver_id"; let last_peer_checker_id = "last_peer_checker_id"; let mut fold_executor = create_avm(unit_call_service(), fold_executor_id).await; let mut error_trigger = create_avm(fallible_call_service("error"), error_trigger_id).await; let mut last_peer_checker = create_avm(echo_call_service(), last_peer_checker_id).await; let script = format!( include_str!("scripts/fold_early_exit.air"), fold_executor_id = fold_executor_id, error_trigger_id = error_trigger_id, last_error_receiver_id = last_error_receiver_id, last_peer_checker_id = last_peer_checker_id ); let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", ""); let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data); let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", error_trigger_result.data); let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data); let last_peer_checker_result = checked_call_vm!( last_peer_checker, <_>::default(), &script, "", error_trigger_result.data ); let actual_trace = trace_from_result(&last_peer_checker_result); let error_value = json!({ "error_code": 10000i64, "instruction" : r#"call "error_trigger_id" ("error" "") [] "#, "message": r#"Local service error, ret_code is 1, error message is '"failed result from fallible_call_service"'"#, "peer_id": "error_trigger_id" }); let expected_state = unused!(error_value.clone(), peer = last_peer_checker_id, args = [error_value]); let bubbled_error_from_stream_1 = actual_trace.len() - 3; assert_eq!( &actual_trace[bubbled_error_from_stream_1.try_into().unwrap()], &expected_state ); let bubbled_error_from_stream_2 = actual_trace.len() - 2; assert_eq!( &actual_trace[bubbled_error_from_stream_2.try_into().unwrap()], &expected_state ); } #[tokio::test] async fn fold_par_early_exit() { let variables_setter_id = "set_variable_id"; let stream_setter_id = "stream_setter_id"; let fold_executor_id = "fold_executor_id"; let error_trigger_id = "error_trigger_id"; let last_error_receiver_id = "last_error_receiver_id"; let last_peer_checker_id = "last_peer_checker_id"; let variables = maplit::hashmap!( "stream_1".to_string() => json!(["a1", "a2"]), "stream_2".to_string() => json!(["b1", "b2"]), "stream_3".to_string() => json!(["c1", "c2"]), "stream_4".to_string() => json!(["d1", "d2"]), ); let mut variables_setter = create_avm( set_variables_call_service(variables, VariableOptionSource::Argument(0)), variables_setter_id, ) .await; let mut stream_setter = create_avm(echo_call_service(), stream_setter_id).await; let mut fold_executor = create_avm(unit_call_service(), fold_executor_id).await; let mut error_trigger = create_avm(fallible_call_service("error"), error_trigger_id).await; let mut last_error_receiver = create_avm(unit_call_service(), last_error_receiver_id).await; let mut last_peer_checker = create_avm(unit_call_service(), last_peer_checker_id).await; let script = format!( include_str!("scripts/fold_par_early_exit.air"), variables_setter_id, stream_setter_id, fold_executor_id, error_trigger_id, last_error_receiver_id, last_peer_checker_id ); let variables_setter_result = checked_call_vm!(variables_setter, <_>::default(), &script, "", ""); let stream_setter_result = checked_call_vm!(stream_setter, <_>::default(), &script, "", variables_setter_result.data); let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", stream_setter_result.data); let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data); let last_error_receiver_result = checked_call_vm!( last_error_receiver, <_>::default(), &script, "", error_trigger_result.data ); let last_peer_checker_result = checked_call_vm!( last_peer_checker, <_>::default(), &script, "", last_error_receiver_result.data ); let actual_trace = trace_from_result(&last_peer_checker_result); let unit_call_service_result = "result from unit_call_service"; let expected_trace = vec![ scalar!(json!(["a1", "a2"]), peer = variables_setter_id, args = ["stream_1"]), scalar!(json!(["b1", "b2"]), peer = variables_setter_id, args = ["stream_2"]), scalar!(json!(["c1", "c2"]), peer = variables_setter_id, args = ["stream_3"]), scalar!(json!(["d1", "d2"]), peer = variables_setter_id, args = ["stream_4"]), stream!("a1", 0, peer = stream_setter_id, args = ["a1"]), stream!("a2", 1, peer = stream_setter_id, args = ["a2"]), stream!("b1", 0, peer = stream_setter_id, args = ["b1"]), stream!("b2", 1, peer = stream_setter_id, args = ["b2"]), stream!("c1", 0, peer = stream_setter_id, args = ["c1"]), stream!("c2", 1, peer = stream_setter_id, args = ["c2"]), stream!("d1", 0, peer = stream_setter_id, args = ["d1"]), stream!("d2", 1, peer = stream_setter_id, args = ["d2"]), executed_state::par(69, 1), executed_state::fold(vec![ executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(48, 0)), executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)), ]), executed_state::par(33, 0), executed_state::fold(vec![ executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(32, 0)), executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)), ]), executed_state::par(15, 0), executed_state::par(13, 1), executed_state::fold(vec![ executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(25, 0)), executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)), ]), executed_state::par(5, 0), executed_state::fold(vec![ executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(23, 0)), executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)), ]), executed_state::par(1, 0), unused!(unit_call_service_result, peer = fold_executor_id), par(1, 0), unused!(unit_call_service_result, peer = fold_executor_id), executed_state::par(5, 0), executed_state::fold(vec![ executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(29, 0)), executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)), ]), executed_state::par(1, 0), unused!(unit_call_service_result, peer = fold_executor_id), par(1, 0), unused!(unit_call_service_result, peer = fold_executor_id), failed!( 1, "failed result from fallible_call_service", peer = error_trigger_id, service = "error" ), executed_state::par(15, 0), executed_state::par(13, 1), executed_state::fold(vec![ executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(41, 0)), executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)), ]), ]; let trace_len = expected_trace.len(); assert_eq!(&(*actual_trace)[0..trace_len], expected_trace); }