#routes #exchange #handler #routing #queue #shared-state #lapin

ollie

An abstraction layer on top of lapin, to align with traditional HTTP API routing

9 releases

new 0.2.7 Jan 24, 2025
0.2.6 Jan 24, 2025
0.1.0 Jan 22, 2025


MIT/Apache

26KB
395 lines

Ollie

Ollie is a Rust crate built on top of Lapin that simplifies RabbitMQ interactions. It provides a high-level, HTTP-like API for handling message queues: you define routes (queue names) and the asynchronous handler functions that serve them, and Ollie's RabbitRouter type delivers each message to its handler while leaving the processing logic up to you.

Features

  • Route-based API: Define routes to route data from queues or exchanges to handler functions.

  • Shared state management: Easily pass shared application state to your handlers.

  • Exchange support: Bind queues to exchanges with specific routing keys.

  • Async handler support: Write asynchronous handlers for processing messages.

  • Built-in error handling: Acknowledge messages and optionally publish results to exchanges.

Installation

Add ollie to your Cargo.toml:

cargo add ollie

Getting Started

1. Creating a RabbitRouter

Start by creating a new instance of RabbitRouter with a RabbitMQ connection URI and an initial state:

use ollie::RabbitRouter;
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    // Shared state for handlers. The annotation is needed here because
    // nothing else in this snippet pins down the key and value types.
    let initial_state: HashMap<String, i32> = HashMap::new();

    let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
}

2. Connecting to a queue or exchange

You can add routes for specific queues or exchanges. Each route requires a handler function that defines how messages should be processed.
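To picture the route idea without a broker, here is a minimal std-only sketch of a dispatch table keyed by queue name. It is not ollie's implementation: `RouteTable`, `add_route`, `dispatch`, and `example_table` are hypothetical names, and the handlers are synchronous closures rather than the async functions ollie expects.

```rust
use std::collections::HashMap;

/// Hypothetical handler type: takes the raw message body, returns an optional reply.
type Handler = Box<dyn Fn(&[u8]) -> Option<String>>;

/// Toy dispatch table keyed by queue name (not ollie's actual internals).
struct RouteTable {
    routes: HashMap<String, Handler>,
}

impl RouteTable {
    fn new() -> Self {
        RouteTable { routes: HashMap::new() }
    }

    /// Register a handler for a queue name, replacing any previous one.
    fn add_route<F>(&mut self, queue: &str, handler: F)
    where
        F: Fn(&[u8]) -> Option<String> + 'static,
    {
        self.routes.insert(queue.to_string(), Box::new(handler));
    }

    /// Look up the handler for `queue` and run it on `body`.
    /// Returns None if no route is registered or the handler has no reply.
    fn dispatch(&self, queue: &str, body: &[u8]) -> Option<String> {
        self.routes.get(queue).and_then(|handler| handler(body))
    }
}

/// Builds a table with two example routes: one echoes, one stays silent.
fn example_table() -> RouteTable {
    let mut table = RouteTable::new();
    table.add_route("echo", |body| Some(String::from_utf8_lossy(body).into_owned()));
    table.add_route("sink", |_body| None);
    table
}
```

Calling `example_table().dispatch("echo", b"hi")` runs the handler registered for `echo`. A queue name with no registered route yields `None`.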

Connecting directly to a queue

You can connect directly to a queue to receive its messages. Because it is a RabbitMQ queue, each message is delivered to only one consumer, so you can attach many workers to the same queue to share the load.

use ollie::{RabbitResult, RabbitRouter};
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Arc;

async fn my_handler(
    data: Vec<u8>,
    // Same type as the state passed to RabbitRouter::new in main, wrapped in Arc<Mutex<>> by the router
    state: Arc<Mutex<HashMap<String, i32>>>,
) -> Option<RabbitResult> {
    let mut state = state.lock().await;
    let count = state.entry("processed_count".to_string()).or_insert(0);
    *count += 1;

    println!("Message: {:?}", String::from_utf8_lossy(&data));
    None
}

#[tokio::main]
async fn main() {
    let initial_state = HashMap::new();

    let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
    
    router
        .add_route_queue("test_queue", None, my_handler)
        .await
        .expect("Failed to add route");
}
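The one-consumer-per-message behaviour of a shared queue can be illustrated without RabbitMQ. The sketch below is only an analogy: it uses a std `mpsc` channel as the "queue" and threads as the "workers", and `run_workers` is a hypothetical helper, not part of ollie or lapin.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Sends `messages` numbered payloads into one queue and lets `workers`
/// threads compete for them. Returns how many messages each worker handled.
fn run_workers(workers: usize, messages: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<usize>();
    // The receiving end is shared, so each message is taken by exactly one worker.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // The lock guard is dropped at the end of this statement.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(_payload) => handled += 1,
                        Err(_) => break, // sender dropped and queue drained
                    }
                }
                handled
            })
        })
        .collect();

    for i in 0..messages {
        tx.send(i).unwrap();
    }
    drop(tx); // close the queue so workers stop once it is empty

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

With `run_workers(3, 30)`, the per-worker counts vary from run to run, but they always sum to 30 because no message is handled twice.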

Connecting to an exchange

use ollie::RabbitRouter;
use std::collections::HashMap;

// `my_handler` is the handler defined in the queue example above.

#[tokio::main]
async fn main() {
    let initial_state = HashMap::new();

    let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;

    router
        .add_route_exchange("test_exchange", "test.routing.key", None, my_handler)
        .await
        .expect("Failed to add route to exchange");
}

Advanced Features

Shared State

RabbitRouter supports shared state between routes, enabling coordinated processing. You pass a plain state value of type S, and the router automatically wraps it in Arc<Mutex<S>> so that handlers can access it safely.
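As a rough illustration of that wrapping, the following sketch shares one map between two pretend routes. It makes several assumptions: it uses `std::sync::Mutex` instead of the `tokio::sync::Mutex` seen in the handler signatures, and `wrap_state`, `count_message`, and `simulate_two_routes` are hypothetical helpers rather than ollie APIs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Wraps a plain state value as the text describes: Arc for sharing,
/// Mutex for exclusive access. (std Mutex here, as a stand-in for tokio's.)
fn wrap_state<S>(state: S) -> Arc<Mutex<S>> {
    Arc::new(Mutex::new(state))
}

/// Hypothetical handler body: bumps a per-route counter in the shared map.
fn count_message(route: &str, state: &Arc<Mutex<HashMap<String, i32>>>) {
    let mut guard = state.lock().unwrap();
    *guard.entry(route.to_string()).or_insert(0) += 1;
}

/// Simulates two routes that each hold a clone of the same shared state.
fn simulate_two_routes() -> HashMap<String, i32> {
    let shared = wrap_state(HashMap::new());
    let for_route_a = Arc::clone(&shared);
    let for_route_b = Arc::clone(&shared);

    count_message("queue_1", &for_route_a);
    count_message("queue_2", &for_route_b);
    count_message("queue_1", &for_route_a);

    // Copy the map out while holding the lock, then release the lock.
    let snapshot = shared.lock().unwrap().clone();
    snapshot
}
```

Both clones point at the same map, so updates made through one route are visible to the other.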

Result Publishing

If you need to publish results after processing, specify a result_exchange when adding a route. The handler can return a RabbitResult, which is serialized and published to the specified exchange.

How the Results Exchange Works

When you specify a result_exchange, RabbitRouter will automatically publish the result returned by the handler to the specified exchange. A result_exchange has the following components:

Exchange Name: The name of the RabbitMQ exchange where results are published.

Routing Key: A key used to route the result to the appropriate queue.

The handler function can return a RabbitResult containing the data you want to publish. RabbitRouter will serialize this result (e.g., using JSON) and publish it to the result_exchange.

RabbitRouter automatically sets the first two words (topics) of the routing key from the RabbitResult: the first is the logging level and the second is the billing type.

Example:

use ollie::{Exchange, RabbitResult, RabbitRouter};
use serde_json::json; // assumed source of the json! macro
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
// TaskResult and EngineMetadata must also be in scope; import them from
// wherever ollie exports them.

async fn handler_with_result(
    data: Vec<u8>,
    _state: Arc<Mutex<HashMap<String, i32>>>,
) -> Option<RabbitResult> {
    let message = String::from_utf8_lossy(&data);
    println!("Processed message: {}", message);

    // `id` and `elapsed` are placeholders: supply your own task ID and the
    // measured processing time.
    // Return a RabbitResult to be published to the result exchange
    Some(RabbitResult {
        logging_level: ollie::rabbit_result::LoggingLevel::Info,
        billing_type: ollie::rabbit_result::BillingType::NotBilled,
        task_result: TaskResult {
            task_id: id,
            engine_metadata: EngineMetadata {
                engine_id: "Filter Engine".to_string(),
                time_taken: elapsed,
            },
            data: json!({"your_data":"here"}),
        },
    })
}

#[tokio::main]
async fn main() {
    // Pass the plain state; the router wraps it in Arc<Mutex<_>> itself
    let initial_state: HashMap<String, i32> = HashMap::new();

    let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;

    let result_exchange = Some(Exchange {
        name: "result_exchange".to_string(),
        routing_key: "result.key".to_string(),
    });

    router
        .add_route_queue("my_queue", result_exchange, handler_with_result)
        .await
        .expect("Failed to add route");
}
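The routing-key prefixing described in this section can be sketched as follows. This is an illustration under stated assumptions, not ollie's code: the local enums stand in for the crate's `LoggingLevel` and `BillingType` (only `Info` and `NotBilled` appear in this README; `Error` and `Billed` are hypothetical), the lowercase string forms are guesses, and `result_routing_key` and `topic_matches` are hypothetical helpers. The matcher follows the AMQP topic-exchange rules (`*` matches exactly one word, `#` matches zero or more), which only apply if the result exchange is a topic exchange.

```rust
/// Stand-ins for the crate's enums; variants other than `Info` and
/// `NotBilled` are hypothetical.
#[derive(Clone, Copy)]
enum LoggingLevel { Info, Error }

#[derive(Clone, Copy)]
enum BillingType { Billed, NotBilled }

/// Prefixes `base_key` with "<level>.<billing>." (string forms are assumptions).
fn result_routing_key(level: LoggingLevel, billing: BillingType, base_key: &str) -> String {
    let level = match level {
        LoggingLevel::Info => "info",
        LoggingLevel::Error => "error",
    };
    let billing = match billing {
        BillingType::Billed => "billed",
        BillingType::NotBilled => "not_billed",
    };
    format!("{}.{}.{}", level, billing, base_key)
}

/// AMQP topic matching: `*` matches exactly one word, `#` matches zero or more.
fn topic_matches(pattern: &str, key: &str) -> bool {
    fn go(p: &[&str], k: &[&str]) -> bool {
        match p.split_first() {
            // Pattern exhausted: match only if the key is exhausted too.
            None => k.is_empty(),
            // `#` may swallow any number of leading words, including none.
            Some((&"#", rest)) => (0..=k.len()).any(|skip| go(rest, &k[skip..])),
            // `*` consumes exactly one word.
            Some((&"*", rest)) => !k.is_empty() && go(rest, &k[1..]),
            // A literal word must equal the next word of the key.
            Some((word, rest)) => k.first() == Some(word) && go(rest, &k[1..]),
        }
    }
    let p: Vec<&str> = pattern.split('.').collect();
    let k: Vec<&str> = key.split('.').collect();
    go(&p, &k)
}
```

Under these assumptions, a result with `Info` and `NotBilled` published with the base key `result.key` would carry the key `info.not_billed.result.key`. A queue bound with `info.#` would receive it, and one bound with `error.#` would not.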

Error Handling

RabbitRouter acknowledges messages automatically. Handle errors inside your handlers and return a result that reflects the outcome.
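One way to structure such a handler is to keep the fallible work in a function that returns `Result`, then map both branches to the value the handler returns. The sketch below is std-only and hypothetical throughout: `Outcome` stands in for `RabbitResult`, and `process` and `handle` are illustrative names, not ollie APIs.

```rust
/// Hypothetical stand-in for the result a handler returns; ollie's real
/// type is `RabbitResult`.
#[derive(Debug, PartialEq)]
enum Outcome {
    Info(String),
    Error(String),
}

/// Fallible work: parses the payload as a UTF-8 decimal integer and doubles it.
fn process(data: &[u8]) -> Result<i64, String> {
    let text = std::str::from_utf8(data).map_err(|e| format!("invalid UTF-8: {}", e))?;
    let n: i64 = text.trim().parse().map_err(|e| format!("not an integer: {}", e))?;
    n.checked_mul(2).ok_or_else(|| "overflow".to_string())
}

/// Handler body: never panics on bad input. A failure becomes an
/// `Outcome::Error`, so it can still be reported downstream.
fn handle(data: &[u8]) -> Option<Outcome> {
    match process(data) {
        Ok(value) => Some(Outcome::Info(value.to_string())),
        Err(reason) => Some(Outcome::Error(reason)),
    }
}
```

In a real handler, the error branch would presumably build a `RabbitResult` with an error-level logging level, so that consumers of the result exchange can route failures separately.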

Examples

Multiple Queues Example

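use ollie::RabbitRouter;
use std::collections::HashMap;

// `my_handler` is the handler defined in "Connecting directly to a queue".

#[tokio::main]
async fn main() {
    // Pass the plain state; the router wraps it in Arc<Mutex<_>> itself
    let initial_state: HashMap<String, i32> = HashMap::new();

    let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;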

    router
        .add_route_queue("queue_1", None, my_handler)
        .await
        .expect("Failed to add route");

    router
        .add_route_queue("queue_2", None, my_handler)
        .await
        .expect("Failed to add route");
}

Testing

Run the included tests to verify functionality:

cargo test

License

This project is licensed under either the MIT or the Apache 2.0 license.

Contributing

Contributions are welcome! Feel free to open issues or submit pull requests on GitHub.

Feedback

If you have any questions or feedback, please open an issue on GitHub or contact the maintainers.
