8 releases

0.1.0 Dec 23, 2024
0.0.8 Dec 23, 2024

#402 in Database interfaces

767 downloads per month

MIT license

355KB
897 lines

amqpsy

Extremely opinionated AMQP PubSub library


Built on top of lapin, this library is designed to be a simple, easy-to-use AMQP PubSub library. It is extremely opinionated and comes with some batteries* included.

Features

  • Publishers with default configurations for Events and Commands.
  • Consumers have DLQ enabled by default.
  • Fluent interface for consumer groups.
  • Ergonomic error handling API: retry, DLQ, or invalid message options.
  • Enforces Protobuf for messages, ensuring backward compatibility.
  • Distributed tracing enabled with Otel: linked traces from publisher to consumer.
  • Chaos Monkey: simulate duplicate messages, publish failures, etc.

Usage

Consumer Groups

    let context = AppContext::new(); // your app context
    let config = amqpsy::AmqpConfig {
        connection_string: "<your connection string>".to_string(),
    };

    #[rustfmt::skip]
    ConsumerGroup::builder(
        "app_name",
        config,
        context,
    )
    .for_topic_exchange("ledger")
        // Using default settings
        .consume(handlers::CreateOutboundHandler).await?
        // Using custom settings
        .consume_with_config(handlers::OutboundAcceptedHandler, ConsumerConfig::default().with_worker_count(5)).await?
    .then()
    .for_topic_exchange("provider")
        .consume(handlers::provider::InboundTransactionSettledHandler).await?
        .consume(handlers::provider::OutboundAcceptedHandler).await?
        .consume(handlers::provider::OutboundSettledHandler).await?
        .consume(handlers::provider::OutboundFailedHandler).await?
    .run_until_shutdown().await?; // This will block until shutdown

    // Example handler
    #[derive(Clone, Debug)]
    pub struct CreateOutboundHandler;

    impl AmqpHandler for CreateOutboundHandler {
        type Message = CreateOutboundTransaction; // Must be a Protobuf message
        type Context = AppContext;

        async fn handle(
            &self,
            context: Self::Context,
            message: Self::Message,
        ) -> Result<(), ConsumerError> {
            // Do something with the message

            // Or return an error to nack the message:
            // return Err(ConsumerError::Retry(..))   // the message is retried before being sent to the DLQ
            // return Err(ConsumerError::Fatal(..))   // the message is sent to the DLQ
            // return Err(ConsumerError::Invalid(..)) // the message is dropped
            //
            // You can also use the fluent API to handle errors
            //
            // context
            //      .db.add_transaction(&message).await
            //      .or_transient_error()?; // Other Options: or_fatal_error()? or_invalid_error()?

            Ok(()) // means the message is Ack'd
        }
    }
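
The three error outcomes and the fluent helpers mentioned in the handler comments can be pictured with a small, standard-library-only sketch. Everything below is a hypothetical stand-in written for illustration: the real `ConsumerError` payloads, the trait that provides `or_transient_error()`, and the assumption that it maps to `ConsumerError::Retry` are not confirmed by this README.

```rust
use std::fmt::Display;

// Hypothetical stand-in for the library's ConsumerError.
// The real variants may carry different payload types.
#[derive(Debug, PartialEq)]
pub enum ConsumerError {
    Retry(String),   // retried, then sent to the DLQ
    Fatal(String),   // sent to the DLQ
    Invalid(String), // dropped
}

// Hypothetical extension trait mimicking the fluent helpers named above.
pub trait ResultExt<T> {
    fn or_transient_error(self) -> Result<T, ConsumerError>;
    fn or_fatal_error(self) -> Result<T, ConsumerError>;
    fn or_invalid_error(self) -> Result<T, ConsumerError>;
}

impl<T, E: Display> ResultExt<T> for Result<T, E> {
    // Assumption: "transient" corresponds to the Retry outcome.
    fn or_transient_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Retry(e.to_string()))
    }

    fn or_fatal_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Fatal(e.to_string()))
    }

    fn or_invalid_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Invalid(e.to_string()))
    }
}

// A malformed payload will not parse on a second attempt either,
// so it is classified as invalid rather than retried.
pub fn parse_amount(raw: &str) -> Result<u64, ConsumerError> {
    raw.trim().parse::<u64>().or_invalid_error()
}
```

The design point is that the handler decides the outcome at the call site where the failure happens: a database timeout is worth retrying, while a payload that fails validation is not.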

Publishers

Pick one of three options using the AmqpPublisher::new_* methods.

  • Command: publisher confirms and mandatory routing enabled
  • Event: publisher confirms enabled, mandatory routing disabled
  • Event without publisher confirmation: publisher confirms and mandatory routing both disabled

    // commands::CreateOutboundTransaction is a Protobuf message
    let command_publisher =
        AmqpPublisher::<commands::CreateOutboundTransaction>::new_for_command(
            config,
            "exchange_name",
        )
        .await?;
    let event_publisher =
        AmqpPublisher::<events::OutboundTransactionSettled>::new_for_event(
            config,
            "exchange_name",
        )
        .await?; // or new_for_event_without_publisher_confirmation

    // Publish a command
    // `command` is a commands::CreateOutboundTransaction value built elsewhere
    // Command has publisher confirms and mandatory routing enabled
    command_publisher.publish(&command).await?;

    // Publish an event
    // `event` is an events::OutboundTransactionSettled value built elsewhere
    // Event has publisher confirms enabled but no mandatory routing
    event_publisher.publish(&event).await?;
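
As a compact summary, the three publisher flavours listed above can be written as a lookup from kind to delivery guarantees. This is only an illustrative sketch: `PublisherKind` and `PublishGuarantees` are hypothetical names, not types exported by amqpsy.

```rust
// Hypothetical names, used only to tabulate the three options described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublisherKind {
    Command,
    Event,
    EventWithoutConfirmation,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishGuarantees {
    // The broker acknowledges each publish.
    pub publisher_confirm: bool,
    // The broker returns the message if it cannot be routed to any queue.
    pub mandatory_routing: bool,
}

impl PublisherKind {
    pub fn guarantees(self) -> PublishGuarantees {
        match self {
            PublisherKind::Command => PublishGuarantees {
                publisher_confirm: true,
                mandatory_routing: true,
            },
            PublisherKind::Event => PublishGuarantees {
                publisher_confirm: true,
                mandatory_routing: false,
            },
            PublisherKind::EventWithoutConfirmation => PublishGuarantees {
                publisher_confirm: false,
                mandatory_routing: false,
            },
        }
    }
}
```

A plausible reading of the split is that a command expects exactly one consumer to act on it, so an unroutable command is an error, whereas an event may legitimately have no subscribers yet.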

Distributed Tracing

Traces are linked between publisher and consumer, with the necessary tags set on them (prefixed with amqpsy.*).
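
One common way to link a publisher span to a consumer span is to carry the trace context in message headers. The sketch below uses the W3C Trace Context `traceparent` header format, which is OpenTelemetry's default propagation format. Whether amqpsy uses this exact header is an assumption, and `TraceContext`, `inject`, and `extract` are hypothetical names that use only the standard library.

```rust
use std::collections::HashMap;

// Hypothetical minimal trace context; not an amqpsy or OpenTelemetry type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceContext {
    pub trace_id: u128,
    pub span_id: u64,
    pub sampled: bool,
}

// Publisher side: write the context into the message headers as
// "00-<32 hex trace id>-<16 hex span id>-<2 hex flags>".
pub fn inject(ctx: &TraceContext, headers: &mut HashMap<String, String>) {
    let flags = if ctx.sampled { 1 } else { 0 };
    let value = format!("00-{:032x}-{:016x}-{:02x}", ctx.trace_id, ctx.span_id, flags);
    headers.insert("traceparent".to_string(), value);
}

// Consumer side: read the context back so the consumer span can
// reference the publisher span. Returns None for malformed headers.
pub fn extract(headers: &HashMap<String, String>) -> Option<TraceContext> {
    let value = headers.get("traceparent")?;
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    if parts[1].len() != 32 || parts[2].len() != 16 || parts[3].len() != 2 {
        return None;
    }
    if !parts[1..].iter().all(|p| p.bytes().all(|b| b.is_ascii_hexdigit())) {
        return None;
    }
    let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
    let span_id = u64::from_str_radix(parts[2], 16).ok()?;
    let flags = u8::from_str_radix(parts[3], 16).ok()?;
    // All-zero ids are invalid in the W3C format.
    if trace_id == 0 || span_id == 0 {
        return None;
    }
    Some(TraceContext { trace_id, span_id, sampled: flags & 1 == 1 })
}
```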

Chaos

Simulate duplicate messages, publish failures, etc. in your system.

The example shows a system with 10% duplicate messages and 10% publish failures.

  • Enable the chaos feature.
  • Specify at least one of the chaos parameters as an environment variable.
    • AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE: percentage of publishes that are sent to the broker more than once.
    • AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE: percentage of publishes that fail with a random error.

If you enable the chaos feature but don't specify any parameter, the application will crash at startup.
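
The startup rule above can be sketched as a small validation step. This is not the crate's implementation: `ChaosConfig` and `chaos_config_from` are hypothetical names, and the assumption that each variable holds an integer between 0 and 100 is mine, since the README only says the values are percentages.

```rust
use std::collections::HashMap;

pub const DUPLICATE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE";
pub const FAILURE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE";

// Hypothetical config type; field names are illustrative.
#[derive(Debug, PartialEq)]
pub struct ChaosConfig {
    pub duplicate_percentage: Option<u8>,
    pub failure_percentage: Option<u8>,
}

// Assumption: values are whole numbers in the range 0..=100.
fn parse_percentage(name: &str, vars: &HashMap<String, String>) -> Result<Option<u8>, String> {
    match vars.get(name) {
        None => Ok(None),
        Some(raw) => match raw.trim().parse::<u8>() {
            Ok(p) if p <= 100 => Ok(Some(p)),
            _ => Err(format!("{} must be an integer between 0 and 100, got {:?}", name, raw)),
        },
    }
}

// Builds the chaos config from a map of environment variables and
// rejects the case where the feature is on but nothing is configured.
pub fn chaos_config_from(vars: &HashMap<String, String>) -> Result<ChaosConfig, String> {
    let config = ChaosConfig {
        duplicate_percentage: parse_percentage(DUPLICATE_VAR, vars)?,
        failure_percentage: parse_percentage(FAILURE_VAR, vars)?,
    };
    if config.duplicate_percentage.is_none() && config.failure_percentage.is_none() {
        return Err("chaos is enabled but no chaos parameter is set".to_string());
    }
    Ok(config)
}
```

In a real program the map could come from `std::env::vars().collect()`. Taking it as a parameter keeps the check easy to test without mutating the process environment.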

Dependencies

~14–28MB
~415K SLoC