#pub-sub #async #sub

async_pub_sub

A library aiming at making async pub-sub easier in Rust

1 unstable release

Uses new Rust 2024

new 0.1.2 Apr 18, 2025
0.1.1 Apr 13, 2025
0.1.0 Apr 13, 2025

#365 in Asynchronous


183 downloads per month
Used in async_pub_sub_macros

MIT license

47KB
767 lines

Async Pub Sub

A library that aims to make the pub/sub pattern easy to use in asynchronous Rust.

Overview

This crate provides a flexible and efficient foundation for building publish-subscribe systems in Rust using asynchronous programming. It includes:

  • Core Abstractions: The Publisher and Subscriber traits for defining publishers and subscribers.
  • Derive Macros: Convenient macros, enabled through the macros feature, that automatically generate publisher and subscriber implementations (see async_pub_sub_macros for more details).
  • Extensibility: Middleware layers for publishers and subscribers to add custom logic like logging or debugging.
  • Example Implementations: Ready-to-use implementations for common use cases.
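
To make the middleware idea concrete, here is a minimal sketch of a Tower-style layer that wraps a publisher and logs each message before forwarding it. It uses only the standard library and is synchronous for brevity. `SimplePublisher`, `ChannelPublisher`, `LoggingLayer`, and `demo_logging_layer` are hypothetical names invented for this illustration; they are not the crate's `Publisher` trait or its layer API, which are async and may be shaped differently.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical, simplified stand-in for a publisher abstraction.
// This is NOT the crate's `Publisher` trait, which is async.
trait SimplePublisher {
    type Message;
    fn publish(&mut self, message: Self::Message) -> Result<(), String>;
}

// A basic publisher that forwards messages into a std channel.
struct ChannelPublisher<M> {
    sender: Sender<M>,
}

impl<M> SimplePublisher for ChannelPublisher<M> {
    type Message = M;

    fn publish(&mut self, message: M) -> Result<(), String> {
        self.sender.send(message).map_err(|e| e.to_string())
    }
}

// A middleware layer: wraps any publisher and records each message
// before delegating to the inner publisher.
struct LoggingLayer<P> {
    inner: P,
    log: Vec<String>,
}

impl<P> SimplePublisher for LoggingLayer<P>
where
    P: SimplePublisher,
    P::Message: std::fmt::Debug,
{
    type Message = P::Message;

    fn publish(&mut self, message: Self::Message) -> Result<(), String> {
        self.log.push(format!("publishing {:?}", message));
        self.inner.publish(message)
    }
}

// Builds a layered publisher, publishes two messages, and returns
// what the receiving end saw together with the layer's log.
fn demo_logging_layer() -> (Vec<i32>, Vec<String>) {
    let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
    let mut publisher = LoggingLayer {
        inner: ChannelPublisher { sender },
        log: Vec::new(),
    };
    publisher.publish(42).unwrap();
    publisher.publish(7).unwrap();
    let log = publisher.log.clone();
    drop(publisher); // closes the channel so the iterator below ends
    let received: Vec<i32> = receiver.iter().collect();
    (received, log)
}
```

Because the layer implements the same trait as the publisher it wraps, layers can be stacked, and code that accepts any `SimplePublisher` does not need to know whether logging is present. Calling `demo_logging_layer()` from a `main` function exercises the whole chain.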

Features

  • Asynchronous: Built for async Rust.
  • Flexible: Generic implementation that supports custom message types.
  • Extensible: Easily add custom middleware layers.
  • Macro Support: Simplify implementation with derive macros.

Getting Started

Add async_pub_sub to your Cargo.toml:

[dependencies]
async_pub_sub = { version = "0.1.0", features = ["macros"] } # Replace with the latest version

Example

use async_pub_sub::{
    Publisher, PublisherImpl, Subscriber, SubscriberImpl,
    macros::{DerivePublisher, DeriveSubscriber, routes, rpc_interface},
};

// subscriber wrapper example
#[derive(DeriveSubscriber)]
struct MySubscriber<S>
where
    S: Subscriber,
{
    inner_subscriber: S,
}

// publisher wrapper example
#[derive(DerivePublisher)]
struct MyPublisher<P>
where
    P: Publisher,
{
    inner_publisher: P,
}

// rpc_interface example
// This macro generates the items needed to perform RPC calls
// between the publisher and subscriber.
#[rpc_interface]
trait MyServiceInterface {
    async fn foo(&self) -> &'static str;
    async fn bar(&mut self, value: i32) -> Result<(), String>;
}

#[derive(DeriveSubscriber)]
struct MyService {
    // `MyServiceInterfaceMessage` is
    // generated by the `rpc_interface` macro
    #[subscriber(MyServiceInterfaceMessage)]
    subscriber: SubscriberImpl<MyServiceInterfaceMessage>,
    value: i32,
}

impl MyServiceInterface for MyService {
    async fn foo(&self) -> &'static str {
        "Hello from MyService!"
    }

    async fn bar(&mut self, value: i32) -> Result<(), String> {
        println!("Received value: {}", value);
        self.value = value;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut publisher = MyPublisher {
        inner_publisher: PublisherImpl::new("publisher", 1),
    };
    let mut subscriber = MySubscriber {
        inner_subscriber: SubscriberImpl::new("subscriber"),
    };

    let mut my_service = MyService {
        subscriber: SubscriberImpl::new("my_service"),
        value: 0,
    };

    // `MyServiceInterfaceClient` is generated by the `rpc_interface` macro
    // and makes it possible to perform RPC calls from the publisher to the subscriber.
    let mut client = MyServiceInterfaceClient::new(PublisherImpl::new("client", 1));

    routes!(
        publisher -> subscriber: i32,
        client -> my_service: MyServiceInterfaceMessage,
    )
    .unwrap();

    let publisher_task = tokio::spawn(async move {
        publisher.publish(42).await.unwrap();
    });

    let subscriber_task = tokio::spawn(async move {
        let message = subscriber.receive().await;
        println!("message: {}", message);
    });

    let client_task = tokio::spawn(async move {
        let response = client.foo().await;
        println!("response to foo(): {}", response);

        let response = client.bar(100).await;
        println!("response to bar(): {:?}", response);
    });

    tokio::spawn(async move {
        // The run method is generated automatically for my_service because it implements
        // the `MyServiceInterface` trait and subscribes to `MyServiceInterfaceMessage`.
        // It handles incoming messages and performs the corresponding RPC calls.
        // The run method is blocking, so it should be run in a separate task.
        my_service.run().await;
    });

    client_task.await.unwrap();
    publisher_task.await.unwrap();
    subscriber_task.await.unwrap();
}

For more usage examples, see the examples/ directory.
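
The RPC-style interaction in the example above can be pictured as request messages that carry a reply channel. The sketch below is hand-written, uses only the standard library (threads and `std::sync::mpsc` instead of async tasks), and shows that general technique. It is not the output of the `rpc_interface` macro: `ServiceRequest`, `Service`, `Client`, and `demo_rpc` are hypothetical names, and the real `MyServiceInterfaceMessage` and `MyServiceInterfaceClient` may be structured differently.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical request enum: one variant per trait method, each
// carrying its arguments and a channel for sending the reply back.
// The real `MyServiceInterfaceMessage` may be shaped differently.
enum ServiceRequest {
    Foo { reply: Sender<&'static str> },
    Bar { value: i32, reply: Sender<Result<(), String>> },
}

struct Service {
    value: i32,
}

impl Service {
    // Dispatches one request to the matching handler and replies.
    fn handle(&mut self, request: ServiceRequest) {
        match request {
            ServiceRequest::Foo { reply } => {
                let _ = reply.send("Hello from Service!");
            }
            ServiceRequest::Bar { value, reply } => {
                self.value = value;
                let _ = reply.send(Ok(()));
            }
        }
    }
}

// Client side: each method sends a request and waits for the reply.
struct Client {
    requests: Sender<ServiceRequest>,
}

impl Client {
    fn foo(&self) -> &'static str {
        let (reply, response) = channel();
        self.requests.send(ServiceRequest::Foo { reply }).unwrap();
        response.recv().unwrap()
    }

    fn bar(&self, value: i32) -> Result<(), String> {
        let (reply, response) = channel();
        self.requests
            .send(ServiceRequest::Bar { value, reply })
            .unwrap();
        response.recv().unwrap()
    }
}

// Runs the service loop on a thread, performs two calls, and returns
// the responses plus the service's final state.
fn demo_rpc() -> (&'static str, Result<(), String>, i32) {
    let (requests, inbox) = channel::<ServiceRequest>();
    let service_thread = thread::spawn(move || {
        let mut service = Service { value: 0 };
        // The loop ends once every request sender has been dropped.
        for request in inbox {
            service.handle(request);
        }
        service.value
    });

    let client = Client { requests };
    let foo = client.foo();
    let bar = client.bar(100);
    drop(client); // lets the service loop finish
    let final_value = service_thread.join().unwrap();
    (foo, bar, final_value)
}
```

The service loop here plays the role that `my_service.run()` plays in the example: it occupies its own thread (or task) and keeps handling requests until no client can send any more. Calling `demo_rpc()` from a `main` function runs the full round trip.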

License

This project is licensed under the MIT License - see the LICENSE file for details.

Note

This project is inspired by the Tower project.

Dependencies

~0.7–1MB
~19K SLoC