8 releases
Version | Released |
---|---|
0.1.0 | Dec 23, 2024 |
0.0.8 | Dec 23, 2024 |
amqpsy
Extremely opinionated AMQP PubSub library
Built on top of lapin, this library is designed to be a simple, easy-to-use AMQP PubSub library.
It is extremely opinionated and comes with some batteries* included.
Features
- Publishers with default configurations for Events and Commands.
- Consumers have DLQ enabled by default.
- Fluent interface for consumer groups.
- Ergonomic error handling API: retry, DLQ, or invalid message options.
- Enforces Protobuf for messages, ensuring backward compatibility.
- Distributed tracing enabled with Otel: linked traces from publisher to consumer.
- Chaos Monkey: simulate duplicate messages, publish failures, etc.
Usage
Consumer Groups
let context = AppContext::new(); // your app context
let config = amqpsy::AmqpConfig {
    connection_string: "<your connection string>".to_string(),
};

#[rustfmt::skip]
ConsumerGroup::builder(
    "app_name",
    config,
    context,
)
    .for_topic_exchange("ledger")
        // Using default settings
        .consume(handlers::CreateOutboundHandler).await?
        // Using custom settings
        .consume_with_config(handlers::OutboundAcceptedHandler, ConsumerConfig::default().with_worker_count(5)).await?
    .then()
    .for_topic_exchange("provider")
        .consume(handlers::provider::InboundTransactionSettledHandler).await?
        .consume(handlers::provider::OutboundAcceptedHandler).await?
        .consume(handlers::provider::OutboundSettledHandler).await?
        .consume(handlers::provider::OutboundFailedHandler).await?
    .run_until_shutdown().await?; // This will block until shutdown
// Example handler
#[derive(Clone, Debug)]
pub struct CreateOutboundHandler;

impl AmqpHandler for CreateOutboundHandler {
    type Message = CreateOutboundTransaction; // Must be a Protobuf message
    type Context = AppContext;

    async fn handle(
        &self,
        context: Self::Context,
        message: Self::Message,
    ) -> Result<(), ConsumerError> {
        // Do something with the message,
        // or return an error to nack it:
        // return Err(ConsumerError::Retry(..))   // the message is retried before being sent to the DLQ
        // return Err(ConsumerError::Fatal(..))   // the message is sent to the DLQ
        // return Err(ConsumerError::Invalid(..)) // the message is dropped
        //
        // You can also use the fluent API to handle errors:
        //
        // context
        //     .db.add_transaction(&message).await
        //     .or_transient_error()?; // Other options: or_fatal_error()?, or_invalid_error()?
        Ok(()) // the message is acked
    }
}
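The fluent error helpers mentioned in the handler comments can be pictured as an extension trait on Result. The following is a minimal sketch of that idea, not amqpsy's actual implementation: the ConsumerError enum here is a local stand-in with String payloads, and the trait name ConsumerResultExt and the parse_amount helper are hypothetical.

```rust
use std::fmt::Display;

// Hypothetical stand-in for amqpsy's ConsumerError; the real type may carry
// different payloads.
#[derive(Debug, PartialEq)]
pub enum ConsumerError {
    Retry(String),   // retried, then sent to the DLQ
    Fatal(String),   // sent to the DLQ
    Invalid(String), // dropped
}

// Hypothetical extension trait that adds the fluent `or_*_error` helpers to
// any Result whose error type can be displayed.
pub trait ConsumerResultExt<T> {
    fn or_transient_error(self) -> Result<T, ConsumerError>;
    fn or_fatal_error(self) -> Result<T, ConsumerError>;
    fn or_invalid_error(self) -> Result<T, ConsumerError>;
}

impl<T, E: Display> ConsumerResultExt<T> for Result<T, E> {
    fn or_transient_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Retry(e.to_string()))
    }

    fn or_fatal_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Fatal(e.to_string()))
    }

    fn or_invalid_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Invalid(e.to_string()))
    }
}

// Illustrative use: a payload that fails to parse will never succeed on
// retry, so it is classified as Invalid.
pub fn parse_amount(raw: &str) -> Result<u64, ConsumerError> {
    let amount = raw.parse::<u64>().or_invalid_error()?;
    Ok(amount)
}
```

The point of the pattern is that the handler decides the message's fate at the call site where the failure happens, and the ? operator carries that decision out of the handler.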
Publishers
Pick one of three options using the AmqpPublisher::new_* methods.
- Command: Publisher confirm and mandatory routing enabled
- Event: Publisher confirm enabled, mandatory routing disabled
- Event without Publisher confirmation: No publisher confirm and no mandatory routing
// commands::CreateOutboundTransaction is a Proto message
let command_publisher =
    AmqpPublisher::<commands::CreateOutboundTransaction>::new_for_command(
        config,
        "exchange_name",
    )
    .await?;
let event_publisher = AmqpPublisher::<events::OutboundTransactionSettled>::new_for_event(
    config,
    "exchange_name",
)
.await?; // or new_for_event_without_publisher_confirmation

// Publish a command (command is a commands::CreateOutboundTransaction value)
// Command has publisher confirm and mandatory routing enabled
command_publisher.publish(&command).await?;
// Publish an event (event is an events::OutboundTransactionSettled value)
// Event has publisher confirm enabled but no mandatory routing enabled
event_publisher.publish(&event).await?;
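The three publisher flavours differ only in two AMQP settings. With publisher confirms, the broker acknowledges each publish; with the mandatory flag, the broker returns a message it cannot route to any queue instead of silently discarding it. The sketch below models that mapping. The PublisherKind and PublishSettings types are hypothetical and are not part of amqpsy's API.

```rust
// Hypothetical model of the three publisher flavours described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublisherKind {
    Command,
    Event,
    EventWithoutConfirmation,
}

// The two AMQP publish settings that distinguish the flavours.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishSettings {
    pub publisher_confirm: bool,
    pub mandatory: bool,
}

impl PublisherKind {
    pub fn settings(self) -> PublishSettings {
        match self {
            // A command must reach a consumer queue, so both checks are on.
            PublisherKind::Command => PublishSettings {
                publisher_confirm: true,
                mandatory: true,
            },
            // An event may legitimately have no subscribers, so only the
            // broker acknowledgement is required.
            PublisherKind::Event => PublishSettings {
                publisher_confirm: true,
                mandatory: false,
            },
            // Fire and forget.
            PublisherKind::EventWithoutConfirmation => PublishSettings {
                publisher_confirm: false,
                mandatory: false,
            },
        }
    }
}
```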
Distributed Tracing
Traces are linked between publisher and consumer, with the necessary tags set on them (prefixed with amqpsy.*).
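To illustrate how a publisher span and a consumer span can be linked, here is a minimal sketch of trace context propagation through message headers. It assumes the W3C Trace Context traceparent format, which is what OpenTelemetry's default propagator uses; whether amqpsy uses this exact header is an assumption, and the TraceContext type and the inject and extract functions are hypothetical names.

```rust
use std::collections::HashMap;

// Hypothetical carrier for the identifiers that link two spans.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TraceContext {
    pub trace_id: u128,
    pub span_id: u64,
    pub sampled: bool,
}

// Publisher side: serialise the context into the message headers using the
// W3C layout `version-traceid-parentid-flags`.
pub fn inject(ctx: &TraceContext, headers: &mut HashMap<String, String>) {
    let flags = if ctx.sampled { 1u8 } else { 0u8 };
    headers.insert(
        "traceparent".to_string(),
        format!("00-{:032x}-{:016x}-{:02x}", ctx.trace_id, ctx.span_id, flags),
    );
}

// Consumer side: recover the context so the consumer span can reference the
// publisher span. Returns None when the header is missing or malformed.
pub fn extract(headers: &HashMap<String, String>) -> Option<TraceContext> {
    let value = headers.get("traceparent")?;
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4
        || parts[0] != "00"
        || parts[1].len() != 32
        || parts[2].len() != 16
        || parts[3].len() != 2
    {
        return None;
    }
    let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
    let span_id = u64::from_str_radix(parts[2], 16).ok()?;
    let flags = u8::from_str_radix(parts[3], 16).ok()?;
    Some(TraceContext {
        trace_id,
        span_id,
        sampled: flags & 1 == 1,
    })
}
```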
Chaos
Simulate duplicate messages, publish failures, etc. in your system.
Example shows a system with 10% duplicate messages and 10% publish failure.
- Enable the chaos feature.
- Specify at least one of the chaos parameters as an environment variable:
  - AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE: percentage of publishes that are sent to the broker more than once.
  - AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE: percentage of publishes that fail with a random error.
If you enable the chaos feature but don't specify any parameter, the application will crash at startup.
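The behaviour described above (percentages read from environment variables, and a startup failure when none is set) can be sketched as follows. This is an illustration under stated assumptions, not amqpsy's code: only the two environment variable names come from this README, while the ChaosConfig type, the function names, and the rule that values are integers from 0 to 100 are assumptions.

```rust
// Hypothetical chaos settings; only the env var names below are from the README.
#[derive(Debug, PartialEq, Eq)]
pub struct ChaosConfig {
    pub duplicate_percentage: Option<u8>,
    pub failure_percentage: Option<u8>,
}

// Assumption: a parameter is an integer between 0 and 100 inclusive.
fn parse_percentage(name: &str, raw: Option<&str>) -> Result<Option<u8>, String> {
    match raw {
        None => Ok(None),
        Some(text) => match text.trim().parse::<u8>() {
            Ok(value) if value <= 100 => Ok(Some(value)),
            _ => Err(format!("{name} must be an integer from 0 to 100, got {text:?}")),
        },
    }
}

// Takes a lookup function so that production code can pass
// `|name| std::env::var(name).ok()` while tests pass a closure.
pub fn load_chaos_config(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<ChaosConfig, String> {
    const DUPLICATE: &str = "AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE";
    const FAILURE: &str = "AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE";

    let duplicate_percentage = parse_percentage(DUPLICATE, lookup(DUPLICATE).as_deref())?;
    let failure_percentage = parse_percentage(FAILURE, lookup(FAILURE).as_deref())?;

    // Mirrors the documented rule: enabling chaos without any parameter is an error.
    if duplicate_percentage.is_none() && failure_percentage.is_none() {
        return Err("chaos feature enabled but no chaos parameter is set".to_string());
    }
    Ok(ChaosConfig {
        duplicate_percentage,
        failure_percentage,
    })
}

// `roll` is a uniformly random number in 0..100 supplied by the caller;
// a 10% setting therefore fires for rolls 0 through 9.
pub fn should_inject(percentage: Option<u8>, roll: u8) -> bool {
    percentage.map_or(false, |p| roll < p)
}
```

At startup, a caller could run load_chaos_config(|name| std::env::var(name).ok()) and panic on Err, which gives the crash-at-startup behaviour when no parameter is specified.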