#events #dispatcher #message #async #send-message #tokio #unbounded-channel

synapps

Simple event dispatcher for Rust applications. It allows senders to send messages to topics. Subscribers will then receive the message based on their subscription policy.

1 unstable release

0.2.0 Jan 2, 2025

#881 in Asynchronous

Download history 133/week @ 2025-01-01 7/week @ 2025-01-08

140 downloads per month

Custom license

21KB
313 lines

Synapps

Description

Synapps is a simple event dispatcher for Rust applications. It allows senders to send messages to topics. Subscribers will then receive the message based on their subscription policy.

It proposes a very naïve (yet sufficient, at least for me) approach to handle event driven development.

The EventDispatcher is responsible for managing the distribution of events to various subscribers based on specific patterns. Here's a high-level overview of its functionality:

Event Registration: Subscribers can register to receive events that match a particular pattern. This is typically done through a PatternValidator which determines if an event's topic matches the subscriber's interest.

Event Dispatching: The dispatcher receives events from producers and forwards them to the appropriate subscribers whose pattern validators match the event's topic.

Concurrency Handling: The dispatcher operates asynchronously, allowing multiple producers and subscribers to interact concurrently without blocking each other.

Lifecycle Management: The dispatcher manages the lifecycle of event processing, ensuring that it continues to run as long as there are active producers and subscribers. It exits cleanly when there are no more producers sending events.

In summary, the EventDispatcher acts as a mediator that routes events from producers to subscribers based on matching patterns, handling concurrency and lifecycle management in an asynchronous environment.

use chrono::Utc;
use serde::Serialize;
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc::unbounded_channel, task::yield_now, time::timeout, select};

use synapps::*;

#[derive(Serialize, Clone, PartialEq, Eq)]
pub struct Message(String);

impl Event for Message {}

#[tokio::test]
async fn test() {
    let validator = Arc::new(TruePatternValidator);
    let mut builder = EventDispatcherBuilder::<Message>::default();
    let sender = builder.get_producer();
    let mut receiver1 = builder.register("service", validator.clone());
    let mut receiver2 = builder.register("service", validator.clone());
    let mut receiver3 = builder.register("other_service", validator);
    let dispatcher = builder.build();
    let task = tokio::spawn(async move {
        dispatcher.execute().await.unwrap();
    });

    let event_message = EventMessage {
        timestamp: Utc::now(),
        sender: "other_service".to_string(),
        topic: "topic".to_string(),
        event: Message("This is a message".to_string()),
    };
    sender.send(event_message.clone()).unwrap();

    // Drop the sender to simulate no senders left
    drop(sender);

    // make tokio scheduler to switch task.
    yield_now().await;

    let message1 = receiver1.try_recv().expect("Expecting a message from receiver1, got none.");
    assert_eq!(event_message, message1);

    let message2 = receiver2.try_recv().expect("Expecting a message from receiver2, got none.");
    assert_eq!(event_message, message2);

    // since receiver3 is registered as `other_service`, it should not
    // receive the message from `other_service`
    receiver3.try_recv().expect_err("Expecting no message from receiver3.");

   // Since there is no more messages and the sender is dropped, the dispatcher
   // should exit.
   select! {
       _ = task => Ok(()),
       _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => Err(()),
   }
   .expect("The dispatcher did not exit.");
}

Dependencies

~3.5–9.5MB
~86K SLoC