#message-bus #handlers #microservices #manager #pub-sub #shared-state #requests

manager_handlers

A microservice manager implementation that creates HTTP-accessible handlers with configurable replicas. Handlers communicate via an internal bus, enabling collaborative request processing in a distributed architecture.

69 releases (4 breaking)

Uses new Rust 2024

new 0.5.60 Mar 13, 2025
0.4.99992 Mar 3, 2025
0.4.1 Sep 13, 2024

#130 in HTTP server

Download history 8/week @ 2024-12-04 12/week @ 2024-12-11 1/week @ 2024-12-18 22/week @ 2024-12-25 98/week @ 2025-01-22 766/week @ 2025-01-29 306/week @ 2025-02-05 431/week @ 2025-02-12 985/week @ 2025-02-19 2354/week @ 2025-02-26 2673/week @ 2025-03-05

6,531 downloads per month

MIT license

74KB
754 lines

Manager Crate

Overview

Manager is a scalable, async-driven system that handles requests and communication between different components using a pub/sub model in Rust. It is built on top of the Actix Web framework for HTTP server handling and utilizes the Tokio runtime for asynchronous tasks. This crate allows you to create handlers that process requests and handle messages via a message bus, making it highly modular and easy to extend.

Features

  • Dynamic Handler Registration: Register new handlers dynamically that can process requests and publish messages.
  • Pub/Sub Messaging: Implement publish/subscribe messaging between different services or components.
  • Concurrency Control: Uses a semaphore to limit to nr request at the time from HTTP requests. Handlers inside the service can replicate as many times as they are configured.
  • TLS Support: Configure secure HTTPS connections with optional client certificate verification.
  • File Streaming: Upload and download files with metadata support.
  • Graceful Shutdown: Includes an HTTP shutdown mechanism for controlled termination of the service.
  • Asynchronous Processing: All handlers and requests are processed asynchronously using Tokio's async runtime.
  • Shared State: All handlers can access a shared state to store and retrieve data.

Prerequisites

Before you begin, ensure you have met the following requirements:

  • Rust version >= 1.85 (due to the use 2024 rust version).

Installation

To use this crate in your project, add the following dependencies to your Cargo.toml file:

[dependencies]
async-trait = "0.1"
manager-handlers = "0.5.50"

Core Concepts

Handlers

Handlers process incoming requests and communicate with other handlers. Implement the Base trait to create custom handlers.

Message Bus

The MultiBus facilitates communication between handlers using publish/subscribe patterns.

Shared State

A thread-safe storage mechanism accessible by all handlers, supporting various data types and callable functions.

How to Use

Define a Handler

To create a custom handler, you need to implement the Base trait:

use async_trait::async_trait;
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState, Base};

pub struct MyHandler;

#[async_trait]
impl Base for MyHandler {
    async fn run(&self, src: String, data: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Process the incoming data
        println!("Received data: {}", data);
        
        // Example: Publishing a message and awaiting response
        let response = self.publish(
            data.clone(), 
            "my_handler".to_string(), 
            "other_handler".to_string(), 
            communication_line.clone()
        ).await?;
        
        // Example: Fire-and-forget dispatch
        self.dispatch(
            "notification data".to_string(), 
            "notification_handler".to_string(), 
            communication_line.clone()
        ).await?;
        
        // Example: Store a value in shared state
        shared_state.insert(&"counter".to_string(), StateType::Int(42)).await;
        
        // Example: Store a synchronous function
        let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
          println!("Hello, {}!", input);
          input + " pesto"
        });
        shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function)).await;
        
        // Example: Store an asynchronous function
        let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
          println!("Got in the async function");
          sleep(Duration::from_secs(5)).await;
          "Done".to_string()
        }.boxed());
        shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function)).await;
        
        Ok(format!("Processed data with response: {}", response))
    }

    fn new() -> Self {
        MyHandler {}
    }
}

Create and Start the Manager

The Manager is responsible for initializing all the handlers and launching the HTTP server:

