2 unstable releases
0.4.1 | Mar 18, 2025 |
---|---|
0.1.0 | Nov 6, 2024 |
#179 in Profiling
117 downloads per month
240KB
5K
SLoC
Low-latency, high-throughput bounded queues ("data flows")
for (a)synchronous and event-driven systems, inspired by the
LMAX Disruptor
and built for codas
.
What's Here
This crate provides the flow
data structure: A
"ring" buffer
which concurrent, (a)synchronous tasks can publish data to
and receive data from.
flow
s work kind of like the broadcast
channels in
Tokio,
with some key differences:
-
Zero-Copy Multicast Reads: Every data published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.
-
Lock-Free by default: No locks or mutexes are used when publishing or receiving data from the
flow
on supported targets. -
Broad Compatibility:
no-std
by default.async
and synchronous APIs.async
functionality doesn't depend on a specific runtime or framework (not evenfutures
!).
flow
s work wherever channels or queues would work, but they're built
specifically for systems that need the same data processed concurrently
(or in parallel) by multiple tasks.
Examples
Flows are created with Flow::new
, which returns a
tuple of (flow, subscribers)
:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32);
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
Data is published into a flow
via Flow::try_next
(or await Flow::next
), which returns an UnpublishedData
reference. Once this reference is published (via UnpublishedData::publish
),
or dropped, it becomes receivable by every subscriber.
Data is received from a flow
via FlowSubscriber::try_next
(or await FlowSubscriber::next
), which returns a PublishedData
reference.
Subscribers
Using
slice patterns,
any number of subscribers can be returned by Flow::new
:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32);
New subscribers cannot be added to an active flow. To overcome this challenge, any subscriber can be wrapped in a Stage.
Stages
A stage is a dynamic group of data processors that share a single subscriber:
# use core::sync::atomic::Ordering;
# use portable_atomic::AtomicU64;
# use portable_atomic_util::Arc;
use codas_flow::{*, stage::*};
// Create a flow.
let (mut flow, [mut sub]) = Flow::<String>::new(32);
// Wrap the subscriber in a processing stage.
let mut stage = Stage::from(sub);
// Add a data processor to the stage; an indefinite
// number of processors can be added to a stage, even
// while the flow is active.
let calls = Arc::new(AtomicU64::new(0));
let closure_calls = calls.clone();
stage.add_proc(move |proc: &mut Proc, data: &String| {
assert_eq!("Hello!", *data);
closure_calls.add(1, Ordering::SeqCst);
});
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Run processors for a set of data in the flow.
stage.proc();
assert_eq!(1, calls.load(Ordering::SeqCst));
Stages only receive data from the flow when one of the
Stage::proc*
functions is invoked; refer to the Stage
docs for more information.
Lock-Free Targets
This crate uses AtomicU64
to coordinate flow
access
without locks. This type is lock-free where possible, but may use locks on some platforms or compile targets.
This section contains a list of the primary targets supported by this crate, along with their support for lock-free behavior.
Target | Lock-Free? |
---|---|
aarch64-unknown-linux-gnu (64-Bit Linux ARM) |
Yes |
aarch64-apple-darwin (64-Bit MacOS ARM) |
Yes |
x86_64-unknown-linux-gnu (64-Bit Linux) |
Yes |
x86_64-apple-darwin (64-Bit MacOS) |
Yes |
x86_64-pc-windows-gnu (64-Bit Windows) |
Yes |
wasm32-unknown-unknown (WASM) |
Yes1 |
armv7-unknown-linux-gnueabihf (ARM Cortex A7 and A8) |
No2 |
riscv32i-unknown-none-elf (ESP 32) |
No |
1 WASM targets don't technically support atomic instructions. However, because WASM code is executed in a single-thread, regular variables are simply substituted for their atomic counterparts, enabling full lock-free support.
2 Confirmation required; a safe assumption is that 32-bit targets don't support atomic operations on 64-bit values.
Relative Performance ("Benchmarks")
Operation | codas (flow) |
codas (stage) |
codas (stage w/ tokio-yield) |
tokio (mpsc) |
tokio (broadcast) |
---|---|---|---|---|---|
Move -> Read |
55ns (18M/s) |
26ns (38M/s) |
19ns (53M/s) |
- | - |
Move -> Take |
- | - | - | 70ns (14M/s) |
- |
Move -> Clone |
- | - | - | - | 33ns (30M/s) |
Comparitive performance of different scenarios we've written benchmarks for. Exact numbers will vary between platforms.
License
Copyright 2025 Alicorn Systems, LLC.
Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to the license file for more information.
If you have any questions, please reach out to [hello@alicorn.systems
].
Dependencies
~5–6.5MB
~145K SLoC