use std::sync::Arc; use redis::from_owned_redis_value; use crate::{log::record_exception, redis::RedisJson}; use super::pubsub_global::ChannelSubscription; /// A listener to receive messages from a redis channel via pubsub. pub struct RedisChannelListener { pub(crate) on_drop_tx: Arc>, pub(crate) key: u64, pub(crate) channel_sub: ChannelSubscription, pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver, pub(crate) _t: std::marker::PhantomData, } impl RedisChannelListener { /// Get a new message from the channel. /// The outer None indicates the channel has been closed erroneously, or the internal data could not be coerced to the target type. /// In either case, something's gone wrong, an exception will probably have been recorded too. pub async fn recv(&mut self) -> Option { if let Some(v) = self.rx.recv().await { match from_owned_redis_value::>(v) { Ok(v) => Some(v.0), Err(e) => { record_exception( format!( "Failed to convert redis value to target type '{}'", std::any::type_name::() ), format!("{:?}", e), ); None } } } else { None } } } /// Tell the global pubsub manager this listener is being dropped. impl Drop for RedisChannelListener { fn drop(&mut self) { let _ = self.on_drop_tx.send((self.channel_sub.clone(), self.key)); } }