use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder}; use futures_util::StreamExt; use rustis::{ client::Client, commands::{ListCommands, PubSubCommands}, }; use std::{fmt, net::SocketAddr, time::Duration}; const POLL_TIMEOUT: Duration = Duration::from_secs(10); pub struct RedisClients { /// Redis client for regular operations pub regular: Client, /// Redis client for subscriptions pub sub: Client, } #[tokio::main] async fn main() -> std::io::Result<()> { // build rustis client in multiplexer mode (a unique rustis instance for all actix workers) // build a separated rustis client for subscriptions only in multiplexer mode (a unique rustis instance for all actix workers) let redis_uri = "redis://127.0.0.1:6379"; let redis_clients = web::Data::new(RedisClients { regular: Client::connect(redis_uri).await.unwrap(), sub: Client::connect(redis_uri).await.unwrap(), }); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("listening on {}", addr); HttpServer::new(move || { App::new() .app_data(redis_clients.clone()) .service(poll_messages) .service(publish) }) .bind(addr)? .run() .await } #[get("/{channel}")] async fn poll_messages( redis: web::Data, channel: web::Path, ) -> Result { let channel = channel.into_inner(); let messages = get_messages_from_queue(&redis.regular, &channel).await?; if POLL_TIMEOUT.is_zero() || !messages.is_empty() { return Ok(web::Json(messages)); } let mut sub_stream = redis.sub.subscribe(&channel).await?; let msg = tokio::time::timeout(POLL_TIMEOUT, sub_stream.next()).await; let messages: Vec = match msg { Ok(Some(Ok(_msg))) => get_messages_from_queue(&redis.regular, &channel).await?, Ok(Some(Err(e))) => { return Err(ServiceError::new( StatusCode::INTERNAL_SERVER_ERROR, format!("error received from PubSubStream: {e}"), )) } // stream closed Ok(None) => Vec::new(), // timeout Err(_e) => Vec::new(), }; Ok(web::Json(messages)) } async fn get_messages_from_queue( redis: &Client, channel: &str, ) -> Result, ServiceError> { Ok(redis.lpop(channel, i32::MAX as usize).await?) } #[post("/{channel}")] async fn publish( redis: web::Data, channel: web::Path, message: Option, ) -> Result { let Some(message) = message else { return Err(ServiceError::new( StatusCode::BAD_REQUEST, "Message not provided", )) }; let channel = channel.into_inner(); // data is not sent via pub/sub; the pub/sub API is used only to notify subscriber to check for new notifications // the actual data is pushed into a list used as a queue redis.regular.lpush(&channel, &message).await?; redis.regular.publish(&channel, "new").await?; Ok(HttpResponse::Ok()) } #[derive(Debug)] struct ServiceError(StatusCode, String); impl ServiceError { fn new(status_code: StatusCode, description: impl ToString) -> Self { Self(status_code, description.to_string()) } } impl fmt::Display for ServiceError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.1) } } impl actix_web::error::ResponseError for ServiceError { fn status_code(&self) -> actix_web::http::StatusCode { self.0 } } impl From for ServiceError { fn from(e: rustis::Error) -> Self { eprintln!("rustis error: {e}"); ServiceError::new(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error") } }