#kafka #thread-pool #plain-text #mtls #ssl #env-var #publishing

kafka-threadpool

An async rust threadpool for publishing messages to kafka using SSL (mTLS) or PLAINTEXT protocols

13 stable releases

1.0.12 Sep 22, 2022
1.0.1 Sep 21, 2022

#334 in Asynchronous

Download history 24/week @ 2024-09-22 10/week @ 2024-09-29

62 downloads per month
Used in restapi

MIT license

67KB
929 lines

Kafka Threadpool for Rust with mTLS Support

An async rust threadpool for publishing messages to kafka using SSL (mTLS) or PLAINTEXT protocols.

Architecture

This is a work in progress. The architecture will likely change over time. For now here's the latest reference architecture:

kafka-threadpool Reference Architecture

Background

Please refer to the blog post for more information on this repo.

Configuration

Supported Environment Variables

Environment Variable Name Purpose / Value
KAFKA_ENABLED toggle the kafka_threadpool on with: true or 1 anything else disables the threadpool
KAFKA_LOG_LABEL tracking label that shows up in all crate logs
KAFKA_BROKERS comma-delimited list of brokers (host1:port,host2:port,host3:port)
KAFKA_TOPICS comma-delimited list of supported topics
KAFKA_PUBLISH_RETRY_INTERVAL_SEC number of seconds to sleep before each publish retry
KAFKA_PUBLISH_IDLE_INTERVAL_SEC number of seconds to sleep if there are no message to process
KAFKA_NUM_THREADS number of threads for the threadpool
KAFKA_TLS_CLIENT_KEY optional - path to the kafka mTLS key
KAFKA_TLS_CLIENT_CERT optional - path to the kafka mTLS certificate
KAFKA_TLS_CLIENT_CA optional - path to the kafka mTLS certificate authority (CA)
KAFKA_METADATA_COUNT_MSG_OFFSETS optional - set to anything but true to bypass counting the offsets

Getting Started

Please ensure your kafka cluster is running before starting. If you need help running a kafka cluster please refer to the rust-with-strimzi-kafka-tls repo for more details.

Set up the Environment Variables

You can create an ./env/kafka.env file storing the environment variables to make your producer and consumer consistent (and ready for podman/docker or kubernetes):

export KAFKA_ENABLED=1
export KAFKA_LOG_LABEL="ktp"
export KAFKA_BROKERS="host1:port,host2:port,host3:port"
export KAFKA_TOPICS="testing"
export KAFKA_PUBLISH_RETRY_INTERVAL_SEC="1.0"
export KAFKA_NUM_THREADS="5"
export KAFKA_TLS_CLIENT_CA="PATH_TO_TLS_CA_FILE"
export KAFKA_TLS_CLIENT_CERT="PATH_TO_TLS_CERT_FILE"
export KAFKA_TLS_CLIENT_KEY="PATH_TO_TLS_KEY_FILE"
export KAFKA_METADATA_COUNT_MSG_OFFSETS="true"

Load the Environment

source ./env/kafka.env

Start the Kafka Threadpool and Publish 100 Messages

The included ./examples/start-threadpool.rs example will connect to the kafka cluster based off the environment configuration and publish 100 messages into the kafka testing topic.

cargo build --example start-threadpool
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/start-threadpool

Consume Messages

To consume the newly-published test messages from the testing topic, you can use your own consumer or the rust-with-strimzi-kafka-and-tls/examples/run-consumer.rs example:

# from the rust-with-strimzi-kafka-and-tls directory:
cargo build --example run-consumer
export RUST_BACKTRACE=1
export RUST_LOG=info,rdkafka=info
./target/debug/examples/run-consumer -g rust-consumer-testing -t testing

Get Kafka Cluster Metadata for All Topics, Partitions, ISR, and Offsets

Run the ./examples/get-all-metadata.rs example:

cargo build --example get-all-metadata
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/get-all-metadata

Get Kafka Cluster Metadata for a Single Topic including Partitions, ISR and Offsets

  1. Set the Topic Name as an Environment Variable

    export KAFKA_TOPIC=testing
    
  2. Run the ./examples/get-metadata-for-topic.rs example:

    cargo build --example get-metadata-for-topic
    export RUST_BACKTRACE=1
    export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
    ./target/debug/examples/get-metadata-for-topic
    

Dependencies

~16–30MB
~422K SLoC