go-zoom-kinesis
A robust, production-ready AWS Kinesis stream processor with checkpointing and retry capabilities. Built with reliability and performance in mind.
Features
- Automatic checkpointing with multiple storage backends
- Configurable retry logic with exponential backoff
- Comprehensive error handling
- Multiple shard processing
- DynamoDB checkpoint storage support
- Detailed tracing and monitoring
- Graceful shutdown handling
- Production-ready with extensive test coverage
- Configurable stream position initialization
- Smart checkpoint recovery with fallback options
Basic Usage
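To make "retry logic with exponential backoff" concrete, here is a minimal, standard-library-only sketch of how such a delay schedule can be computed. It is not taken from go-zoom-kinesis: the function names `backoff_delay` and `backoff_schedule`, the doubling factor, and the cap are all assumptions chosen for illustration, and the crate's actual retry policy may differ (for example by adding jitter).

```rust
use std::time::Duration;

/// Hypothetical helper (not part of go-zoom-kinesis): delay to wait before
/// retry number `attempt` (0-based). The delay doubles on every attempt and
/// is capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt; fall back to u32::MAX if the power overflows.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication overflows, use the cap; otherwise clamp to it.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

/// Hypothetical helper: the delays for `max_retries` consecutive retries.
fn backoff_schedule(max_retries: u32, base: Duration, max: Duration) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| backoff_delay(attempt, base, max))
        .collect()
}
```

With a 100 ms base and a 5 s cap, three retries would wait 100 ms, 200 ms, and 400 ms. The cap keeps a long run of failures from producing unbounded waits.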
use go_zoom_kinesis::{
    KinesisProcessor, ProcessorConfig, RecordProcessor,
    processor::RecordMetadata, processor::InitialPosition,
    store::InMemoryCheckpointStore,
    monitoring::MonitoringConfig,
    error::{ProcessorError, ProcessingError},
};
use aws_sdk_kinesis::{Client, types::Record};
use std::time::Duration;
use async_trait::async_trait;

// Application-specific handler invoked for each record.
#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    type Item = ();

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> Result<Option<Self::Item>, ProcessingError> {
        println!("Processing record: {:?}", record);
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), ProcessorError> {
    // Load AWS configuration and build the Kinesis client.
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    // Processor settings; unspecified fields fall back to their defaults.
    let config = ProcessorConfig {
        stream_name: "my-stream".to_string(),
        batch_size: 100,
        api_timeout: Duration::from_secs(30),
        processing_timeout: Duration::from_secs(300),
        max_retries: Some(3),
        shard_refresh_interval: Duration::from_secs(60),
        initial_position: InitialPosition::TrimHorizon,
        prefer_stored_checkpoint: true,
        monitoring: MonitoringConfig {
            enabled: true,
            ..Default::default()
        },
        ..Default::default()
    };

    let processor = MyProcessor;
    let store = InMemoryCheckpointStore::new();

    // Watch channel that carries the shutdown signal to the processor.
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    let (processor, _monitoring_rx) = KinesisProcessor::new(
        config,
        processor,
        client,
        store,
    );

    processor.run(shutdown_rx).await
}
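The configuration above sets both `initial_position` and `prefer_stored_checkpoint`. One plausible reading of how they interact is sketched below: resume after a stored checkpoint when one exists and is preferred, otherwise fall back to the configured initial position. This is an assumption, not the crate's documented behavior. The `InitialPosition` enum here is a local stand-in (only `TrimHorizon` appears in the example above; `Latest` is assumed by analogy with the Kinesis `LATEST` shard iterator type), and `StartPoint` and `resolve_start` are hypothetical names.

```rust
/// Local stand-in for the crate's `InitialPosition`. `TrimHorizon` appears in
/// the usage example; `Latest` is an assumption for illustration.
#[derive(Debug, Clone, PartialEq)]
enum InitialPosition {
    TrimHorizon,
    Latest,
}

/// Hypothetical type: where a shard reader should start.
#[derive(Debug, PartialEq)]
enum StartPoint {
    /// Resume just after a previously checkpointed sequence number.
    AfterSequenceNumber(String),
    /// No usable checkpoint: use the configured initial position.
    Initial(InitialPosition),
}

/// Hypothetical helper: pick the start point for one shard.
/// `stored` is the checkpointed sequence number, if the store has one.
fn resolve_start(
    stored: Option<&str>,
    prefer_stored_checkpoint: bool,
    initial_position: &InitialPosition,
) -> StartPoint {
    match stored {
        // A checkpoint exists and the configuration prefers it.
        Some(seq) if prefer_stored_checkpoint => {
            StartPoint::AfterSequenceNumber(seq.to_string())
        }
        // Either no checkpoint exists or it is not preferred: fall back.
        _ => StartPoint::Initial(initial_position.clone()),
    }
}
```

Under this reading, a fresh deployment with an empty store starts from `TrimHorizon`, while a restarted one continues from its last checkpoint.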
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
If you have any questions or run into issues, please open an issue on GitHub.