2 releases
new 0.39.0-flyradar.1 | Mar 12, 2025 |
---|
#10 in #nats-client
745KB
13K
SLoC
This repository is a patched fork of the async-nats crate, with modifications to enable IPC communication with the Fly agent. These changes were made to address immediate needs independently of the upstream project.
The primary modifications include support for IPC communication, which isn't available in the original crate. This fork is published to crates.io under a different name to allow immediate use.
If you're looking for the original implementation, please visit the official NATS.rs repository.
lib.rs
:
A Rust asynchronous client for the NATS.io ecosystem.
To access the repository, you can clone it by running:
git clone https://github.com/nats-io/nats.rs
NATS.io is a simple, secure, and high-performance open-source messaging system designed for cloud-native applications, IoT messaging, and microservices architectures.
Note: The synchronous NATS API is deprecated and no longer actively maintained. If you need to use the deprecated synchronous API, you can refer to: https://crates.io/crates/nats
For more information on NATS.io visit: https://nats.io
Examples
Below, you can find some basic examples on how to use this library.
For more details, please refer to the specific methods and structures documentation.
Complete example
Connect to the NATS server, publish messages and subscribe to receive messages.
use bytes::Bytes;
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Subscribe to the "messages" subject
let mut subscriber = client.subscribe("messages").await?;
// Publish messages to the "messages" subject
for _ in 0..10 {
client.publish("messages", "data".into()).await?;
}
// Receive and process messages
while let Some(message) = subscriber.next().await {
println!("Received message {:?}", message);
}
Ok(())
}
Publish
Connect to the NATS server and publish messages to a subject.
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Prepare the subject and data
let subject = "foo";
let data = Bytes::from("bar");
// Publish messages to the NATS server
for _ in 0..10 {
client.publish(subject, data.clone()).await?;
}
// Flush internal buffer before exiting to make sure all messages are sent
client.flush().await?;
Subscribe
Connect to the NATS server, subscribe to a subject and receive messages.
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Subscribe to the "foo" subject
let mut subscriber = client.subscribe("foo").await.unwrap();
// Receive and process messages
while let Some(message) = subscriber.next().await {
println!("Received message {:?}", message);
}
JetStream
To access JetStream API, create a JetStream jetstream::Context.
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Create a JetStream context.
let jetstream = async_nats::jetstream::new(client);
// Publish JetStream messages, manage streams, consumers, etc.
jetstream.publish("foo", "bar".into()).await?;
Key-value Store
Key-value Store is accessed through jetstream::Context.
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Create a JetStream context.
let jetstream = async_nats::jetstream::new(client);
// Access an existing key-value.
let kv = jetstream.get_key_value("store").await?;
Object Store store
Object Store is accessed through jetstream::Context.
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
// Create a JetStream context.
let jetstream = async_nats::jetstream::new(client);
// Access an existing key-value.
let kv = jetstream.get_object_store("store").await?;
Service API
Service API is accessible through [Client] after importing its trait.
use async_nats::service::ServiceExt;
// Connect to the NATS server
let client = async_nats::connect("demo.nats.io").await?;
let mut service = client
.service_builder()
.description("some service")
.stats_handler(|endpoint, stats| serde_json::json!({ "endpoint": endpoint }))
.start("products", "1.0.0")
.await?;
Dependencies
~163MB
~3M SLoC