8 releases
Uses the Rust 2024 edition

| Version | Date |
|---|---|
| 0.3.5 | Apr 18, 2025 |
| 0.3.4 | Apr 18, 2025 |
| 0.2.4 | Apr 15, 2025 |
| 0.1.0 | Apr 12, 2025 |

#202 in Asynchronous
345 downloads per month
37KB
760 lines
Queue Workers
⚠️ Work in Progress: This crate is under active development and is not yet ready for production use. The API is unstable and may undergo significant changes. Feel free to experiment and provide feedback, but please do not use in production environments.
A Redis-backed job queue system for Rust applications with support for retries and concurrent workers.
Features
- Redis-backed persistent job queue
- Automatic job retries with configurable attempts and delay
- Concurrent worker support
- Async/await based API
- Serializable jobs using Serde
- Type-safe job definitions
Prerequisites
- Rust 1.86.0 or later
- Redis server (local or remote)
- Docker and Docker Compose (for development)
Quick Start
- Add the dependency to your `Cargo.toml`:
[dependencies]
queue_workers = "0.3.5"
- Define your job:
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use queue_workers::job::Job;
#[derive(Debug, Serialize, Deserialize)]
struct EmailJob {
    id: String,
    to: String,
    subject: String,
    body: String,
}

#[async_trait]
impl Job for EmailJob {
    type Output = String;
    type Error = String;

    async fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Implement your job logic here
        Ok(format!("Email sent to {}", self.to))
    }
}
- Create a queue and worker:
use queue_workers::{redis_queue::RedisQueue, worker::{Worker, WorkerConfig}};
use std::time::Duration;
#[tokio::main]
async fn main() {
    // Initialize the queue
    let queue = RedisQueue::<EmailJob>::new(
        "redis://127.0.0.1:6379",
        "email_queue",
    ).expect("Failed to create queue");

    // Configure the worker
    let config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_secs(5),
        shutdown_timeout: Duration::from_secs(30),
    };

    // Create and start the worker
    let worker = Worker::new(queue.clone(), config);

    // Push a job
    let job = EmailJob {
        id: "email-1".to_string(),
        to: "user@example.com".to_string(),
        subject: "Hello".to_string(),
        body: "World".to_string(),
    };
    queue.push(job).await.expect("Failed to push job");

    // Start processing jobs
    worker.start().await.expect("Worker failed");
}
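Returning `Err` from `execute` is what drives the retry behavior: a failed attempt is retried up to `retry_attempts` times, with `retry_delay` between attempts. Below is a minimal sketch of a job that can fail; the failure condition is invented purely for illustration:
use async_trait::async_trait;
use queue_workers::job::Job;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct FlakyEmailJob {
    id: String,
    to: String,
}

#[async_trait]
impl Job for FlakyEmailJob {
    type Output = String;
    type Error = String;

    async fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Any Err returned here marks this attempt as failed, so the worker
        // retries it according to its WorkerConfig.
        if self.to.is_empty() {
            return Err(format!("job {}: missing recipient address", self.id));
        }
        Ok(format!("Email sent to {}", self.to))
    }
}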
Development Setup
- Clone the repository:
git clone https://github.com/yourusername/queue_workers.git
cd queue_workers
- Install development dependencies:
rustup component add rustfmt
rustup component add clippy
- Set up git hooks:
chmod +x scripts/setup-git-hooks.sh
./scripts/setup-git-hooks.sh
- Start Redis using Docker Compose:
docker-compose up -d redis
- Run the tests:
cargo test
Code Quality
This project enforces code quality through:
- Formatting using `rustfmt`
- Linting using `clippy`
To manually run the checks:
# Check formatting
cargo fmt -- --check
# Run clippy
cargo clippy -- -D warnings
These checks run automatically:
- As a pre-commit hook when committing changes
- In CI/CD pipeline for all pull requests
Configuration
Worker Configuration
The `WorkerConfig` struct allows you to customize worker behavior:
let config = WorkerConfig {
    retry_attempts: 3,                         // Number of retry attempts for failed jobs
    retry_delay: Duration::from_secs(5),       // Delay between retries
    shutdown_timeout: Duration::from_secs(30), // Graceful shutdown timeout
};
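Since `WorkerConfig` also implements `Default` (it is used as `WorkerConfig::default()` later in this README), you can start from the defaults and override only the fields you need; the concrete default values are whatever the crate ships with:
use queue_workers::worker::WorkerConfig;

// Override only the retry count; the remaining fields keep the crate's defaults.
let config = WorkerConfig {
    retry_attempts: 5,
    ..WorkerConfig::default()
};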
Redis Configuration
The Redis queue can be configured with a Redis URL and queue name:
let queue = RedisQueue::<MyJob>::new(
    "redis://username:password@hostname:6379/0", // Redis URL with authentication
    "my_queue_name",
).expect("Failed to create queue");
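Rather than hard-coding credentials, the URL can be read from the environment at startup. A sketch assuming a `REDIS_URL` variable (the variable name is only a convention, nothing the crate requires):
use queue_workers::redis_queue::RedisQueue;

// Falls back to a local, unauthenticated Redis when REDIS_URL is unset.
let redis_url = std::env::var("REDIS_URL")
    .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

let queue = RedisQueue::<MyJob>::new(redis_url.as_str(), "my_queue_name")
    .expect("Failed to create queue");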
Queue Types
The queue supports both FIFO (First In, First Out) and LIFO (Last In, First Out) behaviors:
use queue_workers::{redis_queue::RedisQueue, queue::QueueType};
// Create a FIFO queue (default behavior)
let fifo_queue = RedisQueue::<MyJob>::new(redis_url, "fifo_queue")?;
// Create a LIFO queue
let lifo_queue = RedisQueue::<MyJob>::with_type(
    redis_url,
    "lifo_queue",
    QueueType::LIFO,
)?;
- FIFO: Jobs are processed in the order they were added (oldest first)
- LIFO: Jobs are processed in reverse order (newest first)
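As a sketch of how the two orderings differ for the same push sequence (reusing the `EmailJob` type from the quick start; the queue names are arbitrary):
use queue_workers::{queue::QueueType, redis_queue::RedisQueue};

let fifo = RedisQueue::<EmailJob>::new(redis_url, "fifo_example")?;
let lifo = RedisQueue::<EmailJob>::with_type(redis_url, "lifo_example", QueueType::LIFO)?;

for id in ["job-1", "job-2", "job-3"] {
    let job = |queue: &str| EmailJob {
        id: format!("{id}-{queue}"),
        to: "user@example.com".to_string(),
        subject: "Ordering demo".to_string(),
        body: String::new(),
    };
    // Same push order into both queues.
    fifo.push(job("fifo")).await?;
    lifo.push(job("lifo")).await?;
}

// A worker draining `fifo` sees job-1, job-2, job-3 (oldest first);
// a worker draining `lifo` sees job-3, job-2, job-1 (newest first).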
Running Multiple Workers
You can run multiple workers processing the same queue:
let queue = RedisQueue::<EmailJob>::new(redis_url, "email_queue")?;
// Spawn multiple workers
for _ in 0..3 {
    let worker_queue = queue.clone();
    let worker = Worker::new(worker_queue, WorkerConfig::default());
    tokio::spawn(async move {
        worker.start().await.expect("Worker failed");
    });
}
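As written, the loop only spawns the workers; if main returns right afterwards, the runtime shuts down and the tasks are dropped. One way to keep them running is to collect the JoinHandles and await them (plain Tokio, nothing specific to this crate):
let mut handles = Vec::new();
for _ in 0..3 {
    let worker_queue = queue.clone();
    let worker = Worker::new(worker_queue, WorkerConfig::default());
    handles.push(tokio::spawn(async move {
        worker.start().await.expect("Worker failed");
    }));
}

// Keep the process alive until every worker task finishes (or panics).
for handle in handles {
    handle.await.expect("worker task panicked");
}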
Error Handling
The library provides a custom error type, `QueueWorkerError`, that covers various failure scenarios:
- Redis connection issues
- Serialization errors
- Job not found
- Worker errors
- Connection timeouts
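Assuming `QueueWorkerError` implements the standard `Display`/`Error` traits and is the error type returned by queue operations (not verified here), a push can be handled like any other fallible call instead of unwrapping:
// Hedged sketch: the exact error variants are not enumerated here.
match queue.push(job).await {
    Ok(_) => log::info!("job enqueued"),
    Err(err) => {
        // Covers Redis connection problems, serialization failures,
        // missing jobs, worker errors and connection timeouts.
        log::error!("failed to enqueue job: {err}");
    }
}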
Worker Types
The library provides two types of workers:
Sequential Worker
Processes jobs one at a time, with retry support:
use queue_workers::{
    redis_queue::RedisQueue,
    worker::{Worker, WorkerConfig},
};

let config = WorkerConfig {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};
let worker = Worker::new(queue.clone(), config);
worker.start().await?;
Concurrent Worker
Processes multiple jobs in parallel:
use queue_workers::{
    redis_queue::RedisQueue,
    concurrent_worker::{ConcurrentWorker, ConcurrentWorkerConfig},
};

let config = ConcurrentWorkerConfig {
    max_concurrent_jobs: 5, // Process 5 jobs simultaneously
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};

let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start().await?;

// Or with shutdown support:
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start_with_shutdown(shutdown_rx).await?;
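The broadcast sender is what requests the shutdown. For example, wiring it to Ctrl-C (standard Tokio signal handling, nothing crate-specific) so the worker can stop gracefully within its `shutdown_timeout`:
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

// Run the worker in the background.
let worker = ConcurrentWorker::new(queue.clone(), config);
let worker_task = tokio::spawn(async move {
    worker.start_with_shutdown(shutdown_rx).await
});

// Ask the worker to stop when Ctrl-C is pressed.
tokio::signal::ctrl_c().await.expect("failed to listen for Ctrl-C");
let _ = shutdown_tx.send(());

// Wait for the worker to finish in-flight work and exit.
worker_task.await.expect("worker task panicked")?;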
Logging
This library uses the `log` crate as a logging facade, allowing you to choose your preferred logging implementation. For basic usage:
// Using env_logger
env_logger::init();

// Or using tracing (requires tracing-subscriber's "env-filter" feature)
tracing_subscriber::fmt()
    .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
    .init();
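The same facade is available to your own job code: anything logged with the `log` macros ends up in whichever logger was initialized above. For example:
// Inside your Job::execute implementation:
async fn execute(&self) -> Result<Self::Output, Self::Error> {
    // Emitted through the `log` facade and rendered by env_logger, tracing,
    // or whatever logger the application set up.
    log::info!("sending email {} to {}", self.id, self.to);
    Ok(format!("Email sent to {}", self.to))
}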
For detailed logging configuration, including production setups and testing configurations, see LOGGING.md.
Contributing
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Dependencies
~10–19MB
~251K SLoC