| Crates.io | spark-channel |
| lib.rs | spark-channel |
| version | 0.0.3 |
| created_at | 2025-05-05 08:00:16.946975+00 |
| updated_at | 2025-05-08 03:01:50.139198+00 |
| description | A generic channel listener implementation for Spark Rust SDK |
| homepage | |
| repository | https://github.com/polarityorg/spark-rs |
| max_upload_size | |
| id | 1660363 |
| size | 45,856 |
A generic event listener and message dispatcher for building event-driven architectures in Rust.
Spark Channel is a library that enables modules to communicate with each other through an asynchronous message-passing architecture. It provides a foundation for implementing event-driven systems with support for:
The library is designed to be generic and can be used with any message and response types that satisfy the required trait bounds.
Spark Channel is built on three main components:
The SparkChannelCancellationToken is a wrapper around Tokio's CancellationToken. It implements the SparkChannelCancellationTrait, which provides a method to cancel the execution of a running task. Please note that SparkChannelCancellationTrait is a very loose trait. If you are implementing your own cancellation mechanism, this trait won't serve as a functional baseline.
The callback module provides types for handling one-shot responses:
CallbackSender<V>: A type alias for Tokio's oneshot::Sender<V>CallbackWrapper<T, V>: A struct that wraps a message of type T with a sender for a response of type VThe listener module is the core of the library and provides:
SparkGenericModuleMessage: An enum that represents different types of messages that can be sent:
Request: A message that expects a responseCommand: A fire-and-forget messageShutdown: A message to stop the module serverSparkGenericModuleDispatcher: A struct that provides methods to send messages to a module server:
request: Sends a request and awaits a responsesend_command: Sends a command without expecting a responseSparkGenericModuleHandler: A trait that defines how module servers handle messages:
handle_request: Processes a request and returns a Result<Response, Error>handle_command: Processes a command and returns a Result<(), Error>run_module_server: A function that runs a module server, receiving messages and delegating them to a handler
Here's how messages flow through the system:
Request/Response Flow:
request methodCallbackWrapper with the message and a oneshot senderhandle_request methodResult<Response, Error>Command Flow:
send_command methodhandle_command methodResult<(), Error>Shutdown Flow:
Shutdown message with a cancellation tokenThe library uses generics extensively to provide flexibility:
Message: The type of messages sent between modulesResponse: The type of responses returned by request handlersCancellationToken: The type used for cancellation (must implement SparkChannelCancellationTrait)Error: The error type returned by handlersThe library has a sophisticated error handling mechanism:
Result<Response, Error> or Result<(), Error> typesIntoResult trait allows for converting between different result typeseyre::Result and custom SparkChannelError are providedThe dispatcher methods return results to handle various failure scenarios:
The library uses Rust's Any trait to support downcasting of response types. This enables handling different response types for different requests while maintaining type safety. The request method automatically handles downcasting the received response to the expected type.
Here's a simplified example of how to use Spark Channel:
// Define message and response types
enum MyMessage {
GetData(String),
UpdateData(String, u32),
LogEvent(String),
}
enum MyResponse {
Data(Vec<u32>),
Success(bool),
}
// Implement handler
struct MyHandler {
// Handler state...
}
#[async_trait]
impl SparkGenericModuleHandler<MyMessage, MyResponse, eyre::Error> for MyHandler {
async fn handle_request(&mut self, request: MyMessage) -> Result<MyResponse, eyre::Error> {
match request {
MyMessage::GetData(key) => {
// Process request and get data...
let data = vec![1, 2, 3]; // Example data
Ok(MyResponse::Data(data))
},
MyMessage::UpdateData(key, value) => {
// Update data...
let success = true; // Example result
Ok(MyResponse::Success(success))
},
MyMessage::LogEvent(_) => {
// This shouldn't happen as LogEvent is a command
Err(eyre::eyre!("LogEvent received as request"))
},
}
}
async fn handle_command(&mut self, command: MyMessage) -> Result<(), eyre::Error> {
if let MyMessage::LogEvent(event) = command {
// Log the event...
println!("Event logged: {}", event);
Ok(())
} else {
Err(eyre::eyre!("Unexpected command type"))
}
}
}
// Create and run the server
async fn start_server() {
// Create channel
let (tx, rx) = mpsc::channel(32);
// Create handler
let handler = MyHandler { /* ... */ };
// Create dispatcher
let dispatcher = SparkGenericModuleDispatcher::new(tx);
// Spawn server
tokio::spawn(run_module_server(handler, rx));
// Return dispatcher to client code
// ...
}
// Client usage
async fn client_code(dispatcher: &SparkGenericModuleDispatcher<MyMessage, MyResponse, SparkChannelCancellationToken, eyre::Error>) {
// Send a request
let response = dispatcher
.request(MyMessage::GetData("user123".to_string()))
.await
.unwrap();
// Typically you'd match on the response type
if let MyResponse::Data(data) = response {
println!("Received data: {:?}", data);
}
// Send a command
dispatcher
.send_command::<_, eyre::Result<()>>(MyMessage::LogEvent("User logged in".to_string()))
.await
.unwrap();
// Shutdown the server
let token = SparkChannelCancellationToken::new();
dispatcher
.sender
.send(SparkGenericModuleMessage::Shutdown(token))
.await
.unwrap();
}
Message Design:
Response Handling:
Concurrency:
Testing:
If you need to share state between multiple handlers or between a handler and other parts of your application, consider using:
Arc<Mutex<T>> for shared mutable stateArc<RwLock<T>> for read-heavy shared stateArc<T> for immutable shared stateFor complex applications, you might want to route different message types to different handlers:
To handle high message volumes: