2 releases

new 0.1.1 Mar 10, 2025
0.1.0 Mar 10, 2025

#3 in #fm

Download history 152/week @ 2025-03-05

152 downloads per month

MIT license

32KB
736 lines

Rocketman

A modular(ish) jetstream consumer. Backed by Tungstenite.

Installation

[dependencies]
rocketman = "latest" # pyt the latest version here
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Usage

#[tokio::main]
async fn main() {
    // init the builder
    let opts = JetstreamOptions::builder()
        // your EXACT nsids
        .wanted_collections(vec!["com.example.cool.nsid".to_string()])
        .build();
    // create the jetstream connector
    let jetstream = JetstreamConnection::new(opts);

    // create your ingestors
    let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
    ingestors.insert(
        // your EXACT nsid
        "com.example.cool.nsid".to_string(),
        Box::new(MyCoolIngestor),
    );


    // tracks the last message we've processed
    let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));

    // get channels
    let msg_rx = jetstream.get_msg_rx();
    let reconnect_tx = jetstream.get_reconnect_tx();

    // spawn a task to process messages from the queue.
    // this is a simple implementation, you can use a more complex one based on needs.
    let c_cursor = cursor.clone();
    tokio::spawn(async move {
        while let Ok(message) = msg_rx.recv_async().await {
            if let Err(e) =
                handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
                    .await
            {
                error!("Error processing message: {}", e);
            };
        }
    });

    // connect to jetstream
    // retries internally, but may fail if there is an extreme error.
    if let Err(e) = jetstream.connect(cursor.clone()).await {
        error!("Failed to connect to Jetstream: {}", e);
        std::process::exit(1);
    }
}

pub struct MyCoolIngestor;

/// A cool ingestor implementation. Will just print the message. Does not do verification.
impl LexiconIngestor for MyCoolIngestor {
    async fn ingest(&self, message: Event<Value>) -> Result<()> {
        info!("{:?}", message);
        // Process message for default lexicon.
        Ok(())
    }
}

gratz

Based heavily on phil's jetstream consumer on atcosm constellation.

Dependencies

~10–20MB
~244K SLoC