//! Tests for the ArmoniK agent client.
//!
//! Each test builds a client directly on top of a mock [`Service`] through
//! `armonik::Client::with_channel`, issues one RPC, and checks that the
//! recognisable marker value produced by the mock comes back to the caller.

use std::sync::Arc;

use armonik::{agent, reexports::tokio_stream::StreamExt, server::AgentServiceExt};

mod common;

/// Mock agent service used as the server side of every test in this file.
///
/// Both fields are cloned and handed to `common::unary_rpc_impl` on every call.
/// `Service::default()` leaves both at `None`, which is what all tests here use.
// NOTE(review): the generic arguments of these two fields were not present in the
// reviewed text; they are restored as `tonic::Status` / `std::time::Duration` —
// confirm against the signature of `common::unary_rpc_impl`.
#[derive(Debug, Clone, Default)]
struct Service {
    /// Optional failure forwarded to `common::unary_rpc_impl`.
    failure: Option<tonic::Status>,
    /// Optional wait forwarded to `common::unary_rpc_impl`.
    wait: Option<std::time::Duration>,
}

impl armonik::server::AgentService for Service {
    /// Answers with one metadata entry per requested name, keyed by that name,
    /// each tagged with the session id `"rpc-create-results-metadata-output"`.
    async fn create_results_metadata(
        self: Arc<Self>,
        request: agent::create_results_metadata::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::create_results_metadata::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                let results = request
                    .names
                    .into_iter()
                    .map(|name| {
                        let metadata = agent::ResultMetaData {
                            session_id: String::from("rpc-create-results-metadata-output"),
                            name: name.clone(),
                            ..Default::default()
                        };
                        (name, metadata)
                    })
                    .collect();

                Ok(agent::create_results_metadata::Response {
                    communication_token: request.communication_token,
                    results,
                })
            },
        )
        .await
    }

    /// Answers with one metadata entry per submitted result, keyed by its name,
    /// each tagged with the session id `"rpc-create-results-output"`.
    /// The submitted data itself is ignored.
    async fn create_results(
        self: Arc<Self>,
        request: agent::create_results::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::create_results::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                let results = request
                    .results
                    .into_iter()
                    .map(|(name, _)| {
                        let metadata = agent::ResultMetaData {
                            name: name.clone(),
                            session_id: String::from("rpc-create-results-output"),
                            ..Default::default()
                        };
                        (name, metadata)
                    })
                    .collect();

                Ok(agent::create_results::Response {
                    communication_token: request.communication_token,
                    results,
                })
            },
        )
        .await
    }

    /// Answers with two result ids: the request's communication token followed by
    /// the marker `"rpc-notify-result-data-output"`.
    async fn notify_result_data(
        self: Arc<Self>,
        request: agent::notify_result_data::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::notify_result_data::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::notify_result_data::Response {
                    result_ids: vec![
                        request.communication_token,
                        String::from("rpc-notify-result-data-output"),
                    ],
                })
            },
        )
        .await
    }

    /// Answers with a single item whose task id is `"rpc-submit-tasks-output"`,
    /// echoing the request's communication token.
    async fn submit_tasks(
        self: Arc<Self>,
        request: agent::submit_tasks::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::submit_tasks::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::submit_tasks::Response {
                    communication_token: request.communication_token,
                    items: vec![agent::submit_tasks::ResponseItem {
                        task_id: String::from("rpc-submit-tasks-output"),
                        ..Default::default()
                    }],
                })
            },
        )
        .await
    }

    /// Ignores the request and answers with the result id
    /// `"rpc-get-resource-data-output"`.
    async fn get_resource_data(
        self: Arc<Self>,
        _request: agent::get_resource_data::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::get_resource_data::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::get_resource_data::Response {
                    result_id: String::from("rpc-get-resource-data-output"),
                })
            },
        )
        .await
    }

    /// Ignores the request and answers with the result id
    /// `"rpc-get-common-data-output"`.
    async fn get_common_data(
        self: Arc<Self>,
        _request: agent::get_common_data::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::get_common_data::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::get_common_data::Response {
                    result_id: String::from("rpc-get-common-data-output"),
                })
            },
        )
        .await
    }

    /// Ignores the request and answers with the result id
    /// `"rpc-get-direct-data-output"`.
    async fn get_direct_data(
        self: Arc<Self>,
        _request: agent::get_direct_data::Request,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> std::result::Result<agent::get_direct_data::Response, tonic::Status> {
        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::get_direct_data::Response {
                    result_id: String::from("rpc-get-direct-data-output"),
                })
            },
        )
        .await
    }

    /// Drains the whole request stream, then answers with a single `TaskInfo`
    /// status whose task id is `"rpc-create-tasks-output"`.
    ///
    /// # Errors
    ///
    /// The first error yielded by the request stream is returned as-is, before
    /// `common::unary_rpc_impl` is reached.
    async fn create_tasks(
        self: Arc<Self>,
        request: impl tonic::codegen::tokio_stream::Stream<
                Item = Result<agent::create_tasks::Request, tonic::Status>,
            > + Send
            + 'static,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<agent::create_tasks::Response, tonic::Status> {
        let mut request = std::pin::pin!(request);

        // Only `InitTaskRequest` messages update the echoed token (the last one
        // wins); every other message is consumed and discarded. A stream without
        // any `InitTaskRequest` therefore echoes the default (empty) token.
        let mut token = None;
        while let Some(message) = request.next().await {
            if let agent::create_tasks::Request::InitTaskRequest {
                communication_token,
                ..
            } = message?
            {
                token = Some(communication_token);
            }
        }

        common::unary_rpc_impl(
            self.wait.clone(),
            self.failure.clone(),
            cancellation_token,
            || {
                Ok(agent::create_tasks::Response::Status {
                    communication_token: token.unwrap_or_default(),
                    statuses: vec![agent::create_tasks::Status::TaskInfo {
                        task_id: String::from("rpc-create-tasks-output"),
                        expected_output_keys: vec![],
                        data_dependencies: vec![],
                        payload_id: String::new(),
                    }],
                })
            },
        )
        .await
    }
}

