#mqtt #publisher #actor #adapter #topic #message #port

mqtt_actor

An mqtt adapters (from port and adapters) to be used with actix actors

7 releases (2 stable)

1.0.1 Dec 8, 2022
0.2.5 Dec 6, 2022

#798 in Asynchronous

22 downloads per month

MIT/Apache

25KB
313 lines

Purpouse

The goal is use Mqtt with actors.

While using ports and adpates ( a.k.a. hexagonal architecture) we may want to use actix actors to implement the core. Specially if multiple protocols are to be used.

If one of those protocols is MQTT then this is the adapter you need.

Description

This crate is asyncrhonous and should be run within actix_rs executor.

Tough the name suggest this crate is about implementing the adapter as an actor, actually is not pure actor.

It has an actor to let you have an address to allow the core sending messages to topics: MqttPublisher, but the main job is carried by the struct: MqttConnectionManager, including connecting to the server and creating the former actor.

architecture

Usage

Create an instance of MqttConnectionManager. Use the from method and provide all the connection parameters.

Then add the subscriptions you may need before starting the MqttConnectionManager`.

Do all steps inside a actix_rs executor.

For example (more and better examples in the documentation)

#[actix_rt::main]
async fn main() {
    let my_actor_address = MyActor::new().start();

    let (publisher, listener) = MqttConnectionManager::from(
        String::from(CLIENT_NAME),
        String::from(IP),
        1883,
        Some(String::from(USER)),
        Some(String::from(PASS)),
        })
        .add_subscription( Box::new(DumpHandler{topic: String::from("testing_topic/a")} ))
        .add_subscription( Box::new(MyHandler{topic: String::from("testing_topic/b"), address: my_actor_address} ))
        .start().await;
    listener.await;
}

Sending Messages

Externd the MqttPublisher with the handler of message of your choice.

Use the methods defined in MqttPublisherto send the message

pub struct StringMessage{
    pub payload : String,
    pub topic: String,
}

impl Message for StringMessage {
    type Result = ();
}

impl Handler<StringMessage> for MqttPublisher {
    type Result = ();

    fn handle(&mut self, msg: StringMessage, _: &mut Self::Context) -> Self::Result {
        self.publish_str(&msg.topic , &msg.payload);
        ()
    }
}

Normal responsivities of this impl should limit to:

  • know the topic
  • serialize the message

Testing

MqttPublisher is a type alias that maps to MqttPublisherActor. But during unit testing it is mapped to MqttPublisherActor_Mock.

This mock does not connect to anyplace and does not need the MqttConnectionManager to be initialized or even created.

Helpers

The crate includes a few elements that are not needed but can be useful.

subscriptions

Two subscriptions are provided for both: testing and example.

DumpHandler is the minimal implementation. Shows the subscription to a topic and prints (dumps) the received message in the standard output. It can be used in order to check if the message is read or not during manual tests.

EchoHandler is a demo on how to implment a handler that can use the internal Actor MqttPublisher to send a message. This is the equivalent of a actix-web handler (request-response). This handler can also be used as-is to implement a keep-alive mechaninsm for an external watchdog.

publisher

There is only one provided handler, it extend MqttPublisher to accept StringMessage.

This message is used by EchoHandler but is not limited to it.

You can use it to send any String to any topic.

mocks

MqttPublisherActor_Mock is where the alias MqttPublisher points during unit testing (not any other kind of test). Therefore your unit tests does not need a MQTT connection.

Any message received by this mock will be accepted and no painc should be expected regardless of the state of the actor or the connector manager.

(capabilities of this mock are changing, cannot describe it here)

Dependencies

~5–14MB
~171K SLoC