69 releases (4 breaking)
Uses new Rust 2024
new 0.5.60 | Mar 13, 2025 |
---|---|
0.4.99992 | Mar 3, 2025 |
0.4.1 | Sep 13, 2024 |
#130 in HTTP server
6,531 downloads per month
74KB
754 lines
Manager Crate
Overview
Manager is a scalable, async-driven system that handles requests and communication between different components using a pub/sub model in Rust. It is built on top of the Actix Web framework for HTTP server handling and utilizes the Tokio runtime for asynchronous tasks. This crate allows you to create handlers that process requests and handle messages via a message bus, making it highly modular and easy to extend.
Features
- Dynamic Handler Registration: Register new handlers dynamically that can process requests and publish messages.
- Pub/Sub Messaging: Implement publish/subscribe messaging between different services or components.
- Concurrency Control: Uses a semaphore to limit the number of HTTP requests processed at the same time to nr (the value set with with_max_requests). Each handler inside the service can be replicated as many times as configured.
- TLS Support: Configure secure HTTPS connections with optional client certificate verification.
- File Streaming: Upload and download files with metadata support.
- Graceful Shutdown: Includes an HTTP shutdown mechanism for controlled termination of the service.
- Asynchronous Processing: All handlers and requests are processed asynchronously using Tokio's async runtime.
- Shared State: All handlers can access a shared state to store and retrieve data.
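The concurrency-control idea from the list above can be pictured with a small counting limiter. This is a std-only sketch and not the crate's implementation (which presumably relies on Tokio's async primitives); RequestLimiter, Permit and try_acquire are hypothetical names introduced only for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical limiter: at most `max` permits may be held at once.
pub struct RequestLimiter {
    max: usize,
    in_flight: Arc<AtomicUsize>,
}

/// A permit gives its slot back when it is dropped.
pub struct Permit {
    in_flight: Arc<AtomicUsize>,
}

impl RequestLimiter {
    pub fn new(max: usize) -> Self {
        RequestLimiter { max, in_flight: Arc::new(AtomicUsize::new(0)) }
    }

    /// Try to reserve a slot without blocking; None means the limit is reached.
    pub fn try_acquire(&self) -> Option<Permit> {
        let mut current = self.in_flight.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return None;
            }
            // Retry if another thread changed the counter in the meantime.
            match self.in_flight.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Permit { in_flight: Arc::clone(&self.in_flight) }),
                Err(actual) => current = actual,
            }
        }
    }

    /// Number of permits currently held.
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Tying the release to Drop means a slot is returned even when request handling exits early with an error. An async server would typically await a free permit instead of rejecting the request.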
Prerequisites
Before you begin, ensure you have met the following requirements:
- Rust version >= 1.85 (required because the crate uses the Rust 2024 edition).
Installation
To use this crate in your project, add the following dependencies to your Cargo.toml file (the examples below also use the tokio and futures crates):
[dependencies]
async-trait = "0.1"
manager-handlers = "0.5.50"
Core Concepts
Handlers
Handlers process incoming requests and communicate with other handlers. Implement the Base trait to create custom handlers.
Message Bus
The MultiBus facilitates communication between handlers using publish/subscribe patterns.
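To make the publish/subscribe pattern concrete, here is a minimal std-only sketch of a topic-based bus. It is not the MultiBus API; TinyBus, subscribe and publish are hypothetical names, and a blocking std channel stands in for whatever async channel the crate uses internally.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical topic-based bus, for illustration only.
#[derive(Default)]
pub struct TinyBus {
    subscribers: Mutex<HashMap<String, Vec<Sender<String>>>>,
}

impl TinyBus {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register interest in a topic and get the receiving end of a channel.
    pub fn subscribe(&self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers
            .lock()
            .unwrap()
            .entry(topic.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Send a message to every live subscriber of `topic`.
    /// Returns the number of subscribers that received it.
    pub fn publish(&self, topic: &str, message: &str) -> usize {
        let mut subscribers = self.subscribers.lock().unwrap();
        match subscribers.get_mut(topic) {
            Some(senders) => {
                // A failed send means the receiver was dropped, so forget that sender.
                senders.retain(|tx| tx.send(message.to_string()).is_ok());
                senders.len()
            }
            None => 0,
        }
    }
}
```

In this sketch the publisher only needs to know a topic name (for example a handler name such as other_handler), never the receiving handler itself, which is what keeps components decoupled.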
Shared State
A thread-safe storage mechanism accessible by all handlers, supporting various data types and callable functions.
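One common way to build such a store is a map behind a read/write lock, shared through an Arc. The sketch below assumes that design and is not the crate's SharedState; TinyState and Value are hypothetical names, with Value loosely modeled on the Int and FunctionSync variants used later in this README.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical value type: plain data or a callable.
#[derive(Clone)]
pub enum Value {
    Int(i64),
    FunctionSync(Arc<dyn Fn(String) -> String + Send + Sync>),
}

/// Hypothetical shared store; clones share the same underlying map.
#[derive(Clone, Default)]
pub struct TinyState {
    inner: Arc<RwLock<HashMap<String, Value>>>,
}

impl TinyState {
    pub fn new() -> Self {
        Self::default()
    }

    /// Insert or overwrite the value stored under `key`.
    pub fn insert(&self, key: &str, value: Value) {
        self.inner.write().unwrap().insert(key.to_string(), value);
    }

    /// Return a clone of the stored value, if any.
    pub fn get(&self, key: &str) -> Option<Value> {
        self.inner.read().unwrap().get(key).cloned()
    }
}
```

Returning a clone from get keeps the lock held only for the lookup itself; for function values the clone is just another Arc pointer to the same closure.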
How to Use
Define a Handler
To create a custom handler, you need to implement the Base trait:
use async_trait::async_trait;
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState, Base};

pub struct MyHandler;

#[async_trait]
impl Base for MyHandler {
    async fn run(&self, src: String, data: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Process the incoming data
        println!("Received data: {}", data);

        // Example: Publishing a message and awaiting response
        let response = self.publish(
            data.clone(),
            "my_handler".to_string(),
            "other_handler".to_string(),
            communication_line.clone()
        ).await?;

        // Example: Fire-and-forget dispatch
        self.dispatch(
            "notification data".to_string(),
            "notification_handler".to_string(),
            communication_line.clone()
        ).await?;

        // Example: Store a value in shared state
        shared_state.insert(&"counter".to_string(), StateType::Int(42)).await;

        // Example: Store a synchronous function
        let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
            println!("Hello, {}!", input);
            input + " pesto"
        });
        shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function)).await;

        // Example: Store an asynchronous function
        let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
            println!("Got in the async function");
            sleep(Duration::from_secs(5)).await;
            "Done".to_string()
        }.boxed());
        shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function)).await;

        Ok(format!("Processed data with response: {}", response))
    }

    fn new() -> Self {
        MyHandler {}
    }
}
Create and Start the Manager
The Manager is responsible for initializing all the handlers and launching the HTTP server:
use manager_handlers::manager::Manager;
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    // Create a new Manager instance
    let mut manager = Manager::new_default();

    // Optional: Configure TLS
    manager.with_tls("path/to/cert.pem", "path/to/key.pem", Some("path/to/ca.pem"));

    // Optional: Set allowed client certificate names if using client cert auth
    manager.with_allowed_names(vec!["client1".to_string(), "client2".to_string()]);

    // Optional: Set API key for authentication
    manager.with_api_key("my-secret-api-key");

    // Optional: Configure maximum concurrent requests
    manager.with_max_requests(100);

    // Optional: Configure keep-alive timeout
    manager.with_keep_alive(30);

    // Register handlers with their replica counts.
    // MyHandler is the handler defined above; OtherHandler stands for a
    // second type implementing Base (not shown here).
    manager.add_handler::<MyHandler>("my_handler", 5);
    manager.add_handler::<OtherHandler>("other_handler", 2);

    println!("Starting manager...");

    // Start the manager
    manager.start().await;
}
File Handling Traits
The Base trait provides three additional methods that you can implement to create handlers for file management:
- UploadHandler: Implement run_stream to customize how files are uploaded and stored.

  #[async_trait]
  impl Base for UploadHandler {
      async fn run_stream(
          &self,
          src: String,
          mut stream: Pin<Box<dyn Stream<Item=Bytes> + Send>>,
          file_name: String,
          approx_size: usize,
          communication_line: Arc<MultiBus>,
          shared_state: Arc<SharedState>,
      ) -> Result<String, Box<dyn Error + Send + Sync>> {
          todo!()
      }
  }

- DownloadHandler: Implement run_file to customize how files are downloaded.

  #[async_trait]
  impl Base for DownloadHandler {
      async fn run_file(
          &self,
          src: String,
          filename: String,
          communication_line: Arc<MultiBus>,
          shared_state: Arc<SharedState>,
      ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64), Box<dyn Error + Send + Sync>> {
          todo!()
      }
  }

- MetadataHandler: Implement run_metadata to customize how file metadata is retrieved.

  #[async_trait]
  impl Base for MetadataHandler {
      async fn run_metadata(
          &self,
          src: String,
          filename: String,
          communication_line: Arc<MultiBus>,
          shared_state: Arc<SharedState>,
      ) -> Result<String, Box<dyn Error + Send + Sync>> {
          todo!()
      }
  }
HTTP Endpoints
Handler Endpoints
POST /{handler_name}
Send a request to a registered handler.
Example:
curl -X POST http://localhost:8080/my_handler \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_API_KEY" \
-d '{"type":"request","src":"client","data":"hello world"}'
File Operations
Upload a File: POST /stream/upload/{file_name}
Upload files to the server.
Example:
curl -X POST http://localhost:8080/stream/upload/example.txt \
-H "Authorization: Bearer YOUR_API_KEY" \
--data-binary "@/path/to/local/example.txt"
Download a File: GET /stream/download/{file_id}
Download a previously uploaded file.
Example:
curl -X GET http://localhost:8080/stream/download/abc123 \
-H "Authorization: Bearer YOUR_API_KEY" \
--output downloaded_file.txt
Retrieve File Metadata: GET /stream/metadata/{file_id}
Get metadata for a file.
Example:
curl -X GET http://localhost:8080/stream/metadata/abc123 \
-H "Authorization: Bearer YOUR_API_KEY"
System Management
Shutdown Server: POST /shutdown
Gracefully shut down the server.
Example:
curl -X POST http://localhost:8080/shutdown \
-H "Authorization: Bearer YOUR_API_KEY"
Authentication
The system uses middleware to authenticate requests using API keys. Include your API key in the Authorization header as a Bearer token.
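As an illustration of what such a check involves, the sketch below extracts a Bearer token from an Authorization header value and compares it with the configured key. It is a std-only assumption about the general technique, not the crate's middleware; bearer_token, keys_match and is_authorized are hypothetical names, and the scheme is matched case-sensitively for simplicity.

```rust
/// Extract the token from a header value such as "Bearer abc123".
pub fn bearer_token(header_value: &str) -> Option<&str> {
    let token = header_value.strip_prefix("Bearer ")?.trim();
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}

/// Compare two keys without stopping at the first differing byte,
/// so the comparison time does not depend on where they differ.
pub fn keys_match(provided: &str, expected: &str) -> bool {
    let (a, b) = (provided.as_bytes(), expected.as_bytes());
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Decide whether a request carrying this (optional) header is allowed.
pub fn is_authorized(header_value: Option<&str>, expected_key: &str) -> bool {
    header_value
        .and_then(bearer_token)
        .map_or(false, |token| keys_match(token, expected_key))
}
```

A request without the header, with a different scheme, or with a wrong key is rejected by is_authorized in this sketch.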
Configuration Options
The Manager instance provides several configuration methods:
// Configure TLS with optional client certificate verification
manager.with_tls(Some("path/to/cert.pem"), Some("path/to/key.pem"), Some("path/to/ca.pem"));
// Set allowed client certificate names for authentication - only working if a CA is provided
manager.with_allowed_names(vec!["client1".to_string()]);
// Set API key for authentication
manager.with_api_key("your-secret-api-key");
// Set maximum concurrent HTTP requests
manager.with_max_requests(100);
// Set keep-alive timeout in seconds
manager.with_keep_alive(30);
Error Handling
Errors during request processing or message dispatching are handled gracefully, and appropriate error messages are returned. If a handler encounters an error, it logs the issue and returns an error message.
Example Error Response
{
  "status": "error",
  "message": "Handler not found: invalid_handler"
}
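A body of this shape could be produced as sketched below. This is only an illustration of the JSON layout shown above, not the crate's code: error_response and json_escape are hypothetical helpers, the output is compact rather than pretty-printed, and a real service would more likely use a JSON library such as serde_json.

```rust
/// Escape a string so it can be embedded in a JSON string literal.
fn json_escape(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build an error body with the "status" and "message" fields.
pub fn error_response(message: &str) -> String {
    format!(
        "{{\"status\":\"error\",\"message\":\"{}\"}}",
        json_escape(message)
    )
}
```

Escaping the message matters because handler names and error text can come from the request itself, for example the unknown handler name in the path.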
License
This crate is open-sourced under the MIT license.
Dependencies
~25–37MB
~737K SLoC