rusteron-rb

Provides ring buffer and broadcast functionality via Aeron C bindings, allowing efficient, low-latency message passing between threads or processes. This module implements single-producer, single-consumer (SPSC) ring buffers, multi-producer, single-consumer (MPSC) ring buffers, and broadcast channels.

33 releases

new 0.1.66 Jan 17, 2025
0.1.65 Jan 16, 2025
0.1.47 Dec 31, 2024
0.1.36 Nov 12, 2024

MIT/Apache

rusteron-rb is a core component of the rusteron project. It provides the ring buffer and broadcast functionality of the Aeron messaging system to Rust, so Rust developers can use Aeron's high-performance, low-latency message passing.

Overview

The rusteron-rb module is a Rust wrapper around the Aeron C ring buffer API. It offers types for writing and reading ring buffer messages and for transmitting and receiving broadcasts, enabling communication between threads or processes. Because it is built on Aeron's C bindings, the library relies on unsafe code, so developers must take extra care to ensure correctness.

Note: Since this module leverages Aeron C bindings, it is inherently unsafe and should be used with caution. Incorrect usage can lead to undefined behavior, such as segmentation faults.

Features

  • Broadcast Receiver: Receive messages from Aeron broadcast channels using AeronBroadcastReceiver.
  • Broadcast Transmitter: Send messages to Aeron broadcast channels using AeronBroadcastTransmitter.
  • MPSC Ring Buffer: Multi-producer, single-consumer ring buffer implementation using AeronMpscRb.
  • SPSC Ring Buffer: Single-producer, single-consumer ring buffer implementation using AeronSpscRb.

Usage Example

Single Producer, Single Consumer Ring Buffer

This example demonstrates how to use AeronSpscRb to create a simple single-producer, single-consumer ring buffer.

use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let rb = AeronSpscRb::new_with_capacity(1024 * 1024, 1024)?;

    // Producer writes data to the ring buffer
    for i in 0..50 {
        let idx = rb.try_claim(i + 1, 4);
        assert!(idx >= 0);
        let slot = rb.buffer_at_mut(idx as usize, 4);
        slot[0] = i as u8;
        rb.commit(idx)?;
    }
    // Alternative: claim a slice and commit through it
    for i in 0..50 {
        let mut slice = rb.try_claim_slice(i + 1, 4).unwrap();
        slice[0] = i as u8;
        // Optional: if neither commit nor abort is called, the claim is committed automatically
        slice.commit()?;
    }

    // Consumer reads data from the ring buffer
    struct Reader;
    impl AeronRingBufferHandlerCallback for Reader {
        fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = AeronRingBufferHandlerWrapper::new(Reader);
    for _ in 0..10 {
        let read = rb.read_msgs(&handler, 10);
        assert_eq!(10, read);
    }

    Ok(())
}
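
To make the claim/commit flow above more concrete, here is a minimal, single-threaded toy model written only with the standard library. It is a sketch under stated assumptions, not the Aeron implementation: the names ToyRingBuffer, PADDING_TYPE, and demo_claim_commit are hypothetical, and the 8-byte header (length plus message type id), the 8-byte alignment, and the negative-length "uncommitted" marker are assumptions made for illustration. It has no atomics, so it is not safe to share between threads.

```rust
// Toy single-threaded model of claim/commit framing; NOT the Aeron implementation.
const HEADER_LEN: usize = 8; // assumed: 4-byte length + 4-byte message type id
const ALIGNMENT: usize = 8; // assumed record alignment
const PADDING_TYPE: i32 = -1; // hypothetical marker for wrap-around filler

fn align(n: usize) -> usize {
    (n + ALIGNMENT - 1) & !(ALIGNMENT - 1)
}

pub struct ToyRingBuffer {
    buf: Vec<u8>,
    head: usize, // total bytes consumed so far
    tail: usize, // total bytes claimed so far
}

impl ToyRingBuffer {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two() && capacity >= ALIGNMENT);
        Self { buf: vec![0; capacity], head: 0, tail: 0 }
    }
    fn get_i32(&self, at: usize) -> i32 {
        let mut bytes = [0u8; 4];
        bytes.copy_from_slice(&self.buf[at..at + 4]);
        i32::from_le_bytes(bytes)
    }
    fn put_i32(&mut self, at: usize, value: i32) {
        self.buf[at..at + 4].copy_from_slice(&value.to_le_bytes());
    }

    /// Reserve a record; returns the payload offset, or None when there is no room.
    pub fn try_claim(&mut self, msg_type_id: i32, len: usize) -> Option<usize> {
        let cap = self.buf.len();
        let record = align(HEADER_LEN + len);
        let mut idx = self.tail & (cap - 1);
        let to_end = cap - idx;
        let padding = if record > to_end { to_end } else { 0 };
        if record + padding > cap - (self.tail - self.head) {
            return None;
        }
        if padding > 0 {
            // Fill the end of the buffer with a padding record and wrap to offset 0.
            self.put_i32(idx, padding as i32);
            self.put_i32(idx + 4, PADDING_TYPE);
            self.tail += padding;
            idx = 0;
        }
        // A negative length marks the record as claimed but not yet committed.
        self.put_i32(idx, -((HEADER_LEN + len) as i32));
        self.put_i32(idx + 4, msg_type_id);
        self.tail += record;
        Some(idx + HEADER_LEN)
    }

    pub fn payload_mut(&mut self, offset: usize, len: usize) -> &mut [u8] {
        &mut self.buf[offset..offset + len]
    }

    /// Flip the length positive so the consumer may read the record.
    pub fn commit(&mut self, offset: usize) {
        let at = offset - HEADER_LEN;
        let len = self.get_i32(at);
        self.put_i32(at, len.abs());
    }

    /// Deliver up to `limit` committed messages; returns how many were delivered.
    pub fn read_msgs(&mut self, mut handler: impl FnMut(i32, &[u8]), limit: usize) -> usize {
        let mut count = 0;
        while count < limit && self.head < self.tail {
            let idx = self.head & (self.buf.len() - 1);
            let len = self.get_i32(idx);
            if len <= 0 {
                break; // next record is still uncommitted
            }
            let type_id = self.get_i32(idx + 4);
            if type_id != PADDING_TYPE {
                handler(type_id, &self.buf[idx + HEADER_LEN..idx + len as usize]);
                count += 1;
            }
            self.head += align(len as usize);
        }
        count
    }
}

pub fn demo_claim_commit() -> Vec<(i32, u8)> {
    let mut rb = ToyRingBuffer::new(64);
    let mut seen = Vec::new();
    for i in 0..5u8 {
        let offset = loop {
            if let Some(offset) = rb.try_claim(i as i32 + 1, 12) {
                break offset;
            }
            rb.read_msgs(|id, p| seen.push((id, p[0])), 10); // full: drain, then retry
        };
        rb.payload_mut(offset, 12)[0] = i;
        rb.commit(offset);
    }
    rb.read_msgs(|id, p| seen.push((id, p[0])), 10);
    seen
}
```

Calling demo_claim_commit() from a main function exercises a full buffer, a drain, and a wrap-around through a padding record. The split into claim, write, and commit is what lets a producer fill the payload in place instead of copying it in afterwards.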

Multi-Producer, Single Consumer Ring Buffer

The following example demonstrates how to use the AeronMpscRb for a multi-producer, single consumer scenario, enabling multiple producers to write to the same ring buffer while a single consumer reads from it.

use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let rb = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;

    // Write data to the ring buffer (a single thread stands in for multiple producers here)
    for i in 0..100 {
        let idx = rb.try_claim(i + 1, 4);
        assert!(idx >= 0);
        let slot = rb.buffer_at_mut(idx as usize, 4);
        slot[0] = i as u8;
        rb.commit(idx)?;
    }

    // Consumer reads data from the ring buffer
    struct Reader;
    impl AeronRingBufferHandlerCallback for Reader {
        fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = AeronRingBufferHandlerWrapper::new(Reader);
    for _ in 0..10 {
        let read = rb.read_msgs(&handler, 10);
        assert_eq!(10, read);
    }

    Ok(())
}
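
The example above writes from a single thread. As a rough illustration of the multi-producer, single-consumer pattern itself, the sketch below uses the standard library channel as a stand-in; it does not use rusteron-rb, and run_producers is a hypothetical helper. With the standard channel, each producer's messages arrive in the order that producer sent them, while the interleaving across producers is not determined. Whether AeronMpscRb gives the same ordering guarantees is an assumption to check against the Aeron documentation.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the MPSC pattern using the standard library channel, not rusteron-rb.
pub fn run_producers(producers: u8, per_producer: u8) -> Vec<(i32, u8)> {
    let (tx, rx) = mpsc::channel::<(i32, [u8; 4])>();

    let handles: Vec<_> = (0..producers)
        .map(|p| {
            let tx = tx.clone();
            thread::spawn(move || {
                for i in 0..per_producer {
                    // The message type id identifies the producer; the payload carries a counter.
                    tx.send((p as i32 + 1, [i, 0, 0, 0])).expect("consumer alive");
                }
            })
        })
        .collect();

    // Drop the original sender so the channel closes once every producer finishes.
    drop(tx);

    // The single consumer drains the channel until all senders are gone.
    let received: Vec<(i32, u8)> = rx.iter().map(|(id, buf)| (id, buf[0])).collect();

    for handle in handles {
        handle.join().expect("producer panicked");
    }
    received
}
```

For instance, run_producers(3, 10) returns 30 messages, and filtering them by message type id yields the counters 0 through 9 in order for each producer.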

Broadcast Transmitter and Receiver

This example demonstrates how to set up a broadcast transmitter and receiver. The transmitter sends messages that are then received by the receiver, illustrating a simple broadcast communication scenario.

use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    // Set up broadcast transmitter and receiver
    let mut vec = vec![0u8; 1024 * 1024 + AERON_BROADCAST_BUFFER_TRAILER_LENGTH];
    let transmitter = AeronBroadcastTransmitter::from_slice(vec.as_mut_slice(), 1024)?;
    let receiver = AeronBroadcastReceiver::from_slice(vec.as_mut_slice())?;

    // Transmit messages
    for i in 0..100 {
        let mut msg = [0u8; 4];
        msg[0] = i as u8;
        let idx = transmitter.transmit_msg(i + 1, &msg).unwrap();
        println!("sent {}", idx);
        assert!(idx >= 0);
    }

    // Receive messages
    struct Reader;
    impl AeronBroadcastReceiverHandlerCallback for Reader {
        fn handle_aeron_broadcast_receiver_handler(&mut self, msg_type_id: i32, buffer: &mut [u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = Handler::leak(Reader {});
    for _ in 0..100 {
        let read = receiver.receive(Some(&handler)).unwrap();
        println!("read {}", read);
        assert!(read > 0);
    }

    Ok(())
}
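
A broadcast buffer differs from a ring buffer in that the transmitter does not wait for receivers, so a slow receiver can be overtaken ("lapped") and miss messages. The sketch below is a toy model of that behavior using only the standard library. ToyBroadcast, ToyReceiver, and demo_lapping are hypothetical names, the fixed-slot layout is a simplification, and none of this reflects how AeronBroadcastTransmitter or AeronBroadcastReceiver are implemented.

```rust
// Toy model of broadcast semantics; NOT the Aeron implementation.
pub struct ToyBroadcast {
    slots: Vec<Option<(i32, Vec<u8>)>>,
    next_seq: u64, // sequence number of the next message to transmit
}

impl ToyBroadcast {
    pub fn new(slot_count: usize) -> Self {
        assert!(slot_count > 0);
        Self { slots: vec![None; slot_count], next_seq: 0 }
    }

    /// Never blocks: the oldest slot is overwritten once the buffer is full.
    pub fn transmit(&mut self, msg_type_id: i32, payload: &[u8]) {
        let idx = (self.next_seq % self.slots.len() as u64) as usize;
        self.slots[idx] = Some((msg_type_id, payload.to_vec()));
        self.next_seq += 1;
    }
}

#[derive(Default)]
pub struct ToyReceiver {
    cursor: u64,     // sequence number this receiver expects next
    pub lapped: u64, // how many times the transmitter overtook this receiver
}

impl ToyReceiver {
    /// Returns the next message, skipping ahead if older ones were overwritten.
    pub fn receive(&mut self, tx: &ToyBroadcast) -> Option<(i32, Vec<u8>)> {
        if self.cursor == tx.next_seq {
            return None; // nothing new
        }
        let oldest = tx.next_seq.saturating_sub(tx.slots.len() as u64);
        if self.cursor < oldest {
            // The messages between cursor and oldest are gone; record the loss.
            self.lapped += 1;
            self.cursor = oldest;
        }
        let idx = (self.cursor % tx.slots.len() as u64) as usize;
        self.cursor += 1;
        tx.slots[idx].clone()
    }
}

pub fn demo_lapping() -> (Vec<i32>, u64) {
    let mut tx = ToyBroadcast::new(4);
    for i in 0..10u8 {
        tx.transmit(i as i32 + 1, &[i, 0, 0, 0]);
    }
    let mut rx = ToyReceiver::default();
    let mut ids = Vec::new();
    while let Some((id, _payload)) = rx.receive(&tx) {
        ids.push(id);
    }
    (ids, rx.lapped)
}
```

In demo_lapping, ten messages go into four slots before the receiver reads anything, so the receiver sees only message type ids 7 through 10 and records one lap. In the README example above, the buffer is large enough that all 100 small messages fit without overwriting.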

Installation

Add the following to your Cargo.toml file to include rusteron-rb:

Dynamic library:

[dependencies]
rusteron-rb = "0.1"

Static library:

[dependencies]
rusteron-rb = { version = "0.1", features = ["static"] }

Also make sure the Aeron C libraries required by rusteron-rb are set up.

Safety Considerations

Since rusteron-rb relies on Aeron C bindings, it involves unsafe Rust code. Users must ensure:

  • Resources are properly managed.
  • Access to shared data is properly synchronized in multi-threaded environments.

Failing to uphold these safety measures can lead to crashes or undefined behavior.
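
One common way to meet the synchronization requirement is to serialize access to a handle that tolerates only one writer at a time. The sketch below shows the pattern with Arc and Mutex from the standard library. SingleWriterLog is a hypothetical stand-in type, not a rusteron-rb type, and whether a given rusteron-rb handle may be moved between threads is something to verify against the crate documentation.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a handle that tolerates only one writer at a time.
#[derive(Default)]
pub struct SingleWriterLog {
    entries: Vec<(usize, u32)>,
}

impl SingleWriterLog {
    pub fn write(&mut self, producer: usize, value: u32) {
        self.entries.push((producer, value));
    }
}

/// Several threads write through one Mutex, so writes never overlap.
pub fn write_from_threads(threads: usize, per_thread: u32) -> Vec<(usize, u32)> {
    let log = Arc::new(Mutex::new(SingleWriterLog::default()));

    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                for value in 0..per_thread {
                    // The lock is held only for the duration of one write.
                    log.lock().expect("lock poisoned").write(t, value);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("writer panicked");
    }

    let guard = log.lock().expect("lock poisoned");
    guard.entries.clone()
}
```

A lock adds contention, so where several writers are expected it may be preferable to use a structure designed for them, such as the MPSC ring buffer, rather than guarding a single-producer one.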

Contributing

Contributions are welcome! Please feel free to open issues, submit pull requests, or suggest new features. We're particularly interested in:

  • Feedback on API usability.
  • Bug reports and feature requests.
  • Documentation improvements.

If you wish to contribute, refer to our contributing guidelines.

License

This project is dual-licensed under the MIT License and the Apache License 2.0; you may use it under either.

Feel free to reach out with any questions or suggestions via GitHub Issues!
