| Crates.io | manager_handlers |
| lib.rs | manager_handlers |
| version | 0.6.1 |
| created_at | 2024-09-04 14:29:04.934328+00 |
| updated_at | 2025-03-21 12:35:45.508283+00 |
| description | A microservice manager implementation that creates HTTP-accessible handlers with configurable replicas. Handlers communicate via an internal bus, enabling collaborative request processing in a distributed architecture. |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1363388 |
| size | 144,790 |
Manager is a scalable, async-driven system that handles requests and communication between different components using a pub/sub model in Rust. It is built on top of the Actix Web framework for HTTP server handling and utilizes the Tokio runtime for asynchronous tasks. This crate allows you to create handlers that process requests and handle messages via a message bus, making it highly modular and easy to extend.
nr request at the time from HTTP requests. Handlers inside the service can replicate as many times as they are configured.Before you begin, ensure you have met the following requirements:
To use this crate in your project, add the following dependencies to your Cargo.toml file:
[dependencies]
async-trait = "0.1"
manager-handlers = "0.6.1"
Handlers process incoming requests and communicate with other handlers. Implement the Base trait to create custom handlers.
The MultiBus facilitates communication between handlers using publish/subscribe patterns.
A thread-safe storage mechanism accessible by all handlers, supporting various data types and callable functions.
To create a custom handler, you need to implement the Base trait:
use async_trait::async_trait;
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState, Base};
pub struct MyHandler {
communication_line: Arc<MultiBus>,
shared_state: Arc<SharedState>
};
#[async_trait]
impl Base for MyHandler {
async fn run(&self, src: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
// Process the incoming data
println!("Received data: {}", data);
// Example: Publishing a message and awaiting response
let response = self.publish(
data.clone(),
"other_handler".to_string()
).await;
// Example: Fire-and-forget dispatch
self.dispatch(
"notification data".to_string(),
"notification_handler".to_string()
).await;
// Example: Store a value in shared state
self.shared_state.insert(&"counter".to_string(), StateType::Int(42)).await;
// Example: Store a synchronous function
let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
println!("Hello, {}!", input);
input + " pesto"
});
self.shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function)).await;
// Example: Store an asynchronous function
let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
println!("Got in the async function");
sleep(Duration::from_secs(5)).await;
"Done".to_string()
}.boxed());
self.shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function)).await;
Ok(format!("Processed data with response: {}", response))
}
fn get_shared_state(&self) -> Arc<SharedState> {
Arc::clone(&self.shared_state)
}
fn get_communication_line(&self) -> Arc<MultiBus> {
Arc::clone(&self.communication_line)
}
fn get_name(&self) -> String {
"myhandler".to_string()
}
fn new(communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Self {
MyHandler {communication_line, shared_state}
}
}
The Manager is responsible for initializing all the handlers and launching the HTTP server:
use manager_handlers::manager::Manager;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
// Create a new Manager instance
let mut manager = Manager::new_default();
// Optional: Configure TLS
manager.with_tls("path/to/cert.pem", "path/to/key.pem", Some("path/to/ca.pem"));
// Optional: Set allowed client certificate names if using client cert auth
manager.with_allowed_names(vec!["client1".to_string(), "client2".to_string()]);
// Optional: Set API key for authentication
manager.with_api_key("my-secret-api-key");
// Optional: Configure maximum concurrent requests
manager.with_max_requests(100);
// Optional: Configure keep-alive timeout
manager.with_keep_alive(30);
// Register handlers with their replica counts
manager.add_handler::<MyHandler>("my_handler", 5);
manager.add_handler::<OtherHandler>("other_handler", 2);
println!("Starting manager...");
// Start the manager
manager.start().await;
}
The manager provides three methods in the Base trait that you can implement to create handler for file management:
UploadHandler: Implement this trait to customize how files are uploaded and stored.
#[async_trait]
impl Base for UploadHandler {
async fn run_stream(&self, src: String, mut stream: Pin<Box<dyn Stream<Item=Bytes> + Send>>, file_name: String, approx_size: usize) -> Result<String, Box<dyn Error + Send + Sync>> {
todo!()
}
}
DownloadHandler: Implement this trait to customize how files are downloaded.
#[async_trait]
impl Base for DownloadHandler {
async fn run_file(&self, src: String, filename: String) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64), Box<dyn Error + Send + Sync>> {
todo!()
}
}
MetadataHandler: Implement this trait to customize how file metadata is retrieved.
#[async_trait]
impl Base for MetadataHandler {
async fn run_metadata(&self, src: String, filename: String) -> Result<String, Box<dyn Error + Send + Sync>> {
todo!()
}
}
POST /{handler_name}Send a request to a registered handler.
Example:
curl -X POST http://localhost:8080/my_handler \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_API_KEY" \
-d '{"type":"request","src":"client","data":"hello world"}'
POST /stream/upload/{file_name}Upload files to the server.
Example:
curl -X POST http://localhost:8080/stream/upload/example.txt \
-H "Authorization: Bearer YOUR_API_KEY" \
--data-binary "@/path/to/local/example.txt"
GET /stream/download/{file_id}Download a previously uploaded file.
Example:
curl -X GET http://localhost:8080/stream/download/abc123 \
-H "Authorization: Bearer YOUR_API_KEY" \
--output downloaded_file.txt
GET /stream/metadata/{file_id}Get metadata for a file.
Example:
curl -X GET http://localhost:8080/stream/metadata/abc123 \
-H "Authorization: Bearer YOUR_API_KEY"
POST /shutdownGracefully shut down the server.
Example:
curl -X POST http://localhost:8080/shutdown \
-H "Authorization: Bearer YOUR_API_KEY"
The system uses middleware to authenticate requests using API keys. Include your API key in the Authorization header as a Bearer token.
The Manager instance provides several configuration methods:
// Configure TLS with optional client certificate verification
manager.with_tls(Some("path/to/cert.pem"), Some("path/to/key.pem"), Some("path/to/ca.pem"));
// Set allowed client certificate names for authentication - only working if a CA is provided
manager.with_allowed_names(vec!["client1".to_string()]);
// Set API key for authentication
manager.with_api_key("your-secret-api-key");
// Set maximum concurrent HTTP requests
manager.with_max_requests(100);
// Set keep-alive timeout in seconds
manager.with_keep_alive(30);
Errors during request processing or message dispatching are handled gracefully, and appropriate error messages are returned. If a handler encounters an error, it logs the issue and returns an error message.
{
"status": "error",
"message": "Handler not found: invalid_handler"
}
More examples can be seen in the tests.
This crate is open-sourced under the MIT license.