9 releases
new 0.2.7 | Jan 24, 2025 |
---|---|
0.2.6 | Jan 24, 2025 |
0.1.0 | Jan 22, 2025 |
#255 in Asynchronous
1,026 downloads per month
26KB
395 lines
Ollie
Ollie is a Rust crate built on top of Lapin to simplify RabbitMQ interactions. It provides a high-level, HTTP-like API experience for handling message queues. By defining routes (queue names) and corresponding asynchronous handler functions, RabbitRouter makes it easier to process messages while maintaining flexibility.
Features
-
Route-based API: Define routes to route data from queues or exchanges to handler functions.
-
Shared state management: Easily pass shared application state to your handlers.
-
Exchange support: Bind queues to exchanges with specific routing keys.
-
Async handler support: Write asynchronous handlers for processing messages.
-
Built-in error handling: Acknowledge messages and optionally publish results to exchanges.
Installation
Add ollie to your cargo.toml
cargo add ollie
Getting Started
1. Creating a RabbitRouter
Start by creating a new instance of RabbitRouter with a RabbitMQ connection URI and an initial state:
use ollie::RabbitRouter;
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let initial_state = HashMap::new(); // Shared state for handlers
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
}
2. Connect to a queue or exchange
You can add routes for specific queues or exchanges. Each route requires a handler function that defines how messages should be processed.
Connecting directly to a queue
You can connect directly to a queue to receive data from that queue. Since it's a RabbitMq queue only one consumer will handle the message. You might use this to attach many workers to the same queue.
use ollie::RabbitResult;
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Arc;
async fn my_handler(
data: Vec<u8>,
state: Arc<Mutex<HashMap<String, i32>>>, //note that this is the type of the state defined in main but wrapped in Arc<Mutex<>>
) -> Option<RabbitResult> {
let mut state = state.lock().await;
let count = state.entry("processed_count".to_string()).or_insert(0);
*count += 1;
println!("Message: {:?}", String::from_utf8_lossy(&data));
None
}
#[tokio::main]
async fn main() {
let initial_state = HashMap::new();
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_queue("test_queue", None, my_handler)
.await
.expect("Failed to add route");
}
Connecting to an exchange
use ollie::{RabbitRouter, Exchange};
#[tokio::main]
async fn main() {
let initial_state = HashMap::new();
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_exchange("test_exchange", "test.routing.key", None, my_handler)
.await
.expect("Failed to add route to exchange");
}
Advanced Features
Shared State
RabbitRouter supports shared state between routes, enabling coordinated processing. The router automatically imposes Arc<Mutex<S>>
around the state type for safe shared state access.
Result Publishing
If you need to publish results after processing, specify a result_exchange when adding a route. The handler can return a RabbitResult, which is serialized and published to the specified exchange.
How the Results Exchange Works
When you specify a result_exchange, RabbitRouter will automatically publish the result returned by the handler to the specified exchange. A result_exchange has the following components:
Exchange Name: The name of the RabbitMQ exchange where results are published.
Routing Key: A key used to route the result to the appropriate queue.
The handler function can return a RabbitResult containing the data you want to publish. RabbitRouter will serialize this result (e.g., using JSON) and publish it to the result_exchange.
The Router will automatrically set the first two topics in the routing key based on the RabbitResult. The first part of a routing key is the logging level and the second is the billing type.
Example:
use ollie::{RabbitRouter, RabbitResult, Exchange};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
async fn handler_with_result(
data: Vec<u8>,
state: Arc<Mutex<HashMap<String, i32>>>,
) -> Option<RabbitResult> {
let message = String::from_utf8_lossy(&data);
println!("Processed message: {}", message);
// Return a RabbitResult to be published to the result exchange
Some(RabbitResult {
logging_level: ollie::rabbit_result::LoggingLevel::Info,
billing_type: ollie::rabbit_result::BillingType::NotBilled,
task_result: TaskResult {
task_id: id,
engine_metadata: EngineMetadata {
engine_id: "Filter Engine".to_string(),
time_taken: elapsed,
}
data: json!({"your_data":"here"})
}
})
}
#[tokio::main]
async fn main() {
let state = HashMap::new();
let initial_state = Arc::new(Mutex::new(state));
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state.clone()).await;
let result_exchange = Some(Exchange {
name: "result_exchange".to_string(),
routing_key: "result.key".to_string(),
});
router
.add_route_queue("my_queue", result_exchange, handler_with_result)
.await
.expect("Failed to add route");
}
Error Handling
RabbitRouter automatically acknowledges messages. Customize your handlers to handle errors and return appropriate results.
Examples
Multiple Queues Example
#[tokio::main]
async fn main() {
let state = HashMap::new();
let initial_state = Arc::new(Mutex::new(state));
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_queue("queue_1", None, my_handler)
.await
.expect("Failed to add route");
router
.add_route_queue("queue_2", None, my_handler)
.await
.expect("Failed to add route");
}
Testing
Run the included tests to verify functionality:
cargo test
License
This project is licensed under the MIT or Apache 2.0 License
Contributing
Contributions are welcome! Feel free to open issues or submit pull requests on GitHub.
Feedback
If you have any questions or feedback, please open an issue on GitHub or contact the maintainers.
Dependencies
~11–24MB
~357K SLoC