use manager_handlers::manager::Manager;
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    // Create a new Manager instance
    let mut manager = Manager::new_default();
    
    // Optional: Configure TLS
    manager.with_tls("path/to/cert.pem", "path/to/key.pem", Some("path/to/ca.pem"));
    
    // Optional: Set allowed client certificate names if using client cert auth
    manager.with_allowed_names(vec!["client1".to_string(), "client2".to_string()]);
    
    // Optional: Set API key for authentication
    manager.with_api_key("my-secret-api-key");
    
    // Optional: Configure maximum concurrent requests
    manager.with_max_requests(100);
    
    // Optional: Configure keep-alive timeout
    manager.with_keep_alive(30);
    
    // Register handlers with their replica counts
    manager.add_handler::<MyHandler>("my_handler", 5);
    manager.add_handler::<OtherHandler>("other_handler", 2);
    
    println!("Starting manager...");
    
    // Start the manager
    manager.start().await;
}

File Handling Traits

The manager provides three methods in the Base trait that you can implement to create handler for file management:

  1. UploadHandler: Implement this trait to customize how files are uploaded and stored.

    #[async_trait]
    impl Base for UploadHandler {
        async fn run_stream(&self, src: String, mut stream: Pin<Box<dyn Stream<Item=Bytes> + Send>>, file_name: String, approx_size: usize, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn Error + Send + Sync>> {
           todo!()
        }
    }
    
  2. DownloadHandler: Implement this trait to customize how files are downloaded.

    #[async_trait]
    impl Base for DownloadHandler {
        async fn run_file(&self, src: String, filename: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64), Box<dyn Error + Send + Sync>> {
           todo!()    
        }
    }
    
  3. MetadataHandler: Implement this trait to customize how file metadata is retrieved.

    #[async_trait]
    impl Base for MetadataHandler {
        async fn run_metadata(&self, src: String, filename: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn Error + Send + Sync>> {
           todo!() 
        }
    }
    

Configuration Options

HTTP Endpoints

Handler Endpoints

POST /{handler_name}

Send a request to a registered handler.

Example:

curl -X POST http://localhost:8080/my_handler \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -d '{"type":"request","src":"client","data":"hello world"}'

File Operations

Upload a File: POST /stream/upload/{file_name}

Upload files to the server.

Example:

curl -X POST http://localhost:8080/stream/upload/example.txt \
  -H "Authorization: Bearer YOUR_API_KEY" \
  --data-binary "@/path/to/local/example.txt"

Download a File: GET /stream/download/{file_id}

Download a previously uploaded file.

Example:

curl -X GET http://localhost:8080/stream/download/abc123 \
  -H "Authorization: Bearer YOUR_API_KEY" \
  --output downloaded_file.txt

Retrieve File Metadata: GET /stream/metadata/{file_id}

Get metadata for a file.

Example:

curl -X GET http://localhost:8080/stream/metadata/abc123 \
  -H "Authorization: Bearer YOUR_API_KEY"

System Management

Shutdown Server: POST /shutdown

Gracefully shut down the server.

Example:

curl -X POST http://localhost:8080/shutdown \
  -H "Authorization: Bearer YOUR_API_KEY"

Authentication

The system uses middleware to authenticate requests using API keys. Include your API key in the Authorization header as a Bearer token.

Configuration Options

The Manager instance provides several configuration methods:

// Configure TLS with optional client certificate verification
manager.with_tls(Some("path/to/cert.pem"), Some("path/to/key.pem"), Some("path/to/ca.pem"));

// Set allowed client certificate names for authentication - only working if a CA is provided
manager.with_allowed_names(vec!["client1".to_string()]);

// Set API key for authentication
manager.with_api_key("your-secret-api-key");

// Set maximum concurrent HTTP requests
manager.with_max_requests(100);

// Set keep-alive timeout in seconds
manager.with_keep_alive(30);

Error Handling

Errors during request processing or message dispatching are handled gracefully, and appropriate error messages are returned. If a handler encounters an error, it logs the issue and returns an error message.

Example Error Response

{
    "status": "error",
    "message": "Handler not found: invalid_handler"
}

License

This crate is open-sourced under the MIT license.

Dependencies

~25–37MB
~737K SLoC