
codas-flow

Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems

2 unstable releases

0.4.1 Mar 18, 2025
0.1.0 Nov 6, 2024


AGPL-3.0-only

240KB
5K SLoC

Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems, inspired by the LMAX Disruptor and built for codas.

What's Here

This crate provides the flow data structure: a "ring" buffer that concurrent, (a)synchronous tasks can publish data to and receive data from.
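The "ring" works by mapping an ever-increasing sequence number onto a fixed set of slots, so slots are reused as the sequence wraps around. The sketch below is a single-threaded illustration, not codas-flow's internals. `RingSketch` and its methods are hypothetical names, and the power-of-two capacity is an assumption. That capacity lets the slot index be computed with a bit mask, as in the LMAX Disruptor.

```rust
/// Hypothetical, single-threaded ring of fixed capacity (not codas-flow's API).
pub struct RingSketch<T> {
    slots: Vec<Option<T>>,
    mask: u64,
}

impl<T> RingSketch<T> {
    /// Creates a ring; assumes `capacity` is a non-zero power of two.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two(), "capacity must be a power of two");
        let mut slots = Vec::with_capacity(capacity);
        slots.resize_with(capacity, || None);
        Self { slots, mask: capacity as u64 - 1 }
    }

    /// Maps a monotonically increasing sequence number onto a slot index.
    /// With a power-of-two capacity, `seq & mask` equals `seq % capacity`.
    pub fn slot_index(&self, seq: u64) -> usize {
        (seq & self.mask) as usize
    }

    /// Stores `value` in the slot for `seq`, overwriting any older value there.
    pub fn write(&mut self, seq: u64, value: T) {
        let idx = self.slot_index(seq);
        self.slots[idx] = Some(value);
    }

    /// Returns the value currently stored in the slot for `seq`, if any.
    pub fn read(&self, seq: u64) -> Option<&T> {
        self.slots[self.slot_index(seq)].as_ref()
    }
}
```

With a capacity of 4, sequences 1 and 5 share a slot, which is why a bounded ring must stop publishers from lapping readers that haven't caught up yet.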

Flows work much like Tokio's broadcast channels, with some key differences:

  1. Zero-Copy Multicast Reads: Every item published to a flow is immediately available to each subscriber, in parallel, with no copying or cloning.

  2. Lock-Free by default: On supported targets, no locks or mutexes are used when publishing data to or receiving data from the flow.

  3. Broad Compatibility:

  • no-std by default.
  • async and synchronous APIs.
  • async functionality doesn't depend on a specific runtime or framework (not even futures!).
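Runtime independence is possible because Rust's `Future` trait lives in `core`. Any type that implements `Future::poll` can be awaited under any executor. The future below is a hypothetical sketch, not codas-flow's implementation. It wraps a non-blocking `try_next`-style function. `NextFuture`, `NoopWake`, and `block_on` are illustrative names. The busy-polling `block_on` is a toy executor for demonstration only.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that calls a non-blocking `try_next`-style function
/// until it yields a value. Only `core`/`std` traits are used.
pub struct NextFuture<F> {
    try_next: F,
}

impl<F> NextFuture<F> {
    pub fn new(try_next: F) -> Self {
        Self { try_next }
    }
}

impl<T, F> Future for NextFuture<F>
where
    F: FnMut() -> Option<T> + Unpin,
{
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut();
        match (this.try_next)() {
            Some(value) => Poll::Ready(value),
            None => {
                // Nothing available yet: ask to be polled again and yield.
                // A real implementation would store the waker and have the
                // publisher wake it, rather than re-polling immediately.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

/// Toy waker that does nothing; `block_on` below simply polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls `fut` until it completes (spins; demo only).
pub fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Nothing here depends on Tokio or the `futures` crate, so the same `NextFuture` could be awaited from any runtime's tasks.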

Flows work wherever channels or queues would, but they're built specifically for systems in which multiple tasks must process the same data concurrently (or in parallel).
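Multicast reads on a bounded queue follow three rules:

  • Each subscriber keeps its own read cursor.
  • Items are read by reference instead of cloned.
  • The publisher stalls once the slowest subscriber is a full buffer behind.

The following single-threaded sketch illustrates those rules. `MulticastSketch`, `try_publish`, and `try_read` are hypothetical names and don't reflect codas-flow's actual concurrency machinery.

```rust
/// Hypothetical bounded multicast queue (single-threaded illustration).
pub struct MulticastSketch<T> {
    slots: Vec<Option<T>>,
    /// Sequence number the publisher will write next.
    next_seq: u64,
    /// Sequence number each subscriber will read next.
    cursors: Vec<u64>,
}

impl<T> MulticastSketch<T> {
    pub fn new(capacity: usize, subscribers: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        let mut slots = Vec::with_capacity(capacity);
        slots.resize_with(capacity, || None);
        Self { slots, next_seq: 0, cursors: vec![0; subscribers] }
    }

    /// Publishes `value` and returns its sequence number, or hands the value
    /// back if publishing would overwrite an item the slowest subscriber
    /// hasn't read yet.
    pub fn try_publish(&mut self, value: T) -> Result<u64, T> {
        let slowest = self.cursors.iter().copied().min().unwrap_or(self.next_seq);
        if self.next_seq - slowest >= self.slots.len() as u64 {
            return Err(value);
        }
        let seq = self.next_seq;
        let idx = (seq % self.slots.len() as u64) as usize;
        self.slots[idx] = Some(value);
        self.next_seq += 1;
        Ok(seq)
    }

    /// Returns a reference (not a clone) to the next unread item for
    /// `subscriber`, advancing only that subscriber's cursor.
    pub fn try_read(&mut self, subscriber: usize) -> Option<&T> {
        let seq = self.cursors[subscriber];
        if seq == self.next_seq {
            return None; // Nothing new has been published.
        }
        self.cursors[subscriber] += 1;
        let idx = (seq % self.slots.len() as u64) as usize;
        self.slots[idx].as_ref()
    }
}
```

Both subscribers receive a reference to the same `String` allocation, so adding subscribers doesn't add copies.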

Examples

Flows are created with Flow::new, which returns a tuple of (flow, subscribers):

```rust
use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32);

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
```

Data is published into a flow via Flow::try_next (or Flow::next().await), which returns an UnpublishedData reference. Once this reference is published (via UnpublishedData::publish) or dropped, the data becomes receivable by every subscriber.
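The "published or dropped" behavior matches the RAII guard pattern. A guard reserves a slot and gives mutable access to it. Its `Drop` impl then marks the slot published, so the data becomes visible even if `publish` is never called. This is a hypothetical, single-threaded sketch: `Publisher`, `Unpublished`, and `visible` are illustrative names, not codas-flow types.

```rust
use std::ops::{Deref, DerefMut};

/// Hypothetical publisher: a growable buffer plus a count of published slots.
/// (A real flow reuses a fixed ring of slots instead of growing.)
pub struct Publisher<T> {
    slots: Vec<T>,
    published: usize,
}

/// Hypothetical guard over a reserved, not-yet-visible slot.
pub struct Unpublished<'a, T> {
    publisher: &'a mut Publisher<T>,
    idx: usize,
}

impl<T: Default> Publisher<T> {
    pub fn new() -> Self {
        Self { slots: Vec::new(), published: 0 }
    }

    /// Reserves the next slot. Readers can't see it until the guard is
    /// published or dropped.
    pub fn next(&mut self) -> Unpublished<'_, T> {
        let idx = self.slots.len();
        self.slots.push(T::default());
        Unpublished { publisher: self, idx }
    }

    /// Returns only the slots that have been published.
    pub fn visible(&self) -> &[T] {
        &self.slots[..self.published]
    }
}

impl<T> Unpublished<'_, T> {
    /// Writes `value` into the slot. `self` is dropped at the end of this
    /// method, and `Drop` publishes the slot.
    pub fn publish(mut self, value: T) {
        let slot: &mut T = &mut self; // deref coercion to the reserved slot
        *slot = value;
    }
}

impl<T> Deref for Unpublished<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.publisher.slots[self.idx]
    }
}

impl<T> DerefMut for Unpublished<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.publisher.slots[self.idx]
    }
}

impl<T> Drop for Unpublished<'_, T> {
    fn drop(&mut self) {
        // Publishing happens here, whether or not `publish` was called.
        self.publisher.published = self.idx + 1;
    }
}
```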

Data is received from a flow via FlowSubscriber::try_next (or FlowSubscriber::next().await), which returns a PublishedData reference.
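A reference type like PublishedData can be modeled as a guard that implements `Deref`, which is what lets the first example compare `*seq` against a string. This README doesn't confirm that dropping the guard marks the item as consumed; the sketch below simply assumes it. `PublishedRef` is a hypothetical name.

```rust
use std::cell::Cell;
use std::ops::Deref;

/// Hypothetical read guard: derefs to the published value and, when dropped,
/// advances the subscriber's cursor so the slot could later be reused.
pub struct PublishedRef<'a, T> {
    value: &'a T,
    cursor: &'a Cell<u64>,
}

impl<'a, T> PublishedRef<'a, T> {
    pub fn new(value: &'a T, cursor: &'a Cell<u64>) -> Self {
        Self { value, cursor }
    }
}

impl<T> Deref for PublishedRef<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.value
    }
}

impl<T> Drop for PublishedRef<'_, T> {
    fn drop(&mut self) {
        // Assumption: the item counts as consumed once the guard goes away.
        self.cursor.set(self.cursor.get() + 1);
    }
}
```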

Subscribers

Using slice patterns, any number of subscribers can be returned by Flow::new:

```rust
use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32);
```
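A single constructor can return "any number" of subscribers because Rust infers a const generic array length from an array (slice) pattern. Here is a minimal sketch of that technique. `Producer`, `Subscriber`, and `new_flow` are hypothetical stand-ins, not codas-flow's real signatures.

```rust
/// Hypothetical stand-ins for a flow's producer and subscriber handles.
pub struct Producer {
    pub capacity: usize,
}

pub struct Subscriber {
    pub id: usize,
}

/// Returns a producer plus `N` subscribers. `N` is inferred from the
/// caller's pattern, e.g. `let (p, [a, b]) = new_flow(32);` gives `N = 2`.
pub fn new_flow<const N: usize>(capacity: usize) -> (Producer, [Subscriber; N]) {
    // Build the array element by element; `id` is each subscriber's index.
    let subs = std::array::from_fn(|id| Subscriber { id });
    (Producer { capacity }, subs)
}
```

Because `N` is fixed when the flow is created, the set of subscribers is known up front, which is consistent with the restriction described next.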

New subscribers cannot be added to an active flow. To work around this, any subscriber can be wrapped in a Stage.

Stages

A stage is a dynamic group of data processors that share a single subscriber:

```rust
use core::sync::atomic::Ordering;
use portable_atomic::AtomicU64;
use portable_atomic_util::Arc;
use codas_flow::{*, stage::*};

// Create a flow.
let (mut flow, [sub]) = Flow::<String>::new(32);

// Wrap the subscriber in a processing stage.
let mut stage = Stage::from(sub);

// Add a data processor to the stage; any number of
// processors can be added to a stage, even while
// the flow is active.
let calls = Arc::new(AtomicU64::new(0));
let closure_calls = calls.clone();
stage.add_proc(move |_proc: &mut Proc, data: &String| {
    assert_eq!("Hello!", *data);
    closure_calls.add(1, Ordering::SeqCst);
});

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Run processors for a set of data in the flow.
stage.proc();
assert_eq!(1, calls.load(Ordering::SeqCst));
```

Stages only receive data from the flow when one of the Stage::proc* functions is invoked; refer to the Stage docs for more information.
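Conceptually, a stage does two things:

  • It fans each item from a single subscriber out to a list of processors that can grow at any time.
  • It does nothing until it is asked to process.

The sketch below models that with boxed closures and is much simpler than the real Stage. `StageSketch` and `receive` are hypothetical names, and so is the `VecDeque` that stands in for the wrapped subscriber.

```rust
use std::collections::VecDeque;

/// Hypothetical processing stage: items from one (simulated) subscriber are
/// fanned out to every registered processor.
pub struct StageSketch<T> {
    incoming: VecDeque<T>,
    procs: Vec<Box<dyn FnMut(&T)>>,
}

impl<T> StageSketch<T> {
    pub fn new() -> Self {
        Self { incoming: VecDeque::new(), procs: Vec::new() }
    }

    /// Simulates the wrapped subscriber receiving `item` from the flow.
    pub fn receive(&mut self, item: T) {
        self.incoming.push_back(item);
    }

    /// Adds a processor; allowed at any time, even after items have arrived.
    pub fn add_proc(&mut self, f: impl FnMut(&T) + 'static) {
        self.procs.push(Box::new(f));
    }

    /// Runs every processor on each pending item and returns how many items
    /// were processed. Nothing runs until this method is called.
    pub fn proc(&mut self) -> usize {
        let mut processed = 0;
        while let Some(item) = self.incoming.pop_front() {
            for p in self.procs.iter_mut() {
                p(&item);
            }
            processed += 1;
        }
        processed
    }
}
```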

Lock-Free Targets

This crate uses AtomicU64 to coordinate flow access without locks. This type is lock-free where possible, but may use locks on some platforms or compile targets.

This section contains a list of the primary targets supported by this crate, along with their support for lock-free behavior.

| Target | Lock-Free? |
|---|---|
| aarch64-unknown-linux-gnu (64-bit Linux ARM) | Yes |
| aarch64-apple-darwin (64-bit macOS ARM) | Yes |
| x86_64-unknown-linux-gnu (64-bit Linux) | Yes |
| x86_64-apple-darwin (64-bit macOS) | Yes |
| x86_64-pc-windows-gnu (64-bit Windows) | Yes |
| wasm32-unknown-unknown (WASM) | Yes [1] |
| armv7-unknown-linux-gnueabihf (ARM Cortex-A7 and A8) | No [2] |
| riscv32i-unknown-none-elf (ESP32) | No |

[1] The wasm32-unknown-unknown target doesn't support atomic instructions by default. However, because WASM code on this target executes on a single thread, regular variables are simply substituted for their atomic counterparts, enabling full lock-free support.

[2] Unconfirmed; a safe assumption is that 32-bit targets don't support atomic operations on 64-bit values.
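The `target_has_atomic` cfg reports at compile time whether a target has native 64-bit atomics. On such targets, a shared `AtomicU64` sequence counter can be advanced from many threads without any mutex. This std-only sketch is illustrative, and `has_native_u64_atomics` and `count_with_threads` are hypothetical helpers. Std's `AtomicU64` only exists on targets with native 64-bit atomics, so this sketch only compiles there. Crates such as portable-atomic (imported in the Stages example) can provide the type elsewhere, possibly via a lock-based fallback.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: true if the compile target advertises native
/// 64-bit atomic instructions (evaluated at compile time).
pub fn has_native_u64_atomics() -> bool {
    cfg!(target_has_atomic = "64")
}

/// Hypothetical helper: advances one shared sequence counter from several
/// threads using atomic fetch-and-add instead of a mutex.
pub fn count_with_threads(threads: u64, per_thread: u64) -> u64 {
    let seq = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let seq = Arc::clone(&seq);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Relaxed suffices for a pure counter; the joins below
                    // make every increment visible before the final load.
                    seq.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    seq.load(Ordering::Relaxed)
}
```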

Relative Performance ("Benchmarks")

| Operation | codas (flow) | codas (stage) | codas (stage w/ tokio-yield) | tokio (mpsc) | tokio (broadcast) |
|---|---|---|---|---|---|
| Move -> Read | 55ns (18M/s) | 26ns (38M/s) | 19ns (53M/s) | - | - |
| Move -> Take | - | - | - | 70ns (14M/s) | - |
| Move -> Clone | - | - | - | - | 33ns (30M/s) |

Comparative performance of the scenarios we've written benchmarks for. Exact numbers will vary between platforms.
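The throughput figures in the table appear to be the reciprocal of the per-message latency. There are 10^9 ns in a second, so 1000 divided by the latency in ns gives millions of messages per second (1000 / 55 ≈ 18.2M/s). A small helper reproduces every throughput figure in the table under that assumption; `mops_per_sec` is a hypothetical name.

```rust
/// Hypothetical helper: converts a per-message latency in nanoseconds into
/// throughput in millions of messages per second.
/// (1e9 ns/s) / latency_ns / 1e6 simplifies to 1000 / latency_ns.
pub fn mops_per_sec(latency_ns: f64) -> f64 {
    1_000.0 / latency_ns
}
```

For example, `mops_per_sec(19.0)` is about 52.6, which the table rounds to 53M/s.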

License

Copyright 2025 Alicorn Systems, LLC.

Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to the license file for more information.

If you have any questions, please reach out to hello@alicorn.systems.

Dependencies

~5–6.5MB
~145K SLoC