use std::pin::Pin; use fundamentum_edge_proto::com::fundamentum::edge::v1::{ actions_server::{Actions, ActionsServer}, ActionRequest, ActionResponse, }; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::transport::Server; use tonic::{Request, Response, Status}; #[derive(Default)] pub struct ActionsService {} impl ActionsService { pub fn new() -> Self { Self {} } } #[tonic::async_trait] impl Actions for ActionsService { type SubscribeStream = Pin> + Send + 'static>>; async fn subscribe(&self, _: Request<()>) -> Result, Status> { let (tx, rx) = mpsc::channel(5); tokio::spawn(async move { for (i, devices) in (1..=12) .map(|n| format!("device{n}")) .collect::>() .chunks(3) .map(|c| c.to_vec()) .enumerate() { let action_request = ActionRequest { id: i as u64, target_devices: devices, r#type: 1, version: 1, payload: vec![1, 2, 3], }; println!("Sending: {:?}", action_request); tx.send(Ok(action_request)).await.unwrap(); } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } async fn update_status( &self, request: Request, ) -> Result, Status> { let action_response = request.into_inner(); println!("Received: {:?}", action_response); Ok(Response::new(())) } } #[tokio::main] async fn main() -> Result<(), Box> { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let actions_svc = ActionsService::new(); let router = Server::builder().add_service(ActionsServer::new(actions_svc)); router.serve(socket_addr).await?; Ok(()) }