/*
* AquaVM Workflow Engine
*
* Copyright (C) 2024 Fluence DAO
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation version 3 of the
* License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
use air_test_framework::AirScriptExecutor;
use air_test_utils::{key_utils::at, prelude::*};
use futures::stream::StreamExt;
#[tokio::test]
async fn merging_fold_iterations_extensively() {
let script = r#"
(seq
(seq
(call "client" ("get" "data") [] permutations) ; ok = [[@"p1",[[[@"p1",1],[@"p2",2],[@"p3",3]],[[@"p1",4],[@"p3",5],[@"p2",6]]]],[@"p2",[[[@"p2",7],[@"p1",8],[@"p3",9]],[[@"p2",10],[@"p3",11],[@"p1",12]]]],[@"p3",[[[@"p3",13],[@"p1",14],[@"p2",15]],[[@"p3",16],[@"p2",17],[@"p1",18]]]]]
(seq
(fold permutations pair
(seq
(fold pair.$.[1] peer_ids
(seq
(seq
(call pair.$.[0] ("op" "noop") []) ; ok = null
(ap peer_ids $inner)
)
(next peer_ids)
)
)
(next pair)
)
)
(seq
(canon "relay" $inner #inner)
(fold $inner ns
(par
(fold ns pair
(seq
(seq
(call pair.$.[0] ("op" "noop") []) ; ok = null
(ap pair.$.[1] $result)
)
(next pair)
)
)
(next ns)
)
)
)
)
)
(seq
(new $monotonic_stream
(seq
(fold $result elem
(seq
(ap elem $monotonic_stream)
(seq
(canon "relay" $monotonic_stream #canon_stream)
(xor
(match #canon_stream.length 18
(null)
)
(next elem)
)
)
)
)
(canon "relay" $result #joined_result)
)
)
(call "client" ("return" "") [#inner #joined_result]) ; ok = null
)
)
"#;
let engine = ::new(
TestRunParameters::from_init_peer_id("client"),
vec![],
vec!["relay", "p1", "p2", "p3"].into_iter().map(Into::into),
script,
)
.await
.unwrap();
let mut queue = std::collections::vec_deque::VecDeque::new();
let mut relay_outcomes = Vec::::new();
queue.push_back("client".to_string());
while !queue.is_empty() {
let peer = queue.pop_front().unwrap();
if let Some(outcomes) = engine.execution_iter(peer.as_str()) {
for outcome in outcomes.collect::>().await {
assert_eq!(outcome.ret_code, 0, "{outcome:?}");
for peer in &outcome.next_peer_pks {
queue.push_back(peer.clone());
}
if peer == at("relay") {
relay_outcomes.push(outcome);
}
}
} else {
println!("peer: {peer}, no executions");
}
}
let last_relay_data = relay_outcomes.last().unwrap();
let last_relay_trace = trace_from_result(last_relay_data);
let last_fold = last_relay_trace
.iter()
.filter_map(|state| match state {
ExecutedState::Fold(fold_result) => Some(fold_result),
_ => None,
})
.last()
.unwrap();
assert_eq!(last_fold.lore.len(), 18);
}
#[tokio::test]
async fn merging_fold_iterations_extensively_2() {
let script = r#"
(seq
(seq
(call "client" ("get" "data") [] permutations) ; ok = [[@"p1",[[[@"p1",1],[@"p2",2],[@"p3",3]],[[@"p1",4],[@"p3",5],[@"p2",6]]]],[@"p2",[[[@"p2",7],[@"p1",8],[@"p3",9]],[[@"p2",10],[@"p3",11],[@"p1",12]]]],[@"p3",[[[@"p3",13],[@"p1",14],[@"p2",15]],[[@"p3",16],[@"p2",17],[@"p1",18]]]]]
(seq
(seq
(fold permutations pair
(seq
(null)
(seq
(fold pair.$.[1] pid-num-arr
(seq
(seq
(call pair.$.[0] ("op" "noop") []) ; ok = null
(ap pid-num-arr $pid-num-arrs)
)
(seq
(null)
(next pid-num-arr)
)
)
)
(next pair)
)
)
)
(seq
(canon "p1" $pid-num-arrs #pid-num-arrs-1)
(call "p1" ("test" "print") [#pid-num-arrs-1]) ; behaviour = echo
)
)
(seq
(seq
(canon "p1" $pid-num-arrs #pid-num-arrs-2)
(call "p1" ("test" "print") [#pid-num-arrs-2]) ; behaviour = echo
)
(new $result
(fold $pid-num-arrs pid-num-arr
(seq
(seq
(call "p1" ("test" "print") [pid-num-arr]) ; behaviour = echo
(fold pid-num-arr pid-num
(seq
(seq
(null)
(seq
(call pid-num.$.[0] ("op" "noop") []) ; ok = null
(ap pid-num.$.[1] $result)
)
)
(seq
(seq
(canon pid-num.$.[0] $result #mon_res)
(call pid-num.$.[0] ("test" "print") [#mon_res]) ; behaviour = echo
)
(next pid-num)
)
)
)
)
(seq
(seq
(canon "p1" $result #mon_res)
(call "p1" ("test" "print") [#mon_res]) ; behaviour = echo
)
(xor
(match #mon_res.length 18
(call "p1" ("test" "print") [#mon_res.length]) ; behaviour = echo
)
(seq
(call "p1" ("test" "print") ["not enought length"]) ; behaviour = echo
(next pid-num-arr)
)
)
)
)
)
)
)
)
)
(seq
(call "p1" ("op" "noop") ["final p1"]) ; behaviour = echo
(seq
(canon "client" $result #end_result)
(call "p1" ("return" "") [#end_result]) ; behaviour = echo
)
)
)
"#;
let engine = ::new(
TestRunParameters::from_init_peer_id("client"),
vec![],
vec!["relay", "p1", "p2", "p3"].into_iter().map(Into::into),
script,
)
.await
.unwrap();
let mut queue = std::collections::vec_deque::VecDeque::new();
let mut p1_outcomes = Vec::::new();
queue.push_back("client".to_string());
while !queue.is_empty() {
let peer = queue.pop_front().unwrap();
if let Some(outcomes) = engine.execution_iter(peer.as_str()) {
for outcome in outcomes.collect::>().await {
assert_eq!(outcome.ret_code, 0, "{outcome:?}");
for peer in &outcome.next_peer_pks {
if !queue.contains(peer) {
queue.push_back(peer.clone());
}
}
if peer == at("p1") {
p1_outcomes.push(outcome);
}
}
} else {
println!("peer: {peer}, no executions");
}
}
let last_p1_data = p1_outcomes.last().unwrap();
let last_p1_trace = trace_from_result(last_p1_data);
let last_fold = last_p1_trace
.iter()
.filter_map(|state| match state {
ExecutedState::Fold(fold_result) => Some(fold_result),
_ => None,
})
.last()
.unwrap();
assert_eq!(last_fold.lore.len(), 6);
}