59 releases

0.21.1 Dec 16, 2020
0.20.23 Oct 15, 2020
0.20.21 Jul 31, 2020
0.20.2 Mar 29, 2020
0.1.4 Dec 23, 2018

#3 in #amazon-sqs

Download history 25/week @ 2024-07-06 17/week @ 2024-07-27 63/week @ 2024-09-28

63 downloads per month
Used in 2 crates

MIT license

77KB
2K SLoC

sqs-lambda

A Rust crate for writing AWS Lambdas that are triggered by SQS

This library allows you to turn single-event SQS-triggered lambdas into something that's more like a streaming processor. This is useful for high throughput worklodas.

This example shows using this library alongside the lambda_runtime.

In this example we are:

  • Triggered by SqsEvents from S3 Notify Events
  • The S3PayloadRetriever uses the ZstdDecoder to download and decompress the payload
  • We spawn 40 EventProcessors, wrapping our CustomService
  • The completion handler will merge completed events before uploading to S3
fn my_handler(event: SqsEvent, ctx: Context) -> Result<(), HandlerError> {
    tokio_compat::run_std(
        async {

            let queue_url = std::env::var("QUEUE_URL").expect("QUEUE_URL");
            info!("Queue Url: {}", queue_url);
            let output_bucket = "event-destination-bucket";

            let region = {
                let region_str = std::env::var("AWS_REGION").expect("AWS_REGION");
                Region::from_str(&region_str).expect("Region error")
            };

            info!("Defining consume policy");
            let consume_policy = ConsumePolicy::new(
                ctx, // Use the Context.deadline from the lambda_runtime
                Duration::from_secs(2), // Stop consuming when there's 2 seconds left in the runtime
                3, // If we get 3 empty receives in a row, stop consuming
            );

            info!("Defining consume policy");
            let (tx, shutdown_notify) = tokio::sync::oneshot::channel();

            info!("SqsConsumer");
            let sqs_consumer = SqsConsumerActor::new(
                SqsConsumer::new(SqsClient::new(region.clone()), queue_url.clone(), consume_policy, tx)
            );

            info!("SqsCompletionHandler");
            let sqs_completion_handler = SqsCompletionHandlerActor::new(
                SqsCompletionHandler::new(
                    SqsClient::new(region.clone()),
                    queue_url.to_string(),
                    SubgraphSerializer { proto: Vec::with_capacity(1024) },
                    S3EventEmitter::new(
                        S3Client::new(region.clone()),
                        bucket.to_owned(),
                        time_based_key_fn,
                    ),
                    CompletionPolicy::new(
                        1000, // Buffer up to 1000 messages
                        Duration::from_secs(30), // Buffer for up to 30 seconds
                    ),
                )
            );

            info!("EventProcessors");
            let event_processors: Vec<_> = (0..40)
                .map(|_| {
                    EventProcessorActor::new(EventProcessor::new(
                        sqs_consumer.clone(),
                        sqs_completion_handler.clone(),
                        CustomService {},
                        S3EventRetriever::new(S3Client::new(region.clone()), ZstdDecoder::default()),
                    ))
                })
                .collect();

            info!("Start Processing");

            futures::future::join_all(event_processors.iter().map(|ep| ep.start_processing())).await;

            let mut proc_iter = event_processors.iter().cycle();
            for event in event.records {
                let next_proc = proc_iter.next().unwrap();
                next_proc.process_event(
                    map_sqs_message(event)
                ).await;
            }

            info!("Waiting for shutdown notification");
            // Wait for the consumers to shutdown
            let _ = shutdown_notify.await;

            tokio::time::delay_for(Duration::from_millis(100)).await;
            info!("Consumer shutdown");

        });


    info!("Completed execution");
    Ok(())
}

Dependencies

~31–43MB
~769K SLoC