1 unstable release
0.1.0 | Nov 6, 2024 |
---|
#771 in Data structures
24KB
343 lines
Low-latency, high-throughput bounded queues ("data flows")
for (a)synchronous and event-driven systems, inspired by the
LMAX Disruptor
and built for codas
.
What's Here
This crate provides the flow
data structure: A
"ring" buffer
which concurrent, (a)synchronous tasks can publish data to
and receive data from.
flow
s work kind of like the broadcast
channels in
Tokio,
with some key differences:
-
Zero-Copy Multicast Reads: Every data published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.
-
Lock-Free by default: No locks or mutexes are used when publishing or receiving data from the
flow
.If the
dynamic-sub
feature is enabled, a single lock is used when publishing data to coordinate access to the set of subscribers. This lock may be replaced in a future update by a list supporting atomic updates. -
Broad Compatibility:
no-std
by default.async
and synchronous versions of all APIs.async
functionality doesn't depend on a specific runtime or framework (not evenfutures
!).
flow
s work wherever channels or queues would work, but they're built
specifically for systems that need the same data processed concurrently
(or in parallel) by multiple tasks.
Examples
Flows are created with Flow::new
, which returns a
tuple of (flow, subscribers)
:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32).unwrap();
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
Data is published into a flow
via Flow::try_next
(or await Flow::next
), which returns an UnpublishedData
reference. Once this reference is published (via UnpublishedData::publish
),
or dropped, it becomes receivable by every subscriber.
Data is received from a flow
via FlowSubscriber::try_next
(or await FlowSubscriber::next
), which returns a PublishedData
reference.
Adding Subscribers
Using
slice patterns,
any number of subscribers
can be returned by Flow::new
:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32).unwrap();
Adding Subscribers Dynamically
When the dynamic-sub
feature is enabled, new subscribers
can be attached to a flow
even after it's created:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and no subscribers.
let (mut flow, []) = Flow::<String>::new(32).unwrap();
// Attach a new subscriber.
let mut sub = flow.subscribe();
Optional Features
This crate offers the following optional features:
dynamic-sub
: Enables APIs for adding new subscribers to existing Flows. This feature introduces spin-locks to some critical sections in order to safely add subscribers to active flows.
None of these features are enabled by default.
License
Copyright 2024 Alicorn Systems, Inc.
Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to the license file for more information.
If you have any questions, please reach out to [hello@alicorn.systems
].
Dependencies
~0.3–0.8MB
~18K SLoC