4 releases

0.2.1 Nov 15, 2024
0.2.0 Nov 15, 2024
0.1.1 Nov 13, 2024
0.1.0 Nov 13, 2024

#4 in #job-queue

Download history 188/week @ 2024-11-09 112/week @ 2024-11-16

300 downloads per month

AGPL-3.0

54KB
1K SLoC

BunBun-Worker

❗ Disclaimer
This crate is still under development, meaning api's may change on every commit...

Introduction

bunbun-worker was made to provide a panic-safe multithreaded job-runner server & client for microservices and alike. It supports RPC and regular (non-rpc) calls. As of right now only rabbitmq is supported but gRPC will be added too.

Rpc

Remote procedure call, as it's name says is a message that can be sent to a remote microservice to be processed and the result to be returned. In bunbun-worker it's implemented by the following example:

sequenceDiagram
    ServiceA->>+ServiceB: Hey, could you do this job for me?
    Note right of ServiceB: ServiceB does the job
    ServiceB->>+ServiceA: Sure, here is the data result
  1. ServiceA creates a callback queue that the response shall be sent to.
  2. ServiceA sends a json job message to an already declared queue on a rabbitmq server.
  3. ServiceB is listening on that queue for messages and spawns a new thread for each received.
  4. Once ServiceB has finished the work, using the received messages header it responds to the callback queue with the correlation-id.

Non-rpc

In bunbun-worker regular jobs are called non-rpc jobs, indicating that the response is not awaited.

Installation

Get directly from crates.io

[dependencies]
bunbun-worker = { version = "0.2.0" }

or get it directly from source

[dependencies]
bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "master" }

Usage

This example uses DTO as a way to transfer data between services.
Add futures via cargo add futures

// server.rs
#[derive(Clone, Debug)]
pub struct State {
    pub something: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJobResult {
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}
impl RPCTask for EmailJob {
        type ErroredResult = EmailJobResultError;
        type Result = EmailJobResult;
        type State = State;
        fn run(
            self,
            state: Arc<Self::State>,
        ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
        {
            async move {
                tracing::info!("Sent email to {}", self.send_to);
                tokio::time::sleep(Duration::from_secs(2)).await;
                return Ok(EmailJobResult {
                    contents: self.contents.clone(),
                });
            }
            .boxed()
        }
}

#[tokio::main]
async fn main() {
    let mut listener = Worker::new(
        env::var("AMQP_SERVER_URL").unwrap(),
        WorkerConfig::default(),
    )
    .await;
    listener
        .add_rpc_consumer::<EmailJob>(
            Arc::new(State {
                something: "test".into(),
            }),
            ListenerConfig::default("emailjob").set_prefetch_count(100),
        );
    listener.start_all_listeners().await;
}
// client.rs
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct EmailJobResult {
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

// Implement the client side trait, so the caller knows what the return types are
impl RPCClientTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
}


#[tokio::main]
async fn main() {
 let client = Client::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
            .await
            .unwrap();
        let result = client
            .rpc_call::<EmailJob>(
                EmailJob {
                    send_to: "someone".into(),
                    contents: "something".into(),
                },
                BasicCallOptions::default("emailjob").timeout(Duration::from_secs(3)),
            )
            .await
            .unwrap();
        println!("{:?}",result);
}

Message versioning

In this crate message versioning is done by including v1.0.0 or such on the end of the queue name, instead of including it in the headers of a message. This reduces the amount of redelivered messages.
The following example will send a job to a queue named emailjob-v1.0.0.

let result = client
        .rpc_call::<EmailJob>(
            EmailJob {
                send_to: "someone".into(),
                contents: "something".into(),
            },
            BasicCallOptions::default("emailjob")
                .timeout(Duration::from_secs(3))
                .message_version("v1.0.0")
        )
        .await
        .unwrap();

Limitations

  1. Currently some unwrap()'s are called inside the code and may results in panics (not in the job-runner).
  2. limited API

Bugs department

Since the code is hosted on a private git instance (as of right now) any bugs shall be discussed in 4o1x5's project room.

License

Licensed under GNU AFFERO GENERAL PUBLIC LICENSE

Contribution

Currently this library does not accept any contributors, as it's hosted on a private git server.

Credits

This package was made with the help of-

Dependencies

~11–23MB
~332K SLoC