13 releases (7 breaking)

new 0.7.0 Nov 21, 2024
0.6.0 Oct 31, 2024
0.4.3 Jul 30, 2024
0.4.2 Feb 16, 2024
0.0.0 Sep 16, 2021

#352 in Network programming


1,004 downloads per month

Apache-2.0 OR MPL-2.0

400KB
11K SLoC

RabbitMQ Streams Client for Rust


A Rust client for RabbitMQ Streams

Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples.

Table of Contents

  1. Introduction
  2. Installation
  3. Getting Started
  4. Usage
  5. Examples
  6. Development

Introduction

The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming.

Installation

Install from crates.io

[dependencies]
rabbitmq-stream-client = "*"

Then run cargo build to include it in your project.

Getting Started

This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application.

Make sure a RabbitMQ server with stream support is installed and running. The main entry point is Environment, which is used to connect to a node.

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;

Environment with TLS

use rabbitmq_stream_client::{Environment, TlsConfiguration};

// Provide the root CA certificate used to verify the server
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
    .add_root_certificates(String::from(".ci/certs/ca_certificate.pem"))
    .build();

// Use this configuration if you want to trust the certificates
// without providing the root certificate
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
    .trust_certificates(true)
    .build();

let environment = Environment::builder()
    .host("localhost")
    .port(5551) // specify the TLS port of the node
    .tls(tls_configuration)
    .build()
    .await?;

Environment with a load balancer

See the documentation about streams and load balancers.

use rabbitmq_stream_client::Environment;

let environment = Environment::builder()
    .load_balancer_mode(true)
    .build()
    .await?;

Publishing messages

You can publish messages with three different methods:

  • send: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the example
  • batch_send: asynchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the example
  • send_with_confirm: synchronous, the caller waits until the message is confirmed. This is the slowest publishing method. See the example
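The buffering that send is described as doing (collect messages, then send them together once a timeout expires) can be pictured with a small std-only model. This is a sketch under assumptions, not the crate's implementation: Batcher, max_batch and max_delay are hypothetical names, the size limit is an added assumption, and the caller passes the current time explicitly so the behaviour is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for an internal send buffer; not part of the crate.
struct Batcher {
    pending: Vec<String>,
    max_batch: usize,        // assumed size limit that forces a flush
    max_delay: Duration,     // assumed timeout after which the buffer is sent
    oldest: Option<Instant>, // arrival time of the oldest buffered message
}

impl Batcher {
    fn new(max_batch: usize, max_delay: Duration) -> Self {
        Batcher { pending: Vec::new(), max_batch, max_delay, oldest: None }
    }

    /// Buffers one message and returns a batch if one is due.
    fn push(&mut self, msg: String, now: Instant) -> Option<Vec<String>> {
        if self.pending.is_empty() {
            self.oldest = Some(now);
        }
        self.pending.push(msg);
        self.flush_if_due(now)
    }

    /// Returns the buffered messages when the buffer is full or the
    /// oldest message has waited at least `max_delay`.
    fn flush_if_due(&mut self, now: Instant) -> Option<Vec<String>> {
        let timed_out = self
            .oldest
            .map_or(false, |t| now.saturating_duration_since(t) >= self.max_delay);
        if self.pending.len() >= self.max_batch || timed_out {
            self.oldest = None;
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }
}
```

With batch_send the application plays the role of this buffer itself, and with send_with_confirm each message is awaited individually, which is consistent with the speed ordering given in the list above.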

Consuming messages

Consuming from a stream is non-destructive: reading a message does not remove it, so any consumer can start reading from any point in the log.
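To make the idea of starting from any point concrete, here is a toy in-memory model of an append-only log. It is only an illustration: Log and StartFrom are hypothetical types written for this sketch and are not the crate's API, and retention is ignored.

```rust
/// Hypothetical model of where a consumer starts reading; not the crate's own type.
#[derive(Debug, Clone, Copy)]
enum StartFrom {
    First,       // the oldest message in the log
    Offset(u64), // a specific absolute offset
    Next,        // only messages appended after the consumer attaches
}

/// Toy stream: append-only, the offset of a message is its position in the Vec.
struct Log {
    entries: Vec<String>,
}

impl Log {
    fn new() -> Self {
        Log { entries: Vec::new() }
    }

    /// Appends a message and returns the offset it was stored at.
    fn append(&mut self, msg: &str) -> u64 {
        self.entries.push(msg.to_string());
        (self.entries.len() - 1) as u64
    }

    /// Returns the (offset, message) pairs visible to a consumer attaching now.
    /// Reading takes `&self`, so it never removes anything from the log.
    fn read_from(&self, start: StartFrom) -> Vec<(u64, &str)> {
        let begin = match start {
            StartFrom::First => 0,
            StartFrom::Offset(n) => (n as usize).min(self.entries.len()),
            StartFrom::Next => self.entries.len(),
        };
        self.entries[begin..]
            .iter()
            .enumerate()
            .map(|(i, m)| ((begin + i) as u64, m.as_str()))
            .collect()
    }
}
```

Two consumers calling read_from with different starting points see overlapping data without affecting each other, which is the property the paragraph above relies on.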

See the Consuming section of the streaming doc for further info (most of the examples refer to Java, but they apply to this library too):

Consuming messages from a stream

See also the Rust streaming tutorial 2 on how to consume messages starting from different positions and how to enable Server-Side Offset Tracking:

RabbitMQ Streams - Rust tutorial 2

and the related examples from the tutorials:

Rust tutorials examples

See also a simple example here on how to consume from a stream:

Consuming messages from a stream example

Super Stream

The client supports the super-stream functionality.

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

See the blog post for more info.

You can use the SuperStreamProducer and SuperStreamConsumer types, which internally use producers and consumers to operate on the individual streams that compose the super stream.

A SuperStreamProducer can operate in either Hashing or Routing Key mode.
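The hashing idea can be sketched in a few lines: hash the routing key and take it modulo the number of partitions, so the same key always lands on the same partition stream. This is an illustration only. partition_for is a hypothetical helper, the partition names are made up, and std's DefaultHasher stands in for the MurmurHash3-based hashing that the examples below refer to as mmh3, so the actual partition chosen by the client will differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a partition stream for a routing key by hashing the key.
/// DefaultHasher is a stand-in chosen for this sketch; it is NOT the
/// hash function used by the client.
fn partition_for<'a>(routing_key: &str, partitions: &'a [String]) -> Option<&'a str> {
    if partitions.is_empty() {
        return None; // nothing to route to
    }
    let mut hasher = DefaultHasher::new();
    routing_key.hash(&mut hasher);
    // Map the 64-bit hash onto a partition index.
    let index = (hasher.finish() % partitions.len() as u64) as usize;
    Some(partitions[index].as_str())
}
```

For example, with three partitions named invoices-0, invoices-1 and invoices-2 (illustrative names), every message whose key is "customer-42" goes to one and the same partition, which preserves ordering per key while spreading different keys across nodes.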

See the Java documentation for more details (Same concepts apply here):

Super Stream Producer - Java doc

Have a look at the examples to see how to work with super streams.

See the Super Stream Producer Example using Hashing mmh3 mode

See the Super Stream Producer Example using Routing key mode

See the Super Stream Consumer Example

Single active consumer

The client supports the single-active-consumer feature:

single-active-consumer feature

See the Java doc for further information (Same concepts apply here):

Single-Active-Consumer Java doc

See the Rust full example here:

Single-Active-Consumer-Full-Example

Filtering

Filtering is a streaming feature, available from RabbitMQ 3.13, based on Bloom filters. RabbitMQ Stream provides server-side filtering so that a consumer does not have to read all the messages of a stream and filter them only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages.
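For readers unfamiliar with Bloom filters, the following minimal sketch shows the property that matters here: a lookup answers either "definitely absent" or "possibly present". It is not the broker's implementation. Bloom, the 64-bit size and the seeding scheme are assumptions made for the illustration.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter over a single 64-bit word (illustrative size).
struct Bloom {
    bits: u64,
    probes: u64, // number of hash probes per value
}

impl Bloom {
    fn new(probes: u64) -> Self {
        Bloom { bits: 0, probes }
    }

    /// Derives one bit position per (value, seed) pair.
    fn bit_for(value: &str, seed: u64) -> u64 {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        value.hash(&mut h);
        1u64 << (h.finish() % 64)
    }

    /// Records a filter value by setting all of its probe bits.
    fn insert(&mut self, value: &str) {
        for seed in 0..self.probes {
            self.bits |= Self::bit_for(value, seed);
        }
    }

    /// false means the value was never inserted;
    /// true means it may have been (false positives are possible).
    fn might_contain(&self, value: &str) -> bool {
        (0..self.probes).all(|seed| (self.bits & Self::bit_for(value, seed)) != 0)
    }
}
```

Because a true answer can be a false positive, messages that do not match may still reach the consumer, so a consuming application would normally keep a client-side check in addition to the server-side filter.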

See the Java documentation for more details (Same concepts apply here):

Filtering - Java Doc

See Rust filtering examples:

See the Producer with filtering Example

See the Consumer with filtering Example

Examples

Refer to the examples directory for detailed code samples illustrating various use cases like error handling, batch processing, super streams and different ways to send messages.

Development

Compiling

git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
make build

Running Tests

To run tests you need to have a running RabbitMQ Stream node with a TLS configuration. It is mandatory to use make rabbitmq-server to create a TLS configuration compatible with the tests. See the Environment TLS tests for more details.

make rabbitmq-server
make test

Running Benchmarks

make rabbitmq-server
make run-benchmark

Contributing

Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches.

License

See the LICENSE file for details.

Dependencies

~16–26MB
~462K SLoC