use crate::functions::{CachedScript, RsmqFunctions}; use crate::r#trait::RsmqConnection; use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; use crate::{RsmqError, RsmqResult}; use core::convert::TryFrom; use core::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; #[derive(Clone)] struct RedisConnection(redis::aio::MultiplexedConnection); impl std::fmt::Debug for RedisConnection { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "MultiplexedRedisAsyncConnnection") } } #[derive(Debug, Clone)] pub struct RsmqSync { connection: RedisConnection, functions: RsmqFunctions, runner: Arc, scripts: CachedScript, } impl RsmqSync { /// Creates a new RSMQ instance, including its connection pub async fn new(options: RsmqOptions) -> RsmqResult { let runner = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|e| RsmqError::TokioStart(e.into()))?; let conn_info = redis::ConnectionInfo { addr: redis::ConnectionAddr::Tcp(options.host, options.port), redis: redis::RedisConnectionInfo { db: options.db.into(), username: options.username, password: options.password, }, }; let client = redis::Client::open(conn_info)?; let functions = RsmqFunctions { ns: options.ns, realtime: options.realtime, conn: PhantomData, }; let (connection, scripts) = runner.block_on(async { let mut conn = client.get_multiplexed_async_connection().await?; let scripts = functions.load_scripts(&mut conn).await?; Result::<_, RsmqError>::Ok((conn, scripts)) })?; Ok(RsmqSync { connection: RedisConnection(connection), functions, runner: Arc::new(runner), scripts, }) } } #[async_trait::async_trait] impl RsmqConnection for RsmqSync { async fn change_message_visibility( &mut self, qname: &str, message_id: &str, hidden: Duration, ) -> RsmqResult<()> { self.runner.block_on(async { self.functions .change_message_visibility( &mut self.connection.0, qname, message_id, hidden, &self.scripts, ) .await }) } async fn create_queue( &mut self, qname: &str, hidden: Option, delay: Option, maxsize: Option, ) -> RsmqResult<()> { self.runner.block_on(async { self.functions .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize) .await }) } async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult { self.runner.block_on(async { self.functions .delete_message(&mut self.connection.0, qname, id) .await }) } async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> { self.runner.block_on(async { self.functions .delete_queue(&mut self.connection.0, qname) .await }) } async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult { self.runner.block_on(async { self.functions .get_queue_attributes(&mut self.connection.0, qname) .await }) } async fn list_queues(&mut self) -> RsmqResult> { self.runner .block_on(async { self.functions.list_queues(&mut self.connection.0).await }) } async fn pop_message>>( &mut self, qname: &str, ) -> RsmqResult>> { self.runner.block_on(async { self.functions .pop_message::(&mut self.connection.0, qname, &self.scripts) .await }) } async fn receive_message>>( &mut self, qname: &str, hidden: Option, ) -> RsmqResult>> { self.runner.block_on(async { self.functions .receive_message::(&mut self.connection.0, qname, hidden, &self.scripts) .await }) } async fn send_message + Send>( &mut self, qname: &str, message: E, delay: Option, ) -> RsmqResult { self.runner.block_on(async { self.functions .send_message(&mut self.connection.0, qname, message, delay) .await }) } async fn set_queue_attributes( &mut self, qname: &str, hidden: Option, delay: Option, maxsize: Option, ) -> RsmqResult { self.runner.block_on(async { self.functions .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize) .await }) } }