/// The metadata returned for a requested name carries the mock's session id.
#[tokio::test]
async fn create_results_metadata() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .create_results_metadata("rpc-create-results-metadata-input", "", ["result-id"])
        .await
        .unwrap();

    assert_eq!(
        response["result-id"].session_id,
        "rpc-create-results-metadata-output"
    );
}

/// The metadata returned for a submitted result carries the mock's session id.
#[tokio::test]
async fn create_results() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .create_results("rpc-create-results-input", "", [("result-id", b"")])
        .await
        .unwrap();

    assert_eq!(
        response["result-id"].session_id,
        "rpc-create-results-output"
    );
}

/// The mock echoes the communication token first, then its own marker.
#[tokio::test]
async fn notify_result_data() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .notify_result_data("rpc-notify-result-data-input", "", [""])
        .await
        .unwrap();

    assert_eq!(response[0], "rpc-notify-result-data-input");
    assert_eq!(response[1], "rpc-notify-result-data-output");
}

/// Submitting an empty task list still yields the mock's single response item.
#[tokio::test]
async fn submit_tasks() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .submit_tasks("rpc-submit-tasks-input", "", None, [])
        .await
        .unwrap();

    assert_eq!(response[0].task_id, "rpc-submit-tasks-output");
}

/// `get_resource_data` issued through the generic `call` entry point.
#[tokio::test]
async fn get_resource_data() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .call(agent::get_resource_data::Request {
            communication_token: String::from("rpc-get-resource-data-input"),
            result_id: String::from(""),
        })
        .await
        .unwrap();

    assert_eq!(response.result_id, "rpc-get-resource-data-output");
}

/// `get_common_data` issued through the generic `call` entry point.
#[tokio::test]
async fn get_common_data() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .call(agent::get_common_data::Request {
            communication_token: String::from("rpc-get-common-data-input"),
            result_id: String::from(""),
        })
        .await
        .unwrap();

    assert_eq!(response.result_id, "rpc-get-common-data-output");
}

/// `get_direct_data` issued through the generic `call` entry point.
#[tokio::test]
async fn get_direct_data() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .call(agent::get_direct_data::Request {
            communication_token: String::from("rpc-get-direct-data-input"),
            result_id: String::from(""),
        })
        .await
        .unwrap();

    assert_eq!(response.result_id, "rpc-get-direct-data-output");
}

/// Streams a single `InitRequest` and expects the mock's `TaskInfo` status back.
#[tokio::test]
async fn create_tasks() {
    let mut client = armonik::Client::with_channel(Service::default().agent_server()).agent();

    let response = client
        .create_tasks(futures::stream::iter([
            agent::create_tasks::Request::InitRequest {
                communication_token: String::from("rpc-create-tasks-input"),
                request: agent::create_tasks::InitRequest { task_options: None },
            },
        ]))
        .await
        .unwrap();

    match &response[0] {
        agent::create_tasks::Status::TaskInfo { task_id, .. } => {
            assert_eq!(task_id, "rpc-create-tasks-output");
        }
        agent::create_tasks::Status::Error(err) => {
            panic!("Expected TaskInfo, but got Error({err})")
        }
    }
}