3 releases
Uses new Rust 2024
new 0.1.2 | Apr 10, 2025 |
---|---|
0.1.1 | Apr 10, 2025 |
0.1.0 | Apr 10, 2025 |
#411 in Asynchronous
41 downloads per month
61KB
1K
SLoC
prk_eventbus: Event Bus Service
The Event Bus Service is a versatile, WebSocket-based event bus built in Rust using the prk_eventbus
library. It enables asynchronous, decoupled communication for distributed systems or within a single application, with SQLite-backed persistent storage. Whether embedded in an Axum server, run as a standalone service, or used for simple pub-sub scenarios, it supports robust publish-subscribe patterns with features like event batching, TTL, and acknowledgment.
Features
- WebSocket Interface: Real-time event handling via a WebSocket endpoint.
- Persistent Storage: SQLite storage for event durability and optional replay.
- Flexible Deployment: Run standalone, embed in Axum, or use in microservices.
- Scalable Design: Handles multiple clients with configurable channel capacity.
- Event Metadata: Supports topics, payloads, TTL, message IDs, and more.
- Batching: Publish multiple events efficiently in a single operation.
Prerequisites
- Rust: Version 1.65 or higher (with
cargo
). - SQLite: Embedded via
sqlx
; no separate installation needed.
Installation
-
Clone the Repository:
git clone https://github.com/prk-Jr/prk_eventbus.git cd prk_eventbus
-
Install Dependencies:
cargo build
-
Run the Service:
- Standalone:
cargo run
(see Standalone Usage). - Axum-Embedded: Configure as per Axum Integration.
- Standalone:
Configuration
- Port: Set via
serve
(standalone) orSocketAddr
(Axum) (e.g.,"127.0.0.1:3000"
). - Database: Adjust SQLite path in
SQLiteStorage::new
(e.g.,"eventbus.db"
). - WebSocket Path: Defaults to
/ws
; customize viaaxum_router
nesting. - Client Settings: Tune
ClientConfig
(e.g.,reconnect_interval
,max_retries
).
Usage
Standalone Usage (Server + Pub/Sub)
Run the event bus as a standalone server with a publisher and subscriber in one process, ideal for testing or simple applications.
Example: Chat Simulation stress test
- Server: Hosts the event bus at
ws://127.0.0.1:3000/ws
. - Publisher: Sends chat messages on
chat.user1
. - Subscriber: Listens to
chat.*
, acknowledges messages, and times out after 3 seconds.
use std::time::Duration;
use prk_eventbus::{adapters::{WsConfig, WsTransport}, client::{ClientConfig, EventBusClient}, core::error::EventBusError, storage::dummy_storage::NoStorage};
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() -> Result<(), EventBusError> {
#[cfg(feature = "tracing")]
{
tracing_subscriber::fmt()
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()))
.init();
}
let server_handle: JoinHandle<()> = tokio::spawn(async {
let ws_config = WsConfig {
channel_capacity: 1000,
auto_ack: true,
};
let transport: WsTransport<NoStorage> = WsTransport::new(None, ws_config);
transport.serve("127.0.0.1:3000").await.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
let config = ClientConfig {
url: "ws://127.0.0.1:3000/ws".to_string(),
reconnect_interval: Duration::from_secs(2),
max_retries: 5,
};
let mut publisher = EventBusClient::connect(config.clone()).await?;
let mut subscriber = EventBusClient::connect(config).await?;
subscriber.subscribe("chat.*", None).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let sub_handle: JoinHandle<Result<(), EventBusError>> = tokio::spawn(async move {
#[cfg(feature = "tracing")]
tracing::info!("Subscriber task started");
let mut received = 0;
loop {
let mut messages = subscriber.messages();
match tokio::time::timeout(Duration::from_secs(5), messages.next()).await {
Ok(Ok(msg)) => {
let payload = String::from_utf8_lossy(&msg.payload);
#[cfg(feature = "tracing")]
if received % 100 == 0 {
tracing::info!(received = received, "Subscriber received message: {}", payload);
}
subscriber.acknowledge(msg.seq, msg.message_id).await?;
received += 1;
if received == 1000 { break; }
}
Ok(Err(e)) => return Err(e),
Err(_) => {
#[cfg(feature = "tracing")]
tracing::warn!(received = received, "Subscriber timed out");
break;
}
}
}
println!("Subscriber received {} messages", received);
Ok(())
});
for i in 0..1000 {
publisher.publish("chat.stress", &format!("Stress message {}", i), None, Some(3600)).await?;
}
tokio::time::sleep(Duration::from_secs(5)).await;
drop(publisher);
let _ = sub_handle.await;
server_handle.abort();
println!("Stress test completed!");
Ok(())
}
cargo run --features storage_tracing
Key Steps:
- Spawn a
WsTransport
server task. - Connect a publisher and subscriber via
EventBusClient
. - Publish single messages (
"Hello from User1!"
) and batches with TTL (3600s). - Subscriber processes messages until timeout, then exits.
Output:
Subscriber received: [chat.user1] Hello from User1!
Subscriber received: [chat.user1] How's it going?
Subscriber received: [chat.user1] Batch message 1
Subscriber received: [chat.user1] Batch message 2
Test completed!
Run with tracing: RUST_LOG=debug cargo run
.
Axum Integration (Producer)
Embed the event bus in an Axum server to host it alongside HTTP routes, publishing events internally.
Example: User Management Service
- Setup: Runs at
http://127.0.0.1:3000
with event bus atws://127.0.0.1:3000/ws
. - Routes:
POST /api/users
creates users and publishesuser.created
. - Client: Internal
EventBusClient
connects lazily to publish events.
Key Function:
pub fn axum_router<T: Clone + Sync + Send + 'static>(&self, state: T) -> Router<T> {
let storage = self.storage.clone();
let bus = self.bus.clone();
Router::new().route("/ws", get({
let storage = storage.clone();
move |ws| Self::handle_ws(ws, bus.clone(), storage.clone())
})).with_state(state)
}
Usage:
- Merge
WsTransport::axum_router
into the Axum router. - Publish events from endpoints:
curl -X POST http://127.0.0.1:3000/api/users -d '{"id": 1, "username": "alice"}'
→user.created
.
Microservice Integration (Consumer)
Connect a separate microservice to the event bus to subscribe and process events, enabling cross-service synchronization.
Example: User Sync Service
- Setup: Runs at
http://127.0.0.1:5000
, connects tows://127.0.0.1:3000/ws
. - Subscription: Listens to
user.*
in a background task. - Processing: On
user.created
, deserializes the payload and saves the user locally.
Workflow:
- Axum server publishes
user.created
. - Consumer receives it, logs, and syncs the user to its database.
Output:
Received message: CoreMessage { topic: "user.created", payload: "{\"id\":1,\"username\":\"alice\"}"... }
User created: User { id: 1, username: "alice" }
Event Format
Events are JSON-serialized:
topic
: String (e.g.,"user.created"
,"chat.user1"
).payload
: Bytes/String (e.g.,{"id": 1, "username": "alice"}
,"Hello from User1!"
).message_id
: Optional string.ttl
: Optional integer (seconds).seq
: Auto-incremented sequence for acknowledgment.
SQLite (eventbus.db
):
SELECT * FROM messages;
-- seq | topic | payload | metadata | ttl | status
-- 1 | user.created | {"id": 1, "username": "alice"} | {} | 0 | pending
-- 2 | chat.user1 | Hello from User1! | {} | 3600 | processed
Persistence
- Stored in
eventbus.db
withmessages
andprocessed_messages
tables. - Use
acknowledge
to mark events as processed (consumer example). - Replay events by subscribing with a starting
seq
(if supported).
Running with Microservices
- Standalone Chat:
cargo run
→ Runs server, publisher, and subscriber in one.
- Producer (Axum):
cargo run
→ Hosts atws://127.0.0.1:3000/ws
.- Test:
curl -X POST http://127.0.0.1:3000/api/users -d '{"id": 1, "username": "alice"}'
.
- Consumer (Microservice):
cargo run
→ Connects tows://127.0.0.1:3000/ws
, syncs users.
Troubleshooting
- Connection Issues: Verify WebSocket URL and server status.
- Event Loss: Check subscription timing (add delays if needed) or persistence settings.
- Timeouts: Adjust
tokio::time::timeout
durations in subscribers. - Tracing: Enable with
RUST_LOG=debug cargo run
for detailed logs.
Contributing
Fork, branch, commit, and submit a pull request:
git checkout -b feature/your-feature
git commit -m "Add your feature"
git push origin feature/your-feature
License
MIT License. See LICENSE.
Acknowledgments
Dependencies
~20–38MB
~696K SLoC