
zirv-kafka

A convenient wrapper for rdkafka

4 releases

Uses new Rust 2024

new 0.2.1 Apr 13, 2025
0.2.0 Apr 12, 2025
0.1.1 Apr 12, 2025
0.1.0 Apr 11, 2025

#915 in Development tools


171 downloads per month

MIT/Apache

17KB
144 lines

zirv-kafka

A convenient wrapper around the rdkafka crate that simplifies working with Apache Kafka in Rust applications.


Features

  • Easy Kafka producer initialization and management through a global instance
  • Simplified message production with convenient macros
  • Streamlined consumer setup and message handling
  • Background thread management for continuous message polling

Installation

Add zirv-kafka to your Cargo.toml:

[dependencies]
zirv-kafka = "0.2.0"

Producer Usage

Initialize the producer

Initialize the Kafka producer early in your application:

use zirv_kafka::init_kafka_producer;

#[tokio::main]
async fn main() {
    init_kafka_producer!().await;
    
    // Your application logic...
}
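
The "global instance" idea can be illustrated with the standard library alone. The sketch below is not the crate's implementation: the Producer struct and the init_producer and get_producer functions are hypothetical stand-ins, and it only assumes that the producer is created once and then shared for the lifetime of the process.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for a configured Kafka producer.
#[derive(Debug)]
pub struct Producer {
    pub bootstrap_servers: String,
}

// Process-wide slot that holds the producer once it has been initialized.
static PRODUCER: OnceLock<Producer> = OnceLock::new();

/// Initializes the global producer. Only the first call has an effect;
/// later calls return the instance that is already stored.
pub fn init_producer(bootstrap_servers: &str) -> &'static Producer {
    PRODUCER.get_or_init(|| Producer {
        bootstrap_servers: bootstrap_servers.to_string(),
    })
}

/// Returns the global producer, or `None` if `init_producer` has not run yet.
pub fn get_producer() -> Option<&'static Producer> {
    PRODUCER.get()
}
```

With this pattern, initializing early in main matters: code that asks for the instance before initialization gets nothing back, which is why the producer should be set up before any message is sent.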

Send messages

Send messages to Kafka topics using the produce_message! macro:

use zirv_kafka::produce_message;

async fn notify_user_updated(user_id: &str, data: &str) {
    produce_message!("user-updated", user_id, data).await;
}

For more direct control, you can access the producer directly:

use zirv_kafka::get_kafka_producer;
use rdkafka::producer::FutureRecord;

async fn send_custom_message() {
    let producer = get_kafka_producer!();
    let record = FutureRecord::to("my-topic")
        .payload("message content")
        .key("message-key");
    
    let result = producer.send(record, std::time::Duration::from_secs(1)).await;
    // Handle the result...
}

Consumer Usage

Create a consumer and start processing messages

Use the start_base_consumer! macro to initialize a consumer and process messages:

use rdkafka::Message; // trait that provides payload_view
use zirv_kafka::start_base_consumer;

fn main() {
    // Start a consumer that processes messages from the "user-events" topic
    let handle = start_base_consumer!("user-service", &["user-events"], |msg_result| {
        match msg_result {
            Ok(msg) => {
                if let Some(Ok(content)) = msg.payload_view::<str>() {
                    println!("Received message: {}", content);
                    // Process the message...
                }
            },
            Err(e) => eprintln!("Error while consuming message: {:?}", e),
        }
    });
    
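    // The consumer runs in a background thread. When main returns, the
    // process exits and that thread stops with it, so join the handle
    // (or keep main alive some other way) to continue consuming:
    // handle.join().unwrap();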
}
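
In rdkafka, payload_view::<str>() returns an Option wrapping a Result: None when the message has no payload, and an error when the bytes are not valid UTF-8. The sketch below reproduces that shape with the standard library only, so the three cases can be seen without a broker. The payload_as_str and describe functions are hypothetical helpers, not part of zirv-kafka or rdkafka.

```rust
use std::str::{self, Utf8Error};

/// Mirrors the `Option<Result<&str, Utf8Error>>` shape of a string payload view:
/// `None` for a missing payload, `Some(Err(_))` for bytes that are not UTF-8.
pub fn payload_as_str(payload: Option<&[u8]>) -> Option<Result<&str, Utf8Error>> {
    payload.map(str::from_utf8)
}

/// Handles all three cases explicitly instead of silently skipping two of them.
pub fn describe(payload: Option<&[u8]>) -> String {
    match payload_as_str(payload) {
        Some(Ok(text)) => format!("text: {text}"),
        Some(Err(err)) => format!("invalid UTF-8: {err}"),
        None => "empty payload".to_string(),
    }
}
```

A handler that only matches the successful case drops empty and non-UTF-8 messages without any trace, so logging the other two arms may be worthwhile in production code.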

For more control over the consumer, you can use the lower-level functions:

use rdkafka::message::BorrowedMessage;
use std::sync::Arc;
use zirv_kafka::consumer::{init_base_consumer, start_consumer_thread};

fn main() {
    let consumer = init_base_consumer("my-app", &["events-topic"])
        .expect("Failed to create consumer");
    
    let arc_consumer = Arc::new(consumer);
    
    let handle = start_consumer_thread(arc_consumer, |msg| {
        // Custom message handling logic
    });
    
    // Later, when shutting down:
    // handle.join().unwrap();
}
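
The background-thread pattern behind these functions can be sketched without Kafka. The example below is an illustration under stated assumptions, not the crate's code: a std::sync::mpsc channel stands in for the Kafka consumer, and spawn_polling_thread and its stop flag are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a thread that polls `source` and passes every message to `handler`.
/// The loop ends when `stop` is set or when the sending side is dropped.
pub fn spawn_polling_thread<F>(
    source: Receiver<String>,
    stop: Arc<AtomicBool>,
    mut handler: F,
) -> JoinHandle<()>
where
    F: FnMut(String) + Send + 'static,
{
    thread::spawn(move || {
        while !stop.load(Ordering::Relaxed) {
            // Poll with a timeout so the stop flag is checked regularly.
            match source.recv_timeout(Duration::from_millis(100)) {
                Ok(message) => handler(message),
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    })
}
```

Polling with a timeout rather than blocking indefinitely is what lets the thread notice a shutdown request; joining the returned handle then waits until the loop has actually finished.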

Configuration

By default, the library uses the following configuration:

  • Consumers read the broker list from the KAFKA_BROKERS environment variable (default: "localhost:9092")
  • Producers read the following keys through zirv-config:
    • kafka.bootstrap_servers (default: "localhost:9092")
    • kafka.message_timeout_ms (default: 5000)
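
The environment-variable fallback for consumers can be sketched as follows. This is a minimal illustration, not the crate's code: resolve_brokers and brokers_from_env are hypothetical names, and treating an empty or whitespace-only value as unset is an assumption of this sketch.

```rust
use std::env;

/// Default broker list named in the configuration section.
pub const DEFAULT_BROKERS: &str = "localhost:9092";

/// Picks the broker list: a non-empty override wins, otherwise the default.
/// (Ignoring blank values is an assumption of this sketch.)
pub fn resolve_brokers(override_value: Option<&str>) -> String {
    match override_value.map(str::trim) {
        Some(value) if !value.is_empty() => value.to_string(),
        _ => DEFAULT_BROKERS.to_string(),
    }
}

/// Reads `KAFKA_BROKERS` from the environment and applies the default.
pub fn brokers_from_env() -> String {
    resolve_brokers(env::var("KAFKA_BROKERS").ok().as_deref())
}
```

Keeping the decision in a function that takes the value as a parameter makes the fallback easy to test without mutating the process environment.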

Requirements

  • Rust 1.85 or later (the crate uses the 2024 edition)
  • librdkafka development libraries if using dynamic linking

License

This project is licensed under either the MIT license or the Apache License, at your option.
