#worker #jobs #queue #redis #async

queue_workers

A Redis-backed job queue system for Rust applications

8 releases

Uses new Rust 2024

new 0.3.5 Apr 18, 2025
0.3.4 Apr 18, 2025
0.2.4 Apr 15, 2025
0.1.0 Apr 12, 2025

#202 in Asynchronous


345 downloads per month

MIT license

37KB
760 lines

Queue Workers

⚠️ Work in Progress: This crate is under active development and is not yet ready for production use. The API is unstable and may undergo significant changes. Feel free to experiment and provide feedback, but please do not use in production environments.

A Redis-backed job queue system for Rust applications with support for retries and concurrent workers.

Features

  • Redis-backed persistent job queue
  • Automatic job retries with configurable attempts and delay
  • Concurrent worker support
  • Async/await based API
  • Serializable jobs using Serde
  • Type-safe job definitions

Prerequisites

  • Rust 1.86.0 or later
  • Redis server (local or remote)
  • Docker and Docker Compose (for development)

Quick Start

  1. Add the dependency to your Cargo.toml (the examples below also use serde with its derive feature, async-trait, and tokio):
[dependencies]
queue_workers = "0.3.5"
  2. Define your job:
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use queue_workers::job::Job;

#[derive(Debug, Serialize, Deserialize)]
struct EmailJob {
    id: String,
    to: String,
    subject: String,
    body: String,
}

#[async_trait]
impl Job for EmailJob {
    type Output = String;
    type Error = String;

    async fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Implement your job logic here
        Ok(format!("Email sent to {}", self.to))
    }
}
  3. Create a queue and worker:
use queue_workers::{redis_queue::RedisQueue, worker::{Worker, WorkerConfig}};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Initialize the queue
    let queue = RedisQueue::<EmailJob>::new(
        "redis://127.0.0.1:6379",
        "email_queue"
    ).expect("Failed to create queue");

    // Configure the worker
    let config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_secs(5),
        shutdown_timeout: Duration::from_secs(30),
    };

    // Create the worker (it does not process jobs until start() is called below)
    let worker = Worker::new(queue.clone(), config);
    
    // Push a job
    let job = EmailJob {
        id: "email-1".to_string(),
        to: "user@example.com".to_string(),
        subject: "Hello".to_string(),
        body: "World".to_string(),
    };
    
    queue.push(job).await.expect("Failed to push job");

    // Start processing jobs
    worker.start().await.expect("Worker failed");
}
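
The associated types on the Job trait in step 2 are what make job definitions type-safe: each job declares its own output and error types, and generic code picks them up at compile time. The sketch below shows that idea in a synchronous, std-only form. `SyncJob`, `SketchEmailJob`, and `run_job` are hypothetical names used for illustration and are not part of queue_workers.

```rust
/// Hypothetical synchronous analog of a job trait with associated types.
/// This is an illustration, not the crate's `Job` trait.
pub trait SyncJob {
    type Output;
    type Error;

    fn execute(&self) -> Result<Self::Output, Self::Error>;
}

/// Hypothetical job carrying only the recipient address.
pub struct SketchEmailJob {
    pub to: String,
}

impl SyncJob for SketchEmailJob {
    type Output = String;
    type Error = String;

    fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Reject obviously malformed addresses so both result variants are exercised.
        if self.to.contains('@') {
            Ok(format!("Email sent to {}", self.to))
        } else {
            Err(format!("Invalid address: {}", self.to))
        }
    }
}

/// Generic runner: the result type is derived from the job type,
/// so callers cannot mix up the outputs of different jobs.
pub fn run_job<J: SyncJob>(job: &J) -> Result<J::Output, J::Error> {
    job.execute()
}
```

Calling `run_job(&SketchEmailJob { to: "user@example.com".to_string() })` yields a `Result<String, String>` without any casts or dynamic typing.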

Development Setup

  1. Clone the repository:
git clone https://github.com/yourusername/queue_workers.git
cd queue_workers
  2. Install development dependencies:
rustup component add rustfmt
rustup component add clippy
  3. Set up git hooks:
chmod +x scripts/setup-git-hooks.sh
./scripts/setup-git-hooks.sh
  4. Start Redis using Docker Compose:
docker-compose up -d redis
  5. Run the tests:
cargo test

Code Quality

This project enforces code quality through:

  • Formatting using rustfmt
  • Linting using clippy

To manually run the checks:

# Check formatting
cargo fmt -- --check

# Run clippy
cargo clippy -- -D warnings

These checks run automatically:

  • As a pre-commit hook when committing changes
  • In the CI/CD pipeline for all pull requests

Configuration

Worker Configuration

The WorkerConfig struct allows you to customize worker behavior:

let config = WorkerConfig {
    retry_attempts: 3,        // Number of retry attempts for failed jobs
    retry_delay: Duration::from_secs(5),  // Delay between retries
    shutdown_timeout: Duration::from_secs(30),  // Graceful shutdown timeout
};
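
One way to read the two retry fields is as a plain loop around the job. The sketch below is a synchronous, std-only illustration and not the crate's implementation. `RetryPolicy` and `run_with_retries` are hypothetical names, and it assumes `retry_attempts` counts retries after the first attempt, which the README does not state.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the retry-related fields of `WorkerConfig`.
pub struct RetryPolicy {
    pub retry_attempts: u32,
    pub retry_delay: Duration,
}

/// Runs `job` once, then retries up to `retry_attempts` more times,
/// sleeping `retry_delay` before each retry.
/// Returns the first success, or the last error once retries are exhausted.
pub fn run_with_retries<T, E>(
    policy: &RetryPolicy,
    mut job: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut retries_left = policy.retry_attempts;
    loop {
        match job() {
            Ok(value) => return Ok(value),
            // No retries left: surface the error to the caller.
            Err(err) if retries_left == 0 => return Err(err),
            Err(_) => {
                retries_left -= 1;
                thread::sleep(policy.retry_delay);
            }
        }
    }
}
```

Under this assumption, `retry_attempts: 3` means a job that always fails is executed four times in total before the error is reported.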

Redis Configuration

The Redis queue can be configured with a Redis URL and queue name:

let queue = RedisQueue::<MyJob>::new(
    "redis://username:password@hostname:6379/0",  // Redis URL with authentication
    "my_queue_name"
).expect("Failed to create queue");
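
To make the URL layout concrete, here is a small std-only sketch that splits a URL of the form `redis://[username[:password]@]host[:port][/db]` into its parts. `RedisUrlParts` and `parse_redis_url` are hypothetical helpers written for this example. The crate does its own URL handling, and this sketch skips percent-decoding, TLS schemes, and IPv6 hosts.

```rust
/// Parts of a Redis URL, following the layout of the example above.
#[derive(Debug, PartialEq)]
pub struct RedisUrlParts {
    pub username: Option<String>,
    pub password: Option<String>,
    pub host: String,
    pub port: u16,
    pub db: u32,
}

/// Splits `redis://[username[:password]@]host[:port][/db]` into its parts.
/// Returns `None` for other schemes, an empty host, or non-numeric port/db.
pub fn parse_redis_url(url: &str) -> Option<RedisUrlParts> {
    let rest = url.strip_prefix("redis://")?;

    // Credentials, if present, come before the last '@'.
    let (creds, location) = match rest.rsplit_once('@') {
        Some((creds, location)) => (Some(creds), location),
        None => (None, rest),
    };
    let (username, password) = match creds {
        Some(c) => match c.split_once(':') {
            Some((user, pass)) => (Some(user.to_string()), Some(pass.to_string())),
            None => (Some(c.to_string()), None),
        },
        None => (None, None),
    };

    // The database index follows the first '/', defaulting to 0.
    let (authority, db) = match location.split_once('/') {
        Some((authority, db)) if !db.is_empty() => (authority, db.parse::<u32>().ok()?),
        Some((authority, _)) => (authority, 0),
        None => (location, 0),
    };

    // The port follows ':', defaulting to the standard Redis port 6379.
    let (host, port) = match authority.split_once(':') {
        Some((host, port)) => (host, port.parse::<u16>().ok()?),
        None => (authority, 6379),
    };
    if host.is_empty() {
        return None;
    }

    Some(RedisUrlParts { username, password, host: host.to_string(), port, db })
}
```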

Queue Types

The queue supports both FIFO (First In, First Out) and LIFO (Last In, First Out) behaviors:

use queue_workers::{redis_queue::RedisQueue, queue::QueueType};

// Create a FIFO queue (default behavior)
let fifo_queue = RedisQueue::<MyJob>::new(redis_url, "fifo_queue")?;

// Create a LIFO queue
let lifo_queue = RedisQueue::<MyJob>::with_type(
    redis_url,
    "lifo_queue",
    QueueType::LIFO
)?;
  • FIFO: Jobs are processed in the order they were added (oldest first)
  • LIFO: Jobs are processed in reverse order (newest first)
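
The difference between the two orderings comes down to which end of the list a job is taken from. The sketch below models that with an in-memory `VecDeque` instead of Redis. `PopOrder` and `LocalQueue` are hypothetical names for this illustration and are not the crate's `QueueType` or `RedisQueue`.

```rust
use std::collections::VecDeque;

/// Hypothetical local mirror of the two orderings.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PopOrder {
    Fifo,
    Lifo,
}

/// In-memory queue that always appends at the back and
/// chooses the end to pop from based on `order`.
pub struct LocalQueue<T> {
    items: VecDeque<T>,
    order: PopOrder,
}

impl<T> LocalQueue<T> {
    pub fn new(order: PopOrder) -> Self {
        Self { items: VecDeque::new(), order }
    }

    pub fn push(&mut self, item: T) {
        self.items.push_back(item);
    }

    pub fn pop(&mut self) -> Option<T> {
        match self.order {
            // Oldest first: take from the opposite end to where jobs are pushed.
            PopOrder::Fifo => self.items.pop_front(),
            // Newest first: take from the same end where jobs are pushed.
            PopOrder::Lifo => self.items.pop_back(),
        }
    }
}
```

Pushing 1, 2, 3 and popping three times gives 1, 2, 3 with `PopOrder::Fifo` and 3, 2, 1 with `PopOrder::Lifo`.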

Running Multiple Workers

You can run multiple workers processing the same queue:

let queue = RedisQueue::<EmailJob>::new(redis_url, "email_queue")?;

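// Spawn multiple workers and keep their task handles
let mut handles = Vec::new();
for _ in 0..3 {
    let worker_queue = queue.clone();
    let worker = Worker::new(worker_queue, WorkerConfig::default());

    handles.push(tokio::spawn(async move {
        worker.start().await.expect("Worker failed");
    }));
}

// Wait for the workers; otherwise the enclosing function may return
// and the runtime can shut down while jobs are still being processed
for handle in handles {
    handle.await.expect("Worker task panicked");
}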
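
The key property of several workers sharing one queue is that each job is handed to exactly one of them. The sketch below demonstrates this with std threads and an in-memory queue behind a mutex. It is an analogy only: `drain_with_workers` is a hypothetical function, and the crate itself relies on Redis rather than a local lock to hand out jobs.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Drains `jobs` with `worker_count` threads and returns
/// how many jobs each worker handled.
pub fn drain_with_workers(jobs: Vec<u32>, worker_count: usize) -> Vec<usize> {
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));

    let handles: Vec<_> = (0..worker_count)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // The lock is held only for the pop, so a job is removed
                    // from the queue by exactly one worker.
                    let job = queue.lock().unwrap().pop_front();
                    match job {
                        Some(_job) => handled += 1,
                        None => break handled,
                    }
                }
            })
        })
        .collect();

    // Join every worker so no job is left in flight when we return.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The per-worker counts vary between runs, but they always add up to the number of jobs pushed.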

Error Handling

The library provides a custom error type QueueWorkerError that covers various failure scenarios:

  • Redis connection issues
  • Serialization errors
  • Job not found
  • Worker errors
  • Connection timeouts
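
Callers often want to treat these categories differently, for example retrying on connection problems but not on serialization failures. The sketch below shows that pattern with a hypothetical enum. The variant names in `SketchError` are assumptions made for this example and are not the actual definition of `QueueWorkerError`, so check the crate documentation for the real variants.

```rust
use std::fmt;

/// Hypothetical error enum mirroring the categories listed above.
/// Variant names are assumptions, not the crate's real definition.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    Connection(String),
    Serialization(String),
    JobNotFound(String),
    Worker(String),
    Timeout,
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Connection(msg) => write!(f, "redis connection error: {}", msg),
            SketchError::Serialization(msg) => write!(f, "serialization error: {}", msg),
            SketchError::JobNotFound(id) => write!(f, "job not found: {}", id),
            SketchError::Worker(msg) => write!(f, "worker error: {}", msg),
            SketchError::Timeout => write!(f, "connection timed out"),
        }
    }
}

impl std::error::Error for SketchError {}

/// Returns true for failures that may succeed on a later attempt.
pub fn is_transient(err: &SketchError) -> bool {
    matches!(err, SketchError::Connection(_) | SketchError::Timeout)
}
```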

Worker Types

The library provides two types of workers:

Sequential Worker

Processes jobs one at a time, with retry support:

use queue_workers::{
    redis_queue::RedisQueue,
    worker::{Worker, WorkerConfig}
};

let config = WorkerConfig {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};

let worker = Worker::new(queue.clone(), config);
worker.start().await?;

Concurrent Worker

Processes multiple jobs in parallel:

use queue_workers::{
    redis_queue::RedisQueue,
    concurrent_worker::{ConcurrentWorker, ConcurrentWorkerConfig}
};

let config = ConcurrentWorkerConfig {
    max_concurrent_jobs: 5,  // Process 5 jobs simultaneously
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};

let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start().await?;

// Alternatively, use start_with_shutdown() instead of start() for shutdown support:
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start_with_shutdown(shutdown_rx).await?;
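
The two ideas in this section, a cap on simultaneous jobs and a shutdown signal, can be sketched with std threads. The example below is an illustration under stated assumptions and not how `ConcurrentWorker` is implemented: `run_pool` is a hypothetical function, an `mpsc` channel stands in for the Redis queue, and an `AtomicBool` stands in for the broadcast shutdown channel.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Processes jobs on `max_concurrent_jobs` threads until the channel closes
/// or `shutdown` is set. Returns the number of jobs processed.
pub fn run_pool(
    jobs: Receiver<u32>,
    max_concurrent_jobs: usize,
    shutdown: Arc<AtomicBool>,
) -> usize {
    let jobs = Arc::new(Mutex::new(jobs));
    let processed = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..max_concurrent_jobs)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            let processed = Arc::clone(&processed);
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || {
                // The flag is checked between jobs, so a job that has
                // already started is always allowed to finish.
                while !shutdown.load(Ordering::SeqCst) {
                    // Short timeout so a waiting worker notices shutdown.
                    let next = jobs
                        .lock()
                        .unwrap()
                        .recv_timeout(Duration::from_millis(10));
                    match next {
                        Ok(_job) => {
                            processed.fetch_add(1, Ordering::SeqCst);
                        }
                        Err(RecvTimeoutError::Timeout) => continue,
                        // All senders dropped: nothing more will arrive.
                        Err(RecvTimeoutError::Disconnected) => break,
                    }
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    processed.load(Ordering::SeqCst)
}
```

The thread count bounds how many jobs run at once, which plays the role of `max_concurrent_jobs`. Setting the flag before the pool starts means no job is picked up at all.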

Logging

This library uses the log crate as a logging facade, allowing you to choose your preferred logging implementation. For basic usage:

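// Using env_logger
env_logger::init();

// Or using tracing instead (initialize only one global logger;
// EnvFilter requires the "env-filter" feature of tracing-subscriber)
use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::from_default_env())
    .init();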

For detailed logging configuration, including production setups and testing configurations, see LOGGING.md.

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Dependencies

~10–19MB
~251K SLoC