// SPDX-FileCopyrightText: The infect authors // SPDX-License-Identifier: MPL-2.0 use std::fmt; use futures_channel::mpsc; use crate::Message; /// Message sender for submitting messages pub type MessageSender = mpsc::Sender>; /// Message receiver for consuming messages pub type MessageReceiver = mpsc::Receiver>; /// Buffered, MPSC message channel pub type MessageChannel = ( MessageSender, MessageReceiver, ); /// Create a buffered, MPSC message channel with limited capacity /// /// FIFO queue of sent messages that are consumed by a single /// [`MessageReceiver`]. #[must_use] pub fn message_channel( capacity: usize, ) -> ( MessageSender, MessageReceiver, ) { mpsc::channel(capacity) } /// Domain-specific wrapper around a [`MessageSender`] #[derive(Debug)] pub struct MessagePort { message_tx: MessageSender, } impl MessagePort { /// Create a new instance #[must_use] pub fn new(message_tx: MessageSender) -> Self { Self { message_tx } } /// Obtain the inner [`MessageSender`] for the channel #[must_use] pub fn into_inner(self) -> MessageSender { let Self { message_tx } = self; message_tx } } impl MessagePort where Intent: fmt::Debug, Effect: fmt::Debug, { /// Enqueue a message into the channel /// /// A utility function that detects and logs unexpected send failures /// that the submitter should not be bothered with. /// /// Submitting a message is a fire-and-forget operation that must /// always succeed. The framework is responsible for dealing with /// unexpected failures. pub fn submit_message(&mut self, message: impl Into>) { let message = message.into(); log::debug!("Sending message: {message:?}"); if let Err(err) = self.message_tx.try_send(message) { if err.is_disconnected() { // No receiver log::debug!( "Dropping message - channel is closed: {message:?}", message = err.into_inner() ); } else if err.is_full() { log::warn!( "Dropping message - channel is full: {message:?}", message = err.into_inner() ); } else { // This code should be unreachable log::error!("Failed to send message: {err}"); } } } /// Submit an intent /// /// See also: [`Self::submit_message`] pub fn submit_intent(&mut self, intent: impl Into) { self.submit_message(Message::Intent(intent.into())); } /// Submit an effect /// /// See also: [`Self::submit_message`] pub fn submit_effect(&mut self, effect: impl Into) { self.submit_message(Message::Effect(effect.into())); } } impl Clone for MessagePort { fn clone(&self) -> Self { let Self { message_tx } = self; let message_tx = message_tx.clone(); Self { message_tx } } }