/*
* AquaVM Workflow Engine
*
* Copyright (C) 2024 Fluence DAO
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation version 3 of the
* License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
use air_test_utils::prelude::*;
use futures::FutureExt;
use std::cell::RefCell;
use std::rc::Rc;
type ClosureSettableVar = Rc>;
#[derive(Default, Clone, Debug, PartialEq, Eq)]
struct ClosureCallArgs {
service_id_var: Rc>,
function_name_var: ClosureSettableVar,
args_var: ClosureSettableVar>,
tetraplets: ClosureSettableVar>>,
}
fn create_check_service_closure(closure_call_args: ClosureCallArgs) -> CallServiceClosure<'static> {
Box::new(move |params| {
use std::ops::Deref;
*closure_call_args.service_id_var.deref().borrow_mut() = params.service_id.clone();
*closure_call_args.function_name_var.deref().borrow_mut() = params.function_name.clone();
let call_args: Vec = serde_json::from_value(serde_json::Value::Array(params.arguments))
.expect("json deserialization shouldn't fail");
*closure_call_args.args_var.deref().borrow_mut() = call_args;
let result = CallServiceResult::ok(json!(""));
async move { result }.boxed_local()
})
}
#[tokio::test]
async fn flattening_scalar_arrays() {
let scalar_array = json!({"iterable": [
{"peer_id" : "local_peer_id", "service_id": "local_service_id", "function_name": "local_function_name", "args": [0, 1]},
{"peer_id" : "local_peer_id", "service_id": "local_service_id", "function_name": "local_function_name", "args": [2, 3]},
]});
let set_variable_peer_id = "set_variable";
let mut set_variable_vm = create_avm(set_variable_call_service(scalar_array), set_variable_peer_id).await;
let closure_call_args = ClosureCallArgs::default();
let local_peer_id = "local_peer_id";
let mut local_vm = create_avm(create_check_service_closure(closure_call_args.clone()), local_peer_id).await;
let script = format!(
r#"
(seq
(call "{set_variable_peer_id}" ("" "") [] scalar_array)
(fold scalar_array.$.iterable! v
(seq
(call v.$.peer_id! (v.$.service_id! v.$.function_name!) [v.$.args.[0]! v.$.args.[1]!])
(next v)
)
)
)
"#
);
let result = checked_call_vm!(set_variable_vm, <_>::default(), script.clone(), "", "");
let result = call_vm!(local_vm, <_>::default(), script, "", result.data);
assert!(is_interpreter_succeded(&result));
assert_eq!(
closure_call_args.service_id_var,
Rc::new(RefCell::new("local_service_id".to_string()))
);
assert_eq!(
closure_call_args.function_name_var,
Rc::new(RefCell::new("local_function_name".to_string()))
);
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![2, 3])));
}
#[tokio::test]
#[ignore]
async fn flattening_streams() {
let stream_value = json!(
{"peer_id" : "local_peer_id", "service_id": "local_service_id", "function_name": "local_function_name", "args": [0, 1]}
);
let set_variable_peer_id = "set_variable";
let mut set_variable_vm = create_avm(set_variable_call_service(stream_value), set_variable_peer_id).await;
let closure_call_args = ClosureCallArgs::default();
let local_peer_id = "local_peer_id";
let mut local_vm = create_avm(create_check_service_closure(closure_call_args.clone()), local_peer_id).await;
let script = format!(
r#"
(seq
(seq
(seq
(call "{set_variable_peer_id}" ("" "") [] $stream)
(call "{set_variable_peer_id}" ("" "") [] $stream)
)
(call "{set_variable_peer_id}" ("" "") [] $stream)
)
(fold $stream.$.[0,1,2] v
(seq
(call v.$.peer_id! (v.$.service_id! v.$.function_name!) [v.$.args[0]! v.$.args[1]!])
(next v)
)
)
)
"#
);
let result = checked_call_vm!(set_variable_vm, <_>::default(), script.clone(), "", "");
let result = call_vm!(local_vm, <_>::default(), script, "", result.data);
assert!(is_interpreter_succeded(&result));
assert_eq!(
closure_call_args.service_id_var,
Rc::new(RefCell::new("local_service_id".to_string()))
);
assert_eq!(
closure_call_args.function_name_var,
Rc::new(RefCell::new("local_function_name".to_string()))
);
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![0, 1])));
}
#[tokio::test]
#[ignore]
async fn test_handling_non_flattening_values() {
let stream_value = json!(
{"peer_id" : "local_peer_id", "service_id": "local_service_id", "function_name": "local_function_name", "args": [0, 1]}
);
let set_variable_peer_id = "set_variable";
let mut set_variable_vm = create_avm(set_variable_call_service(stream_value), set_variable_peer_id).await;
let closure_call_args = ClosureCallArgs::default();
let local_peer_id = "local_peer_id";
let mut local_vm = create_avm(create_check_service_closure(closure_call_args), local_peer_id).await;
let script = format!(
r#"
(seq
(seq
(seq
(call "{set_variable_peer_id}" ("" "") [] $stream)
(call "{set_variable_peer_id}" ("" "") [] $stream)
)
(call "{set_variable_peer_id}" ("" "") [] $stream)
)
(fold $stream.$.[0,1,2]! v
(seq
(call v.$.peer_id! (v.$.service_id! v.$.function_name!) [v.$.args[0]! v.$.args[1]!])
(next v)
)
)
)
"#
);
let result = checked_call_vm!(set_variable_vm, <_>::default(), &script, "", "");
let result = call_vm!(local_vm, <_>::default(), &script, "", result.data);
assert_eq!(result.ret_code, 1017);
assert_eq!(
result.error_message,
String::from(
r#"jvalue '[{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]}]' can't be flattened, to be flattened a jvalue should have an array type and consist of zero or one values"#
)
);
}