#stream #callback #mongo-db #events #collection #operation #handle

rs-mongo-stream

Wrapper on mongo to easier the way to handle mongo stream

3 unstable releases

new 0.3.1 Mar 8, 2025
0.3.0 Mar 8, 2025
0.2.0 Mar 7, 2025
0.1.0 Mar 7, 2025
0.0.1 Mar 6, 2025

#608 in Database interfaces

GPL-3.0 license

29KB
348 lines

rs-mongo-stream

A MongoDB change stream library for Rust applications

GitHub last commit CI Codecov Docs Crates.io crates.io

Overview

rs-mongo-stream is a Rust library that makes it easy to work with MongoDB change streams. It provides a simple interface to monitor and react to changes in your MongoDB collections by registering callbacks for different database events (insertions, updates, and deletions).

Features

  • Event-based architecture: Register callbacks for insert, update, and delete operations
  • Asynchronous support: Built with Tokio for fully asynchronous operation
  • Automatic reconnection: Handles stream disruptions gracefully
  • Implements Stream trait: Can be used with standard Rust stream operations
  • Type-safe callbacks: Strong typing for your event handlers

Installation

Add the crate to your Cargo.toml:

[dependencies]
rs-mongo-stream = "0.1.0"
mongodb = "2.4.0"
tokio = { version = "1", features = ["full"] }

Usage Example

use mongodb::{Client, options::ClientOptions};
use rs_mongo_stream::{MongoStream, Event};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to MongoDB
    let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("my_database");
    
    // Create the stream monitor
    let mut stream = MongoStream::new(client.clone(), db);
    
    // Register callbacks for a collection
    stream.add_callback("users", Event::Insert("".to_string()), |event| {
        Box::pin(async move {
            println!("New user inserted: {:?}", event);
            // Handle the insertion...
        })
    });
    
    stream.add_callback("users", Event::Update("".to_string()), |event| {
        Box::pin(async move {
            println!("User updated: {:?}", event);
            // Handle the update...
        })
    });
    
    stream.add_callback("users", Event::Delete("".to_string()), |event| {
        Box::pin(async move {
            println!("User deleted: {:?}", event);
            // Handle the deletion...
        })
    });
    
    // Start monitoring the collection
    stream.start_stream("users").await?;
    
    Ok(())
}

Error Handling

The library provides a custom MongoStreamError type that wraps errors from the MongoDB driver and adds context when needed.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Dependencies

~27–38MB
~705K SLoC