| Crates.io | manager_handlers |
| lib.rs | manager_handlers |
| version | 0.7.2 |
| created_at | 2024-09-04 14:29:04.934328+00 |
| updated_at | 2025-09-30 17:05:07.223275+00 |
| description | A microservice manager implementation that creates HTTP-accessible handlers with configurable replicas. Handlers communicate via an internal bus, enabling collaborative request processing in a distributed architecture. |
| homepage | https://github.com/mat50013/manager_handlers |
| repository | https://github.com/mat50013/manager_handlers |
| max_upload_size | |
| id | 1363388 |
| size | 171,560 |
A scalable, async-driven microservice framework for Rust that enables dynamic handler registration with HTTP endpoints and internal pub/sub messaging.
manager_handlers is built on top of Actix Web and Tokio, providing a robust foundation for building microservice architectures. It allows you to create handlers that process HTTP requests and communicate with each other through an internal message bus, making it highly modular and scalable.
Add this to your Cargo.toml:
[dependencies]
manager_handlers = "0.7.2"
async-trait = "0.1"
Or use cargo:
cargo add manager_handlers async-trait
use manager_handlers::manager::Manager;
use manager_handlers::handler;
use async_trait::async_trait;
// Define a simple handler using the macro
handler!(HelloHandler, hello;
async fn run(&self, _: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
Ok(format!("Hello, {}!", data))
}
);
#[tokio::main]
async fn main() {
let mut manager = Manager::new_default();
// Register the handler with 3 replicas
manager.add_handler::<HelloHandler>("hello", 3);
// Start the server on port 8080
manager.start().await;
}
Handlers are the core processing units that:
The internal MultiBus provides:
publish()dispatch()Thread-safe storage supporting:
AnyType wrapperImplement the Base trait for full control:
use async_trait::async_trait;
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState, Base};
pub struct MyHandler {
communication_line: Arc<MultiBus>,
shared_state: Arc<SharedState>
};
#[async_trait]
impl Base for MyHandler {
async fn run(&self, src: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
// Process the incoming data
println!("Received data: {}", data);
// Example: Publishing a message and awaiting response
let response = self.publish(
data.clone(),
"other_handler".to_string()
).await;
// Example: Fire-and-forget dispatch
self.dispatch(
"notification data".to_string(),
"notification_handler".to_string()
).await;
// Example: Store a value in shared state
self.shared_state.insert(&"counter".to_string(), StateType::Int(42)).await;
// Example: Store a synchronous function
let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
println!("Hello, {}!", input);
input + " pesto"
});
self.shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function)).await;
// Example: Store an asynchronous function
let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
println!("Got in the async function");
sleep(Duration::from_secs(5)).await;
"Done".to_string()
}.boxed());
self.shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function)).await;
Ok(format!("Processed data with response: {}", response))
}
fn get_shared_state(&self) -> Arc<SharedState> {
Arc::clone(&self.shared_state)
}
fn get_communication_line(&self) -> Arc<MultiBus> {
Arc::clone(&self.communication_line)
}
fn get_name(&self) -> String {
"myhandler".to_string()
}
fn new(communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Self {
MyHandler {communication_line, shared_state}
}
}
For simpler handlers, use the handler! macro:
use manager_handlers::handler;
// Simple handler
handler!(EchoHandler, echo;
async fn run(&self, _: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
Ok(format!("Echo from {}: {}", src, data))
}
);
// Handler with state access
handler!(CounterHandler, counter;
async fn run(&self, _: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let state = self.get_shared_state();
// Increment counter
let counter = match state.get(&"counter".to_string()).await {
Some(StateType::Int(val)) => val + 1,
_ => 1,
};
state.insert(&"counter".to_string(), StateType::Int(counter)).await;
Ok(format!("Counter: {}", counter))
}
);
use manager_handlers::manager::Manager;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
// Create a new Manager instance
let mut manager = Manager::new_default();
// Optional: Configure TLS
manager.with_tls("path/to/cert.pem", "path/to/key.pem", Some("path/to/ca.pem"));
// Optional: Set allowed client certificate names if using client cert auth
manager.with_allowed_names(vec!["client1".to_string(), "client2".to_string()]);
// Optional: Set API key for authentication
manager.with_api_key("my-secret-api-key");
// Optional: Configure maximum concurrent requests
manager.with_max_requests(100);
// Optional: Configure keep-alive timeout
manager.with_keep_alive(30);
// Register handlers with their replica counts
manager.add_handler::<MyHandler>("my_handler", 5);
manager.add_handler::<OtherHandler>("other_handler", 2);
println!("Starting manager...");
// Start the manager
manager.start().await;
}
Enable distributed pub/sub with Redis:
// Configure Redis URL
manager.with_redis_url(Some("redis://localhost:6379".to_string()));
// In your handler, use Redis pub/sub
let response = self.publish_redis(
"message".to_string(),
"remote_handler".to_string(),
Some(5000) // 5 second timeout
).await;
// Subscribe to Redis topics
let request = self.subscribe_topic_redis("my_topic".to_string()).await?;
Implement file handling capabilities:
UploadHandler: Implement this trait to customize how files are uploaded and stored.
#[async_trait]
impl Base for UploadHandler {
async fn run_stream(&self, src: String, mut stream: Pin<Box<dyn Stream<Item=Bytes> + Send>>, file_name: String, approx_size: usize) -> Result<String, Box<dyn Error + Send + Sync>> {
todo!()
}
}
DownloadHandler: Implement this trait to customize how files are downloaded.
#[async_trait]
impl Base for DownloadHandler {
async fn run_file(&self, src: String, filename: String) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64), Box<dyn Error + Send + Sync>> {
todo!()
}
}
MetadataHandler: Implement this trait to customize how file metadata is retrieved.
#[async_trait]
impl Base for MetadataHandler {
async fn run_metadata(&self, src: String, filename: String) -> Result<String, Box<dyn Error + Send + Sync>> {
todo!()
}
}
POST /{handler_name}Send a request to a registered handler.
Example:
curl -X POST http://localhost:8080/my_handler \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_API_KEY" \
-d '{"type":"request","src":"client","data":"hello world"}'
POST /stream/upload/{file_name}Upload files to the server.
Example:
curl -X POST http://localhost:8080/stream/upload/example.txt \
-H "Authorization: Bearer YOUR_API_KEY" \
--data-binary "@/path/to/local/example.txt"
GET /stream/download/{file_id}Download a previously uploaded file.
Example:
curl -X GET http://localhost:8080/stream/download/abc123 \
-H "Authorization: Bearer YOUR_API_KEY" \
--output downloaded_file.txt
GET /stream/metadata/{file_id}Get metadata for a file.
Example:
curl -X GET http://localhost:8080/stream/metadata/abc123 \
-H "Authorization: Bearer YOUR_API_KEY"
POST /shutdownGracefully shut down the server.
Example:
curl -X POST http://localhost:8080/shutdown \
-H "Authorization: Bearer YOUR_API_KEY"
API Key Authentication
with_api_key()TLS/SSL Support
with_tls()Client Certificate Authentication
with_allowed_names()let mut manager = Manager::new_default();
// Enable API key authentication
manager.with_api_key("your-secret-api-key");
// Configure TLS with client certificates
manager.with_tls(
"path/to/server-cert.pem",
"path/to/server-key.pem",
Some("path/to/ca-cert.pem")
);
// Allow specific client certificates
manager.with_allowed_names(vec![
"trusted-client-1".to_string(),
"trusted-client-2".to_string()
]);
// Limit total concurrent HTTP requests
manager.with_max_requests(1000);
// Configure handler replicas for load distribution
manager.add_handler::<MyHandler>("heavy_processor", 10);
// Set keep-alive for connection reuse
manager.with_keep_alive(60);
The framework provides comprehensive error handling:
// Handler errors are automatically caught and returned
async fn run(&self, src: String, data: String) -> Result<String, Box<dyn Error + Send + Sync>> {
// Your error will be properly formatted and returned to client
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Resource not found"
)))
}
// Handler not found
{
"status": "error",
"message": "Handler not found: invalid_handler"
}
// Authentication failure
{
"status": "error",
"message": "Unauthorized"
}
// Internal error
{
"status": "error",
"message": "Internal server error: details..."
}
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.
Matei Aruxandei - stefmatei22@gmail.com