5 releases (breaking)
| Version | Released |
|---|---|
| 0.4.0 | Jan 2, 2025 |
| 0.3.0 | Sep 10, 2024 |
| 0.2.0 | Feb 1, 2024 |
| 0.1.0 | Jan 27, 2024 |
| 0.0.0 | Nov 22, 2023 |
#2230 in Embedded development
174 downloads per month
Used in edge-net
11KB
136 lines
edge-mqtt
A wrapper for the `rumqttc` crate that adapts it to the async MQTT traits of the `embedded-svc` crate.
NOTE: This crate requires the Rust standard library (`std`)!
The plan for the future is to retire this crate in favor of `rust-mqtt` once the latter gains MQTT 3.1 compatibility and implements a more ergonomic API in which sending can be done independently of receiving MQTT messages.
Alternatively, if `rust-mqtt` does not see further development, the plan is to implement a true `no_std`, no-alloc alternative - just like all other `edge-*` crates.
Example
use async_compat::CompatExt;
use embedded_svc::mqtt::client::asynch::{Client, Connection, Publish, QoS};
use embedded_svc::mqtt::client::Event;
use embassy_futures::select::{select, Either};
use embassy_time::{Duration, Timer};
use edge_mqtt::io::{AsyncClient, MqttClient, MqttConnection, MqttOptions};
use log::*;
// Connection parameters for the demo; `main` passes the first three to `mqtt_create`.
/// Host name of the MQTT broker to connect to.
const MQTT_HOST: &str = "broker.emqx.io";
/// Port of the MQTT broker.
const MQTT_PORT: u16 = 1883;
/// Client identifier handed to `MqttOptions::new`.
const MQTT_CLIENT_ID: &str = "edge-mqtt-demo";
/// Topic the demo both subscribes and publishes to.
const MQTT_TOPIC: &str = "edge-mqtt-demo";
/// Entry point of the demo.
///
/// Initializes logging, creates the MQTT client/connection pair and then
/// blocks the current thread on the async `run` routine.
///
/// # Panics
///
/// Panics if `mqtt_create` fails or if `run` finishes with an error.
fn main() {
    // Use the "info" filter unless the default filter env var says otherwise.
    let log_env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
    env_logger::init_from_env(log_env);

    let (client, conn) = mqtt_create(MQTT_CLIENT_ID, MQTT_HOST, MQTT_PORT).unwrap();

    // `.compat()` is necessary for tokio
    let session = run(client, conn, MQTT_TOPIC).compat();

    futures_lite::future::block_on(session).unwrap()
}
/// Subscribes to `topic`, then concurrently listens for incoming events and
/// keeps publishing a greeting to that same topic.
///
/// The two activities are raced with `select`; whichever finishes first
/// determines the result:
/// - the listener finishes with `Ok(())` once `connection.next()` returns an
///   error (the error itself is discarded);
/// - the publisher loops forever and can only finish by propagating a
///   publish error.
///
/// # Errors
///
/// Returns an error if the initial subscription or any publish call fails.
async fn run<M, C>(mut client: M, mut connection: C, topic: &str) -> Result<(), anyhow::Error>
where
    M: Client + Publish + 'static,
    M::Error: std::error::Error + Send + Sync + 'static,
    C: Connection + 'static,
{
    info!("About to start the MQTT client");
    info!("MQTT client started");

    client.subscribe(topic, QoS::AtMostOnce).await?;
    info!("Subscribed to topic \"{topic}\"");

    // Receiving half: log every event until the connection reports an error.
    let listener = async move {
        info!("MQTT Listening for messages");

        loop {
            match connection.next().await {
                Ok(event) => info!("[Queue] Event: {}", event.payload()),
                Err(_) => break,
            }
        }

        info!("Connection closed");

        Ok(())
    };

    // Sending half: publish the same payload, pausing between rounds.
    let publisher = async move {
        // Just to give a chance of our connection to get even the first published message
        Timer::after(Duration::from_millis(500)).await;

        let payload = "Hello from edge-mqtt-demo!";
        let sleep_secs = 2;

        loop {
            client
                .publish(topic, QoS::AtMostOnce, false, payload.as_bytes())
                .await?;
            info!("Published \"{payload}\" to topic \"{topic}\"");

            info!("Now sleeping for {sleep_secs}s...");
            Timer::after(Duration::from_secs(sleep_secs)).await;
        }
    };

    // Both halves yield the same `Result` type, so either outcome is returned as-is.
    match select(listener, publisher).await {
        Either::First(outcome) | Either::Second(outcome) => outcome,
    }
}
/// Creates the MQTT client and its matching connection for the given broker.
///
/// Configures `MqttOptions` from `client_id`, `host` and `port` with a
/// 10-second keep-alive, builds the underlying `AsyncClient` and its event
/// loop, and wraps both in the `MqttClient` / `MqttConnection` adapters.
///
/// # Errors
///
/// The signature allows an error, but every path visible here returns `Ok`.
fn mqtt_create(
    client_id: &str,
    host: &str,
    port: u16,
) -> Result<(MqttClient, MqttConnection), anyhow::Error> {
    let options = {
        let mut opts = MqttOptions::new(client_id, host, port);
        opts.set_keep_alive(core::time::Duration::from_secs(10));
        opts
    };

    // NOTE(review): the `10` is presumably the request-channel capacity — confirm
    // against the rumqttc documentation.
    let (raw_client, event_loop) = AsyncClient::new(options, 10);

    Ok((MqttClient::new(raw_client), MqttConnection::new(event_loop)))
}
Dependencies
~4–13MB
~150K SLoC