1 unstable release

0.1.0 Dec 3, 2024


Used in caryatid_process

Apache-2.0


Caryatid module SDK

This is an SDK for building modules in Caryatid. It provides a #[module] macro which does most of the hard work. All you need to do is implement an init function, which is passed two arguments: a context, which contains the message bus and the global configuration, and a config, which is the subset of configuration for your particular module.

You need to choose a message type that this module will handle. This can be a generic format like JSON, or a specific Message enum for all the messages in your application. The message type has to be common across all modules built into a single process.

Template module

So a module looks like this:

use caryatid_sdk::{Context, MessageBusExt, Module, module};
use std::sync::Arc;
use anyhow::Result;
use config::Config;

// Define a message type
type MType = serde_json::Value;

// Define it as a module, with a name and description
#[module(
    message_type(MType),
    name = "my-module",
    description = "An example module"
)]
pub struct MyModule;

impl MyModule {
    // Implement the single initialisation function, with application
    // Context and this module's Config
    fn init(&self, context: Arc<Context<MType>>, config: Arc<Config>) -> Result<()> {
        // ... Read configuration from config
        // ... Register message subscribers and/or request handlers on context.message_bus
        // ... Start other async processes

        // init returns a Result, so report success once setup is done
        Ok(())
    }
}

Message bus

The SDK provides a MessageBus trait which is implemented by several components within the process. A module doesn't need to know what kind of message bus it is talking to, or whether messages are routed internally, externally, or both. It just publishes and subscribes on the bus given in context.message_bus.

Simple pub-sub

For pure publish-subscribe, where messages are events that describe some change in the system and don't require a response, you can call context.message_bus.publish() and context.message_bus.subscribe():

Publishing

Assuming the module is created with message_type serde_json::Value as above:

    let topic = "test.simple";
    let test_message = Arc::new(json!({
        "message": "Hello, world!",
    }));

    context.message_bus.publish(topic, test_message)
      .await
      .expect("Failed to publish message");

Note that the message must be wrapped in an Arc to be passed around. You can see that publish is async, so this needs to be called in an async context.

Subscribing

    let topic = "test.simple";

    context.message_bus.subscribe(topic, |message: Arc<serde_json::Value>| {
        info!("Received: {:?}", message);
    })?;

The subscriber is a closure (lambda) which takes an Arc containing the message type.
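
To make the shape of this concrete, here is a minimal sketch of a publish-subscribe bus written with the standard library only. It is a toy for illustration, not Caryatid's MessageBus: the ToyBus type, its synchronous methods and its exact-match topic lookup are all assumptions made for this example. What it does show is why messages travel in an Arc: every subscriber receives a clone of the pointer rather than a copy of the message.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical toy bus, for illustration only: not Caryatid's MessageBus.
type Subscriber<M> = Box<dyn Fn(Arc<M>) + Send + Sync>;

pub struct ToyBus<M> {
    // Exact topic name -> subscribers registered on that topic
    subscribers: HashMap<String, Vec<Subscriber<M>>>,
}

impl<M> ToyBus<M> {
    pub fn new() -> Self {
        ToyBus { subscribers: HashMap::new() }
    }

    // Register a closure to be called for every message published on `topic`
    pub fn subscribe<F>(&mut self, topic: &str, subscriber: F)
    where
        F: Fn(Arc<M>) + Send + Sync + 'static,
    {
        self.subscribers
            .entry(topic.to_string())
            .or_default()
            .push(Box::new(subscriber));
    }

    // Deliver one message to every subscriber on `topic` and return how many
    // subscribers received it. Cloning the Arc copies a pointer, not the message.
    pub fn publish(&self, topic: &str, message: Arc<M>) -> usize {
        match self.subscribers.get(topic) {
            Some(list) => {
                for subscriber in list {
                    subscriber(Arc::clone(&message));
                }
                list.len()
            }
            None => 0,
        }
    }
}
```

Unlike this toy, the real publish() is async and may route messages outside the process, so treat the sketch only as a mental model of the subscribe/publish pairing.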

A full version of this can be found in examples/simple.

Request-response

The SDK also offers a request-response model, layered on top of the basic pub-sub in an implementation-independent way. In this case you call context.message_bus.request() and context.message_bus.handle():

Requesting

    let topic = "test.simple";
    let test_message = Arc::new(json!({
        "message": "Hello, world!",
    }));

    match context.message_bus.request(topic, test_message).await {
      Ok(response) => { info!("Got response: {:?}", response); },
      Err(e) => { error!("Got error: {e}"); }
    }

request() returns a Result, which either contains an Arc of the response message, or an error if nothing responded (after a timeout). Note that application-level errors need to be encoded into the message type - in this case, in the JSON object.

Responding

Requests are responded to by handlers, which are usually async functions taking and returning an Arc with the message type.

    async fn handler(message: Arc<Value>) -> Arc<Value> {
        let response = json!({
          "message": "Pleased to meet you!",
        });

        Arc::new(response)
    }

    let topic = "test.simple";
    context.message_bus.handle(topic, handler)?;

Note that, as with request(), the handler must return a valid message. It cannot return an Err, so any application-level failure has to be encoded in the message itself.
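
One way to encode failures in the message type is to give it an explicit failure variant. The sketch below assumes a hypothetical CalcMessage enum for a small division service; none of these names come from the SDK. It is written as a plain synchronous function so that it runs without an async runtime; in a module the same body could sit inside an async handler registered with context.message_bus.handle(). With JSON messages, the equivalent would be something like an "error" field in the response object.

```rust
use std::sync::Arc;

// Hypothetical message type for a division service; in a real application
// these would be variants of the process-wide message type.
#[derive(Debug, Clone, PartialEq)]
pub enum CalcMessage {
    Divide { numerator: i64, denominator: i64 },
    Quotient(i64),
    // Application-level failure, carried as an ordinary message
    Failed(String),
}

// The body of a handler: it always produces a message, never an Err
pub fn handle_divide(message: Arc<CalcMessage>) -> Arc<CalcMessage> {
    let reply = match message.as_ref() {
        CalcMessage::Divide { numerator, denominator } => {
            // checked_div returns None on division by zero or overflow
            match numerator.checked_div(*denominator) {
                Some(quotient) => CalcMessage::Quotient(quotient),
                None => CalcMessage::Failed("division by zero or overflow".to_string()),
            }
        }
        // Anything that is not a request is reported back as a failure too
        other => CalcMessage::Failed(format!("unexpected request: {other:?}")),
    };
    Arc::new(reply)
}
```

The requester then matches on the reply and treats Failed like any other message, keeping the Err branch of request() for the case where nothing responded at all.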

A full version of request/response can be found in examples/request.

Typed Messages

It is perfectly possible to communicate with JSON objects, or even strings, and some applications may wish to do so. However, there is a cost both in execution time and code complexity in packing and unpacking JSON for every message.

The Caryatid SDK therefore provides a way to specify messages as native Rust types. Since in any real-world system there will be more than one type, this is done by defining a system-wide enum of all the messages - e.g.

#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct Test {
    pub data: String,
    pub number: i64
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Message {
    None(()),                 // Just so we have a simple default
    Test(Test),               // A custom struct
    String(String),           // Simple string
    JSON(serde_json::Value),  // Any JSON value
}

impl Default for Message {
    fn default() -> Self {
        Message::None(())
    }
}

The enum Message has a custom struct Test but also general options String and JSON which can be used for extensibility.

Typed subscription

A subscriber module using this Message type would now look like this:

// ... other imports ...
use crate::message::Message;

#[module(
    message_type(Message),
    name = "typed-subscriber",
    description = "Typed subscriber module"
)]
pub struct TypedSubscriber;

impl TypedSubscriber {

    fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {

        let topic = "test.simple";
        context.message_bus.subscribe(topic, |message: Arc<Message>| {
            // Every variant is listed, so no catch-all arm is needed
            match message.as_ref() {
                Message::None(_) => error!("Received empty message!"),
                Message::Test(test) => info!("Received test: {} {}", test.data, test.number),
                Message::String(s) => info!("Received string {s}"),
                Message::JSON(json) => info!("Received JSON {:?}", json),
            }
        })?;

        Ok(())
    }
}

This example matches every possible option - normally a subscriber would just test for one, and error if it received anything else.
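
The single-variant case might be sketched as follows. To keep the example free of external dependencies it uses a simplified copy of the Message enum, without the JSON variant and the serde derives, and the helper names expect_test and on_message are hypothetical. The returned strings stand in for the info! and error! calls a real subscriber would make.

```rust
use std::sync::Arc;

// Simplified copy of the types above: the JSON variant and the serde derives
// are left out so that the sketch has no external dependencies.
#[derive(Debug, Clone, PartialEq)]
pub struct Test {
    pub data: String,
    pub number: i64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    None(()),
    Test(Test),
    String(String),
}

// Hypothetical helper: pick out the one variant this subscriber cares about
pub fn expect_test(message: &Message) -> Result<&Test, String> {
    match message {
        Message::Test(test) => Ok(test),
        other => Err(format!("Unexpected message type: {other:?}")),
    }
}

// What the body of a subscriber closure might do with the result
pub fn on_message(message: Arc<Message>) -> String {
    match expect_test(message.as_ref()) {
        Ok(test) => format!("Received test: {} {}", test.data, test.number),
        Err(e) => e,
    }
}
```

Here the catch-all arm is doing real work, because only one variant is named explicitly.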

Typed publication

A publisher that uses just one of the Message options would look like this:

use crate::message::{Test, Message};

#[module(
    message_type(Message),
    name = "typed-publisher",
    description = "Typed publisher module"
)]
pub struct TypedPublisher;

impl TypedPublisher {

    fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
        let message_bus = context.message_bus.clone();
        let topic = "test.simple";

        // publish is async, so call it from a spawned task
        tokio::spawn(async move {
            let test_message = Message::Test(Test {
                data: "Hello, world!".to_string(),
                number: 42
            });
            message_bus.publish(topic, Arc::new(test_message))
                .await
                .expect("Failed to publish message");
        });

        Ok(())
    }
}

The full versions of TypedSubscriber and TypedPublisher can be found in examples/typed.

Module configuration

The module's init function is also passed a config::Config extracted from the global configuration file specifically for this module. You can use this like any other config to get configuration values.

For example, if your module was called my-module and the global configuration file contained:

[module.my-module]
topic = "test.simple"
count = 10

You could get these values with:

        let topic = config.get_string("topic")?;
        let count = config.get::<u32>("count")?;

(better error handling or defaulting is left as an exercise for the reader!)
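
As a starting point for that exercise, here is one possible pattern: fall back to a default when a key is absent, but report an error when a key is present and malformed. To stay dependency-free the sketch reads from a plain HashMap standing in for the module's configuration section; the Settings struct, the read_settings function and the default values are assumptions for illustration. With config::Config, a similar shape could probably be built on the Result values returned by get_string() and get().

```rust
use std::collections::HashMap;

// Hypothetical settings for the my-module example above
#[derive(Debug, PartialEq)]
pub struct Settings {
    pub topic: String,
    pub count: u32,
}

// `raw` is a stand-in for the module's configuration section
pub fn read_settings(raw: &HashMap<String, String>) -> Result<Settings, String> {
    // Missing topic: fall back to a default
    let topic = raw
        .get("topic")
        .cloned()
        .unwrap_or_else(|| "test.simple".to_string());

    // Missing count: default; present but not a valid u32: report an error
    let count = match raw.get("count") {
        Some(text) => text
            .parse::<u32>()
            .map_err(|e| format!("invalid count {text:?}: {e}"))?,
        None => 10,
    };

    Ok(Settings { topic, count })
}
```

Treating "absent" and "malformed" differently means that a typo in the configuration file is reported rather than silently replaced by a default.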
