use fundamentum_edge_proto::com::fundamentum::edge::v1::{ action_response::{OngoingStatus, Status}, actions_client::ActionsClient, ActionRequest, ActionResponse, Qos, }; use tonic::{transport::Channel, Request, Streaming}; async fn subscribe_actions_stream() -> Result, Box> { let mut client = ActionsClient::connect("http://127.0.0.1:8080").await?; let actions = client.subscribe(Request::new(())).await?.into_inner(); Ok(actions) } async fn update_action_status( client: &mut ActionsClient, action_request: ActionRequest, ) -> Result<(), Box> { for (i, sn) in action_request.target_devices.into_iter().enumerate() { let action_response = ActionResponse { serial_numbers: vec![sn], id: action_request.id, status: Some(Status::Ongoing(OngoingStatus { progress: i as u32 * 10, })), message: "In progress...".to_owned(), payload: action_request.payload.clone(), qos: Some(i32::from(Qos::AtLeastOnce)), }; println!("Sending: {:?}", action_response); let _response = client.update_status(Request::new(action_response)).await?; } Ok(()) } #[tokio::main] async fn main() -> Result<(), Box> { let mut client = ActionsClient::connect("http://127.0.0.1:8080").await?; let mut actions_stream = subscribe_actions_stream().await?; while let Some(action_request) = actions_stream.message().await? { println!("Received: {:?}", action_request); update_action_status(&mut client, action_request).await?; } Ok(()) }