/* * AquaVM Workflow Engine * * Copyright (C) 2024 Fluence DAO * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation version 3 of the * License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use air::no_error_object; use air::ExecutionCidState; use air_test_framework::AirScriptExecutor; use air_test_utils::key_utils::at; use air_test_utils::prelude::*; use futures::FutureExt; use pretty_assertions::assert_eq; use std::cell::RefCell; use std::rc::Rc; #[tokio::test] async fn ap_with_scalars() { let vm_1_peer_id = "vm_1_peer_id"; let test_value = "scalar_2"; let mut vm_1 = create_avm(set_variable_call_service(json!({ "field": test_value })), vm_1_peer_id).await; let vm_2_peer_id = "vm_2_peer_id"; let mut vm_2 = create_avm(echo_call_service(), vm_2_peer_id).await; let script = format!( r#" (seq (seq (call "{vm_1_peer_id}" ("" "") ["scalar_1_result"] scalar_1) (ap scalar_1.$.field! scalar_2) ) (call "{vm_2_peer_id}" ("" "") [scalar_2]) ) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); let expected_state = vec![ scalar!( json!({ "field": test_value }), peer = vm_1_peer_id, args = ["scalar_1_result"] ), unused!(test_value, peer = vm_2_peer_id, args = [test_value]), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_string_literal() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let some_string = "some_string"; let script = format!( r#" (seq (ap "{some_string}" $stream) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (call "{vm_1_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ executed_state::ap(0), executed_state::canon(json!( { "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [ { "result": "some_string", "tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""}, "trace_pos": 0 } ] } )), unused!(json!([some_string]), peer = vm_1_peer_id, args = [json!([some_string])]), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_bool_literal() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap true $stream) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (call "{vm_1_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ executed_state::ap(0), executed_state::canon(json!( { "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [ { "result": true, "tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""}, "trace_pos": 0 } ] })), unused!(json!([true]), peer = vm_1_peer_id, args = [json!([true])]), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_number_literal() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap 100 $stream) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (call "{vm_1_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [ { "result": 100, "tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""}, "trace_pos": 0 } ] })), unused!(json!([100]), peer = vm_1_peer_id, args = [json!([100])]), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_last_error() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap %last_error% $stream) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (call "{vm_1_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [ { "result": no_error_object(), "tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""}, "trace_pos": 0 } ] })), unused!( json!([no_error_object()]), peer = vm_1_peer_id, args = [no_error_object()] ), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_error() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap :error: $stream) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (call "{vm_1_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![ executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [ { "result": no_error_object(), "tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""}, "trace_pos": 0 } ] })), unused!( json!([no_error_object()]), peer = vm_1_peer_id, args = [no_error_object()] ), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_with_timestamp() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap %timestamp% scalar) (call "{vm_1_peer_id}" ("" "") [scalar]) ) "# ); let test_params = TestRunParameters::from_timestamp(1337); let result = checked_call_vm!(vm_1, test_params.clone(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![unused!( test_params.timestamp, peer = vm_1_peer_id, args = [test_params.timestamp] )]; assert_eq!(actual_trace, expected_state); } #[tokio::test] async fn ap_with_ttl() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let script = format!( r#" (seq (ap %ttl% scalar) (call "{vm_1_peer_id}" ("" "") [scalar]) ) "# ); let test_params = TestRunParameters::from_ttl(1337); let result = checked_call_vm!(vm_1, test_params.clone(), script, "", ""); let actual_trace = trace_from_result(&result); let expected_state = vec![unused!(test_params.ttl, peer = vm_1_peer_id, args = [test_params.ttl])]; assert_eq!(actual_trace, expected_state); } #[tokio::test] async fn ap_with_dst_stream() { let vm_1_peer_id = "vm_1_peer_id"; let test_value = "scalar_2"; let mut vm_1 = create_avm(set_variable_call_service(json!({ "field": test_value })), vm_1_peer_id).await; let vm_2_peer_id = "vm_2_peer_id"; let mut vm_2 = create_avm(echo_call_service(), vm_2_peer_id).await; let script = format!( r#" (seq (seq (call "{vm_1_peer_id}" ("" "") ["scalar_1_result"] scalar_1) (ap scalar_1 $stream)) (seq (canon "{vm_2_peer_id}" $stream #canon_stream) (call "{vm_2_peer_id}" ("" "") [#canon_stream]))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data); let val_1 = scalar!( json!({ "field": test_value }), peer = vm_1_peer_id, args = ["scalar_1_result"] ); let cid_1 = extract_service_result_cid(&val_1); let actual_trace = trace_from_result(&result); let expected_state = vec![ val_1, executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_2_peer_id", "service_id": ""}, "values": [{ "result": {"field": "scalar_2"}, "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "provenance": Provenance::service_result(cid_1), }] })), unused!( json!([{ "field": test_value }]), peer = vm_2_peer_id, args = [json!([{ "field": test_value }])] ), ]; assert_eq!(actual_trace, expected_state); assert!(result.next_peer_pks.is_empty()); } #[tokio::test] async fn ap_canon_stream_with_lambda() { let vm_1_peer_id = "vm_1_peer_id"; let (echo_call_service, tetraplet_checker) = tetraplet_host_function(echo_call_service()); let mut vm_1 = create_avm(echo_call_service, vm_1_peer_id).await; let service_name = "some_service_name"; let function_name = "some_function_name"; let script = format!( r#" (seq (seq (call "{vm_1_peer_id}" ("" "") [0] $stream) (call "{vm_1_peer_id}" ("{service_name}" "{function_name}") [1] $stream)) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (seq (ap #canon_stream.$.[1] $stream_2) (seq (canon "{vm_1_peer_id}" $stream_2 #canon_stream_2) (call "{vm_1_peer_id}" ("" "") [#canon_stream_2]))))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let val_1 = stream!(0, 0, peer = vm_1_peer_id, args = [0]); let val_2 = stream!( 1, 1, peer = vm_1_peer_id, service = service_name, function = function_name, args = [1] ); let cid_1 = extract_service_result_cid(&val_1); let cid_2 = extract_service_result_cid(&val_2); let canon_1 = executed_state::canon(json!({ "tetraplet": { "function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": "", }, "values": [{ "result": 0, "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "provenance": Provenance::service_result(cid_1), }, { "result": 1, "tetraplet": { "function_name": "some_function_name", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": "some_service_name", }, "provenance": Provenance::service_result(cid_2.clone()), }]})); let actual_trace = trace_from_result(&result); let expected_state = vec![ val_1, val_2, canon_1, executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [{ "result": 1, "tetraplet": {"function_name": "some_function_name", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": "some_service_name"}, "provenance": Provenance::service_result(cid_2), }] })), unused!(json!([1]), peer = vm_1_peer_id, args = [json!([1])]), ]; assert_eq!(actual_trace, expected_state); let expected_tetraplet = RefCell::new(vec![vec![SecurityTetraplet::new( vm_1_peer_id, service_name, function_name, "", )]]); assert_eq!(tetraplet_checker.as_ref(), &expected_tetraplet); } #[tokio::test] async fn ap_canon_stream() { let vm_1_peer_id = "vm_1_peer_id"; let arg_tetraplets = Rc::new(RefCell::new(vec![])); let echo_call_service: CallServiceClosure = Box::new(move |mut params| { let arg_tetraplets_inner = arg_tetraplets.clone(); arg_tetraplets_inner.borrow_mut().push(params.tetraplets.clone()); let result = CallServiceResult::ok(params.arguments.remove(0)); async move { result }.boxed_local() }); let (echo_call_service, tetraplet_checker) = tetraplet_host_function(echo_call_service); let mut vm_1 = create_avm(echo_call_service, vm_1_peer_id).await; let service_name = "some_service_name"; let function_name = "some_function_name"; let script = format!( r#" (seq (seq (call "{vm_1_peer_id}" ("" "") [0] $stream) (call "{vm_1_peer_id}" ("{service_name}" "{function_name}") [1] $stream)) (seq (canon "{vm_1_peer_id}" $stream #canon_stream) (seq (ap #canon_stream $stream_2) (seq (canon "{vm_1_peer_id}" $stream_2 #canon_stream_2) (call "{vm_1_peer_id}" ("" "") [#canon_stream_2]))))) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); print_trace(&result, ""); let val_1 = stream!(0, 0, peer = vm_1_peer_id, args = [0]); let val_2 = stream!( 1, 1, peer = vm_1_peer_id, service = service_name, function = function_name, args = [1] ); let cid_1 = extract_service_result_cid(&val_1); let cid_2 = extract_service_result_cid(&val_2); let canon_1 = executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [{ "result": 0, "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "provenance": Provenance::service_result(cid_1), }, { "result": 1, "tetraplet": { "function_name": "some_function_name", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": "some_service_name" }, "provenance": Provenance::service_result(cid_2), }]})); let canon_cid_1 = extract_canon_result_cid(&canon_1); let actual_trace = trace_from_result(&result); let expected_state = vec![ val_1, val_2, canon_1, executed_state::ap(0), executed_state::canon(json!({ "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "values": [{ "result": [0, 1], "tetraplet": {"function_name": "", "lens": "", "peer_pk": "vm_1_peer_id", "service_id": ""}, "provenance": Provenance::canon(canon_cid_1), }]} )), unused!(json!([[0, 1]]), peer = vm_1_peer_id, args = [json!([[0, 1]])]), ]; assert_eq!(actual_trace, expected_state); let expected_tetraplet = RefCell::new(vec![vec![SecurityTetraplet::new(vm_1_peer_id, "", "", "")]]); assert_eq!(tetraplet_checker.as_ref(), &expected_tetraplet); } #[tokio::test] async fn ap_stream_map() { let vm_1_peer_id = "vm_1_peer_id"; let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id).await; let service_name1 = "serv1"; let service_name2 = "serv2"; let script = format!( r#" (seq (seq (ap ("{vm_1_peer_id}" "{service_name1}") %map) (ap ("{vm_1_peer_id}" "{service_name2}") %map) ) (fold %map i (seq (call i.$.key (i.$.key i.$.value) [i] u) (next i) ) ) ) "# ); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let actual_trace = trace_from_result(&result); let generation_idx = 0; let mut cid_tracker = ExecutionCidState::new(); let service_result1 = json!({ "key": vm_1_peer_id, "value": service_name1, }); let service_result2 = json!({ "key": vm_1_peer_id, "value": service_name2, }); let service_args1 = vec![service_result1.clone()]; let service_args2 = vec![service_result2.clone()]; let expected_state = ExecutionTrace::from(vec![ executed_state::ap(generation_idx), executed_state::ap(generation_idx), executed_state::fold(vec![ subtrace_lore(0, SubTraceDesc::new(3.into(), 1), SubTraceDesc::new(5.into(), 0)), subtrace_lore(1, SubTraceDesc::new(4.into(), 1), SubTraceDesc::new(5.into(), 0)), ]), scalar_tracked!( service_result1, cid_tracker, peer = vm_1_peer_id, service = vm_1_peer_id, function = service_name1, args = service_args1 ), scalar_tracked!( service_result2, cid_tracker, peer = vm_1_peer_id, service = vm_1_peer_id, function = service_name2, args = service_args2 ), ]); assert_eq!(actual_trace, expected_state); } #[tokio::test] async fn ap_stream_map_with_undefined_last_error() { let vm_1_peer_id = "vm_1_peer_id"; let script = format!( r#" (seq (ap ("key" %last_error%) %map) (fold %map i (seq (call "{vm_1_peer_id}" ("m" "f") [i.$.value]) ; behaviour = echo (next i) ) ) ) "# ); let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_id), &script) .await .expect("invalid test AIR script"); let result = executor.execute_all(vm_1_peer_id).await.unwrap(); let actual_trace = trace_from_result(&result.last().unwrap()); let expected_state = vec![ executed_state::ap(0), executed_state::fold(vec![subtrace_lore( 0, SubTraceDesc::new(2.into(), 1), SubTraceDesc::new(3.into(), 0), )]), unused!( no_error_object(), peer = vm_1_peer_id, service = "m", function = "f", args = [no_error_object()] ), ]; assert_eq!(actual_trace, expected_state,); } #[tokio::test] async fn ap_canon_stream_map_with_string_key_accessor_lambda() { let vm_1_peer_name = "vm_1_peer_id"; let vm_1_peer_id = at(vm_1_peer_name); let script = format!( r#" (seq (seq (ap ("key" "value1") %map) (canon "{vm_1_peer_name}" %map #%canon_map) ) (seq (ap #%canon_map.$.key scalar) (call "{vm_1_peer_name}" ("m" "f") [scalar] scalar1) ; behaviour = echo ) ) "# ); let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_name), &script) .await .expect("invalid test AIR script"); let result = executor.execute_all(vm_1_peer_name).await.unwrap(); let actual_trace = trace_from_result(&result.last().unwrap()); let mut cid_tracker: ExecutionCidState = ExecutionCidState::new(); let map_value = json!({"key": "key", "value": "value1"}); let tetraplet = json!({"function_name": "", "lens": "", "peer_pk": vm_1_peer_id, "service_id": ""}); let call_arg = json!(["value1"]); let expected_trace: Vec = vec![ executed_state::ap(0), canon_tracked( json!({"tetraplet": tetraplet, "values": [ { "result": map_value, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, ]}), &mut cid_tracker, ), scalar_tracked!( call_arg.clone(), cid_tracker, peer = vm_1_peer_id, service = "m..0", function = "f", args = [call_arg] ), ]; assert_eq!(&*actual_trace, expected_trace,); } #[tokio::test] async fn ap_canon_stream_map_with_numeric_key_accessor_lambda() { let vm_1_peer_name = "vm_1_peer_id"; let vm_1_peer_id = at(vm_1_peer_name); let script = format!( r#" (seq (seq (ap (42 "value1") %map) (canon "{vm_1_peer_name}" %map #%canon_map) ) (seq (ap #%canon_map.$.[42] scalar) (call "{vm_1_peer_name}" ("m" "f") [scalar] scalar1) ; behaviour = echo ) ) "# ); let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_name), &script) .await .expect("invalid test AIR script"); let result = executor.execute_all(vm_1_peer_name).await.unwrap(); let actual_trace = trace_from_result(&result.last().unwrap()); let mut cid_tracker: ExecutionCidState = ExecutionCidState::new(); let map_value = json!({"key": 42, "value": "value1"}); let tetraplet = json!({"function_name": "", "lens": "", "peer_pk": vm_1_peer_id, "service_id": ""}); let call_arg = json!(["value1"]); let expected_trace: Vec = vec![ executed_state::ap(0), canon_tracked( json!({"tetraplet": tetraplet, "values": [ { "result": map_value, "tetraplet": tetraplet, "provenance": Provenance::Literal, }, ]}), &mut cid_tracker, ), scalar_tracked!( call_arg.clone(), cid_tracker, peer = vm_1_peer_id, service = "m..0", function = "f", args = [call_arg] ), ]; assert_eq!(&*actual_trace, expected_trace,); } #[tokio::test] async fn ap_map_key_join_behavior() { let vm_1_peer_id = "vm_1_peer_id"; let script = r#" (seq (par (null) (seq (never) (ap "42" key) ) ) (seq (ap (key "value") %map) (canon %init_peer_id% %map map) ) ) "#; let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_id), &script) .await .expect("invalid test AIR script"); let result = executor.execute_one(vm_1_peer_id).await.unwrap(); assert_eq!(result.ret_code, 0, "{:?}", result.error_message); } #[tokio::test] async fn ap_map_value_join_behavior() { let vm_1_peer_id = "vm_1_peer_id"; let script = r#" (seq (par (null) (seq (never) (ap "42" value) ) ) (ap ("key" value) %map) ) "#; let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(vm_1_peer_id), &script) .await .expect("invalid test AIR script"); let result = executor.execute_one(vm_1_peer_id).await.unwrap(); assert_eq!(result.ret_code, 0, "{:?}", result.error_message); }