#![allow(async_fn_in_trait)] use std::{ borrow::Cow, future::Future, sync::{Arc, LazyLock}, }; use dashmap::DashSet; use super::{ batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}, fuzzy::RedisFuzzy, pubsub::{pubsub_global::RedisPubSubGlobal, RedisChannelListener}, RedisJson, }; use crate::{log::record_exception, redis::RedisScript}; /// A lazy redis connection. #[derive(Debug, Clone)] pub struct RedisConn<'a> { pub(crate) prefix: &'a str, pool: &'a deadpool_redis::Pool, // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: pubsub_global: &'a Arc, scripts_loaded: &'a Arc>, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). // - prevents needing mutable references to the conn anymore, much nicer ergonomics. // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. // conn: Option, } impl<'a> RedisConn<'a> { pub(crate) fn new( pool: &'a deadpool_redis::Pool, prefix: &'a str, pubsub_global: &'a Arc, scripts_loaded: &'a Arc>, ) -> Self { Self { pool, prefix, pubsub_global, scripts_loaded, } } } /// An owned variant of [`RedisConn`]. /// Just requires a couple of Arc clones, so still quite lightweight. #[derive(Debug, Clone)] pub struct RedisConnOwned { // Prefix and pool both Arced now at top level for easy cloning. pub(crate) prefix: Arc, pool: deadpool_redis::Pool, // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: pubsub_global: Arc, scripts_loaded: Arc>, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). // - prevents needing mutable references to the conn anymore, much nicer ergonomics. // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. // conn: Option, } /// Generic methods over the RedisConn and RedisConnOwned types. pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Get an internal connection from the pool. /// Despite returning an owned object, the underlying real redis connection will be reused after this user drops it. /// If redis is acting up and unavailable, this will return None. /// NOTE: this mainly is used internally, but provides a fallback to the underlying connection, if the exposed interface does not provide options that fit an external user need (which could definitely happen). async fn get_inner_conn(&self) -> Option; /// Get the redis configured prefix. fn prefix(&self) -> &str; /// Get the scripts_loaded dashset of script hashes. fn scripts_loaded(&self) -> &Arc>; /// Get the redis pubsub global manager. fn _pubsub_global(&self) -> &Arc; /// Convert to the owned variant. fn to_conn_owned(&self) -> RedisConnOwned; /// Ping redis, returning true if it's up and responsive. async fn ping(&self) -> bool { self.batch() .custom::>("PING") .fire() .await .flatten() .is_some() } /// Subscribe to a channel via pubsub, receiving messages through the returned receiver. /// The subscription will be dropped when the receiver is dropped. /// /// Sending can be done via normal batches using [`RedisBatch::publish`]. /// /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. async fn subscribe( &self, namespace: &str, channel: &str, ) -> Option> { self._pubsub_global() .subscribe(self.final_key(namespace, channel.into())) .await } /// Subscribe to a channel pattern via pubsub, receiving messages through the returned receiver. /// The subscription will be dropped when the receiver is dropped. /// /// Sending can be done via normal batches using [`RedisBatch::publish`]. /// /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. /// /// According to redis (): /// Supported glob-style patterns: /// - h?llo subscribes to hello, hallo and hxllo /// - h*llo subscribes to hllo and heeeello /// - h[ae]llo subscribes to hello and hallo, but not hillo async fn psubscribe( &self, namespace: &str, channel_pattern: &str, ) -> Option> { self._pubsub_global() .psubscribe(self.final_key(namespace, channel_pattern.into())) .await } // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// // async fn dev_all_data(&self) -> HashMap { // if let Some(mut conn) = self.get_inner_conn().await { // let mut cmd = redis::cmd("SCAN"); // cmd.arg(0); // let mut data = HashMap::new(); // loop { // let (next_cursor, keys): (i64, Vec) = cmd.query_async(&mut conn).await.unwrap(); // for key in keys { // let val: redis::Value = // redis::cmd("GET").arg(&key).query_async(&mut conn).await.unwrap(); // data.insert(key, val); // } // if next_cursor == 0 { // break; // } // cmd.arg(next_cursor); // } // data // } else { // HashMap::new() // } // } /// Flush the whole redis cache, will delete all data. /// Returns the resulting string from the command, or None if failed for some reason. async fn dev_flushall(&self, sync: bool) -> Option { let mut batch = self.batch().custom::>("FLUSHALL"); if sync { batch = batch.custom_arg("SYNC"); } else { batch = batch.custom_arg("ASYNC"); } batch.fire().await.flatten() } /// A simple rate_limiter/backoff helper. /// Can be used to protect against repeated attempts in quick succession. /// Once `start_delaying_after_attempt` is hit, the operation delay will multiplied by the multiplier each time. /// Only once no call is made for the duration of the current delay (so current delay doubled) will the attempt number reset to zero. /// /// Arguments: /// - `namespace`: A unique identifier for the endpoint, e.g. user-login. /// - `caller_identifier`: A unique identifier for the caller, e.g. a user id. /// - `start_delaying_after_attempt`: The number of attempts before the delays start being imposed. /// - `initial_delay`: The initial delay to impose. /// - `multiplier`: The multiplier to apply, `(attempt-start_delaying_after_attempt) * multiplier * initial_delay = delay`. /// /// Returns: /// - `None`: Continue with the operation. /// - `Some`: Retry after the duration. async fn rate_limiter( &self, namespace: &str, caller_identifier: &str, start_delaying_after_attempt: usize, initial_delay: chrono::Duration, multiplier: f64, ) -> Option { static LUA_BACKOFF_SCRIPT: LazyLock = LazyLock::new(|| RedisScript::new(include_str!("lua_scripts/backoff_protector.lua"))); let final_key = self.final_key(namespace, caller_identifier.into()); let result = self .batch() .script::( LUA_BACKOFF_SCRIPT .invoker() .key(final_key) .arg(start_delaying_after_attempt) .arg(initial_delay.num_milliseconds()) .arg(multiplier), ) .fire() .await .flatten(); if let Some(result) = result { if result > 0 { Some(chrono::Duration::milliseconds(result)) } else { None } } else { None } } /// Get a new [`RedisBatch`] for this connection that commands can be piped together with. fn batch(&self) -> RedisBatch<'_, '_, Self, ()> { RedisBatch::new(self) } /// Redis keys are all prefixed, use this to finalise a namespace outside of built in commands, e.g. for use in a custom script. #[inline] fn final_namespace(&self, namespace: &str) -> String { format!("{}:{}", self.prefix(), namespace) } /// Redis keys are all prefixed, use this to finalise a key outside of built in commands, e.g. for use in a custom script. #[inline] fn final_key(&self, namespace: &str, key: Cow<'_, str>) -> String { format!("{}:{}", self.final_namespace(namespace), key) } /// Cache an async function in redis with an optional expiry. /// If already stored, the cached value will be returned, otherwise the function will be stored in redis for next time. /// /// If redis is unavailable, or the existing contents at the key is wrong, the function output will be used. /// The only error coming out of here should be something wrong with the external callback. /// /// Expiry accurate to a millisecond. /// /// Uses serde internally instead of from/to redis for ease of use. #[inline] async fn cached_fn<'b, T, E, Fut, K: Into>>( &self, namespace: &str, key: K, expiry: Option, cb: impl FnOnce() -> Fut, ) -> Result where T: serde::Serialize + serde::de::DeserializeOwned, Fut: Future>, { let key: Cow<'b, str> = key.into(); let cached = self .batch() .get::>(namespace, &key) .fire() .await .flatten(); if let Some(cached) = cached { Ok(cached.0) } else { let val = cb().await?; let wrapped = RedisJson(val); self.batch() .set(namespace, &key, &wrapped, expiry) .fire() .await; Ok(wrapped.0) } } } impl<'a> RedisConnLike for RedisConn<'a> { async fn get_inner_conn(&self) -> Option { match self.pool.get().await { Ok(conn) => Some(conn), Err(e) => { record_exception("Failed to get redis connection.", format!("{:?}", e)); None } } } fn prefix(&self) -> &str { self.prefix } fn _pubsub_global(&self) -> &Arc { self.pubsub_global } fn to_conn_owned(&self) -> RedisConnOwned { RedisConnOwned { prefix: Arc::new(self.prefix.to_string()), pool: self.pool.clone(), pubsub_global: self.pubsub_global.clone(), scripts_loaded: self.scripts_loaded.clone(), } } fn scripts_loaded(&self) -> &Arc> { self.scripts_loaded } } impl RedisConnLike for RedisConnOwned { async fn get_inner_conn(&self) -> Option { match self.pool.get().await { Ok(conn) => Some(conn), Err(e) => { record_exception("Failed to get redis connection.", format!("{:?}", e)); None } } } fn prefix(&self) -> &str { &self.prefix } fn _pubsub_global(&self) -> &Arc { &self.pubsub_global } fn to_conn_owned(&self) -> RedisConnOwned { self.clone() } fn scripts_loaded(&self) -> &Arc> { &self.scripts_loaded } }