// Copyright (c) 2022-2023 Yuki Kishimoto // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license //! Relay Pool use std::collections::HashMap; use std::future::Future; use std::sync::Arc; use std::time::Duration; use atomic_destructor::{AtomicDestructor, StealthClone}; use nostr_database::prelude::*; use tokio::sync::broadcast; pub use tokio_stream::wrappers::ReceiverStream; pub mod constants; mod error; mod inner; pub mod options; mod output; pub use self::error::Error; use self::inner::InnerRelayPool; pub use self::options::RelayPoolOptions; pub use self::output::Output; use crate::relay::flags::FlagCheck; use crate::relay::options::{FilterOptions, RelayOptions, SyncOptions}; use crate::relay::{Relay, RelayFiltering, RelayStatus}; use crate::{Reconciliation, RelayServiceFlags, SubscribeOptions}; /// Relay Pool Notification #[derive(Debug, Clone, PartialEq, Eq)] pub enum RelayPoolNotification { /// Received an [`Event`]. Does not include events sent by this client. Event { /// Relay url relay_url: RelayUrl, /// Subscription ID subscription_id: SubscriptionId, /// Event event: Box, }, /// Received a [`RelayMessage`]. Includes messages wrapping events that were sent by this client. Message { /// Relay url relay_url: RelayUrl, /// Relay Message message: RelayMessage, }, /// Relay status changed #[deprecated(since = "0.37.0")] RelayStatus { /// Relay url relay_url: RelayUrl, /// Relay Status status: RelayStatus, }, /// Authenticated to relay /// /// Authenticated { /// Relay url relay_url: RelayUrl, }, /// Shutdown Shutdown, } /// Relay Pool #[derive(Debug, Clone)] pub struct RelayPool { inner: AtomicDestructor, } impl Default for RelayPool { fn default() -> Self { Self::new(RelayPoolOptions::default()) } } impl StealthClone for RelayPool { #[inline(always)] fn stealth_clone(&self) -> Self { Self { inner: self.inner.stealth_clone(), } } } impl RelayPool { /// Create new `RelayPool` #[inline] pub fn new(opts: RelayPoolOptions) -> Self { Self::with_database(opts, Arc::new(MemoryDatabase::default())) } /// New with database #[inline] pub fn with_database(opts: RelayPoolOptions, database: D) -> Self where D: IntoNostrDatabase, { Self { inner: AtomicDestructor::new(InnerRelayPool::with_database(opts, database)), } } /// Completely shutdown pool #[inline] pub async fn shutdown(&self) -> Result<(), Error> { self.inner.shutdown().await } /// Get new **pool** notification listener /// ///
When you call this method, you subscribe to the notifications channel from that precise moment. Anything received by relay/s before that moment is not included in the channel!
#[inline] pub fn notifications(&self) -> broadcast::Receiver { self.inner.notifications() } /// Get database #[inline] pub fn database(&self) -> &Arc { &self.inner.database } /// Get relay filtering #[inline] pub fn filtering(&self) -> &RelayFiltering { &self.inner.filtering } /// Get all relays /// /// This method return all relays added to the pool, including the ones for gossip protocol or other services. #[inline] pub async fn all_relays(&self) -> HashMap { self.inner.all_relays().await } /// Get relays with `READ` or `WRITE` flags #[inline] pub async fn relays(&self) -> HashMap { self.inner.relays().await } /// Get relays that have a certain [RelayServiceFlag] enabled #[inline] pub async fn relays_with_flag( &self, flag: RelayServiceFlags, check: FlagCheck, ) -> HashMap { self.inner.relays_with_flag(flag, check).await } /// Get [`Relay`] #[inline] pub async fn relay(&self, url: U) -> Result where U: TryIntoUrl, Error: From<::Err>, { self.inner.relay(url).await } /// Add new relay /// /// If are set pool subscriptions, the new added relay will inherit them. Use `subscribe_to` method instead of `subscribe`, /// to avoid to set pool subscriptions. #[inline] pub async fn add_relay(&self, url: U, opts: RelayOptions) -> Result where U: TryIntoUrl, Error: From<::Err>, { self.inner.add_relay(url, true, opts).await } /// Try to get relay by `url` or add it to pool. /// /// Return `Some(..)` only if the relay already exists. #[inline] pub async fn get_or_add_relay( &self, url: U, inherit_pool_subscriptions: bool, opts: RelayOptions, ) -> Result, Error> where U: TryIntoUrl + Clone, Error: From<::Err>, { self.inner .get_or_add_relay(url, inherit_pool_subscriptions, opts) .await } /// Remove and disconnect relay /// /// If the relay has [`RelayServiceFlags::GOSSIP`], it will not be removed from the pool and its /// flags will be updated (remove [`RelayServiceFlags::READ`], /// [`RelayServiceFlags::WRITE`] and [`RelayServiceFlags::DISCOVERY`] flags). /// /// To fore remove a relay use [`RelayPool::force_remove_relay`]. #[inline] pub async fn remove_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { self.inner.remove_relay(url, false).await } /// Force remove and disconnect relay /// /// Note: this method will remove the relay, also if it's in use for the gossip model or other service! #[inline] pub async fn force_remove_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { self.inner.remove_relay(url, true).await } /// Disconnect and remove all relays /// /// This method may not remove all relays. /// Use [`RelayPool::force_remove_all_relays`] to remove every relay. #[inline] pub async fn remove_all_relays(&self) -> Result<(), Error> { self.inner.remove_all_relays(false).await } /// Disconnect and force remove all relays #[inline] pub async fn force_remove_all_relays(&self) -> Result<(), Error> { self.inner.remove_all_relays(true).await } /// Connect to all added relays and keep connection alive #[inline] pub async fn connect(&self, connection_timeout: Option) { self.inner.connect(connection_timeout).await } /// Disconnect from all relays #[inline] pub async fn disconnect(&self) -> Result<(), Error> { self.inner.disconnect().await } /// Connect to relay #[inline] pub async fn connect_relay( &self, url: U, connection_timeout: Option, ) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { self.inner.connect_relay(url, connection_timeout).await } /// Disconnect relay #[inline] pub async fn disconnect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { self.inner.disconnect_relay(url).await } /// Get subscriptions #[inline] pub async fn subscriptions(&self) -> HashMap> { self.inner.subscriptions().await } /// Get subscription #[inline] pub async fn subscription(&self, id: &SubscriptionId) -> Option> { self.inner.subscription(id).await } /// Register subscription in the [RelayPool] /// /// When a new relay will be added, saved subscriptions will be automatically used for it. #[inline] pub async fn save_subscription(&self, id: SubscriptionId, filters: Vec) { self.inner.save_subscription(id, filters).await } /// Send client message to specific relays /// /// Note: **the relays must already be added!** #[inline] pub async fn send_msg_to(&self, urls: I, msg: ClientMessage) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.send_msg_to(urls, msg).await } /// Send multiple client messages at once to specific relays /// /// Note: **the relays must already be added!** #[inline] pub async fn batch_msg_to( &self, urls: I, msgs: Vec, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.batch_msg_to(urls, msgs).await } /// Send event to all relays with `WRITE` flag (check [`RelayServiceFlags`] for more details). #[inline] pub async fn send_event(&self, event: Event) -> Result, Error> { self.inner.send_event(event).await } /// Send multiple events at once to all relays with `WRITE` flag (check [`RelayServiceFlags`] for more details). #[inline] pub async fn batch_event(&self, events: Vec) -> Result, Error> { self.inner.batch_event(events).await } /// Send event to specific relays #[inline] pub async fn send_event_to(&self, urls: I, event: Event) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.send_event_to(urls, event).await } /// Send multiple events at once to specific relays #[inline] pub async fn batch_event_to( &self, urls: I, events: Vec, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.batch_event_to(urls, events).await } /// Subscribe to filters to all relays with `READ` flag. /// /// ### Auto-closing subscription /// /// It's possible to automatically close a subscription by configuring the [SubscribeOptions]. /// /// Note: auto-closing subscriptions aren't saved in subscriptions map! #[inline] pub async fn subscribe( &self, filters: Vec, opts: SubscribeOptions, ) -> Result, Error> { self.inner.subscribe(filters, opts).await } /// Subscribe to filters with custom [SubscriptionId] to all relays with `READ` flag. /// /// ### Auto-closing subscription /// /// It's possible to automatically close a subscription by configuring the [SubscribeOptions]. /// /// Note: auto-closing subscriptions aren't saved in subscriptions map! #[inline] pub async fn subscribe_with_id( &self, id: SubscriptionId, filters: Vec, opts: SubscribeOptions, ) -> Result, Error> { self.inner.subscribe_with_id(id, filters, opts).await } /// Subscribe to filters to specific relays /// /// ### Auto-closing subscription /// /// It's possible to automatically close a subscription by configuring the [SubscribeOptions]. #[inline] pub async fn subscribe_to( &self, urls: I, filters: Vec, opts: SubscribeOptions, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.subscribe_to(urls, filters, opts).await } /// Subscribe to filters with custom [SubscriptionId] to specific relays /// /// ### Auto-closing subscription /// /// It's possible to automatically close a subscription by configuring the [SubscribeOptions]. #[inline] pub async fn subscribe_with_id_to( &self, urls: I, id: SubscriptionId, filters: Vec, opts: SubscribeOptions, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner .subscribe_with_id_to(urls, id, filters, opts) .await } /// Targeted subscription /// /// Subscribe to specific relays with specific filters #[inline] pub async fn subscribe_targeted( &self, id: SubscriptionId, targets: I, opts: SubscribeOptions, ) -> Result, Error> where I: IntoIterator)>, U: TryIntoUrl, Error: From<::Err>, { self.inner.subscribe_targeted(id, targets, opts).await } /// Unsubscribe from subscription #[inline] pub async fn unsubscribe(&self, id: SubscriptionId) { self.inner.unsubscribe(id).await } /// Unsubscribe from all subscriptions #[inline] pub async fn unsubscribe_all(&self) { self.inner.unsubscribe_all().await } /// Sync events with relays (negentropy reconciliation) #[inline] pub async fn sync( &self, filter: Filter, opts: &SyncOptions, ) -> Result, Error> { self.inner.sync(filter, opts).await } /// Sync events with specific relays (negentropy reconciliation) #[inline] pub async fn sync_with( &self, urls: I, filter: Filter, opts: &SyncOptions, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner.sync_with(urls, filter, opts).await } /// Sync events with specific relays and filters (negentropy reconciliation) #[inline] pub async fn sync_targeted( &self, targets: I, opts: &SyncOptions, ) -> Result, Error> where I: IntoIterator>)>, U: TryIntoUrl, Error: From<::Err>, { self.inner.sync_targeted(targets, opts).await } /// Fetch events from relays with [`RelayServiceFlags::READ`] flag. #[inline] pub async fn fetch_events( &self, filters: Vec, timeout: Duration, opts: FilterOptions, ) -> Result { self.inner.fetch_events(filters, timeout, opts).await } /// Fetch events from specific relays #[inline] pub async fn fetch_events_from( &self, urls: I, filters: Vec, timeout: Duration, opts: FilterOptions, ) -> Result where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner .fetch_events_from(urls, filters, timeout, opts) .await } /// Stream events from relays with `READ` flag. #[inline] pub async fn stream_events( &self, filters: Vec, timeout: Duration, opts: FilterOptions, ) -> Result, Error> { self.inner.stream_events(filters, timeout, opts).await } /// Stream events from specific relays #[inline] pub async fn stream_events_from( &self, urls: I, filters: Vec, timeout: Duration, opts: FilterOptions, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { self.inner .stream_events_from(urls, filters, timeout, opts) .await } /// Targeted streaming events /// /// Stream events from specific relays with specific filters pub async fn stream_events_targeted( &self, source: I, timeout: Duration, opts: FilterOptions, ) -> Result, Error> where I: IntoIterator)>, U: TryIntoUrl, Error: From<::Err>, { self.inner .stream_events_targeted(source, timeout, opts) .await } /// Handle notifications pub async fn handle_notifications(&self, func: F) -> Result<(), Error> where F: Fn(RelayPoolNotification) -> Fut, Fut: Future>, { let mut notifications = self.notifications(); while let Ok(notification) = notifications.recv().await { let shutdown: bool = RelayPoolNotification::Shutdown == notification; let exit: bool = func(notification) .await .map_err(|e| Error::Handler(e.to_string()))?; if exit || shutdown { break; } } Ok(()) } } #[cfg(test)] mod tests { use nostr_relay_builder::MockRelay; use super::*; #[tokio::test] async fn test_shutdown() { let mock = MockRelay::run().await.unwrap(); let url = mock.url(); let pool = RelayPool::default(); pool.add_relay(&url, RelayOptions::default()).await.unwrap(); pool.connect(None).await; assert!(!pool.inner.is_shutdown()); tokio::time::sleep(Duration::from_secs(1)).await; pool.shutdown().await.unwrap(); assert!(pool.inner.is_shutdown()); assert!(matches!( pool.add_relay(&url, RelayOptions::default()) .await .unwrap_err(), Error::Shutdown )); } }