22 releases

new 0.1.21 Mar 18, 2025
0.1.20 Mar 17, 2025
0.1.19 Feb 27, 2025
0.1.16 Dec 17, 2024

#325 in Database interfaces

Download history 670/week @ 2024-12-04 814/week @ 2024-12-11 193/week @ 2024-12-18 2/week @ 2025-01-08 24/week @ 2025-01-15 4/week @ 2025-01-22 9/week @ 2025-02-05 27/week @ 2025-02-12 74/week @ 2025-02-19 714/week @ 2025-02-26 23/week @ 2025-03-05 46/week @ 2025-03-12

857 downloads per month

Apache-2.0

49KB
1K SLoC

Rust Source SDK for Drasi

This library provides the building blocks and infrastructure to implement a Drasi Source in Rust.

Getting started

Install the package

cargo add drasi-source-sdk

Example

Proxy

#[tokio::main]
async fn main() {
    let proxy = SourceProxyBuilder::new()
        .with_stream_producer(my_stream)
        .without_context()
        .build();

        proxy.start().await;    
}

async fn my_stream(_context: (), req: BootstrapRequest) -> Result<BootstrapStream, BootstrapError> {
    let stream = stream! {
        if req.node_labels.contains(&"Location".to_string()) {
            yield Ok(SourceElement::Node { 
                id: "Location-A".to_string(), 
                labels: vec!["Location".to_string()], 
                properties: vec![
                    ("longitude".to_string(), Value::Number(Number::from_f64(50.1).unwrap())),
                    ("latitude".to_string(), Value::Number(Number::from_f64(60.7).unwrap())),
                ].into_iter().collect(),
            });    

            yield Ok(SourceElement::Node { 
                id: "Location-B".to_string(), 
                labels: vec!["Location".to_string()], 
                properties: vec![
                    ("longitude".to_string(), Value::Number(Number::from_f64(58.9).unwrap())),
                    ("latitude".to_string(), Value::Number(Number::from_f64(72.1).unwrap())),
                ].into_iter().collect(),
            });    
        }
    };

    Ok(Box::pin(stream))
}

Reactivator

#[tokio::main]
async fn main() {
    let mut reactivator = ReactivatorBuilder::new()
        .with_stream_producer(my_stream)
        .without_context()
        .build()
        .await;

    reactivator.start().await;
}

async fn my_stream(_context: (), state_store: Arc<dyn StateStore + Send + Sync>) -> Result<ChangeStream, ReactivatorError> {
    
    let mut cursor = match state_store.get("cursor").await.unwrap() {
        Some(cursor) => u64::from_be_bytes(cursor.try_into().unwrap()),
        None => 0,
    };    
    
    let result = stream! {
        let start_location_id = "Location-A";        

        loop {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() * 1000;            

            cursor += 1;
            let vehicle_id = format!("vehicle-{}", cursor);
            let vehicle_node = SourceElement::Node {
                id: vehicle_id.clone(),
                labels: vec!["Vehicle".to_string()],
                properties: vec![
                    ("name".to_string(), Value::String(format!("Vehicle {}", cursor))),
                ].into_iter().collect(),
            };

            yield SourceChange::new(ChangeOp::Create, vehicle_node, time, cursor, None);

            cursor += 1;
            let vehicle_location_relation = SourceElement::Relation {
                id: format!("vehicle-loc-{}", cursor),
                start_id: vehicle_id,
                end_id: start_location_id.to_string(),
                labels: vec!["LOCATED_AT".to_string()],
                properties: Map::new(),
            };

            yield SourceChange::new(ChangeOp::Create, vehicle_location_relation, time, cursor, None);

            state_store.put("cursor", cursor.to_be_bytes().to_vec()).await.unwrap();
        }
        
    };

    Ok(Box::pin(result))
}

Dependencies

~20–33MB
~485K SLoC