#flow #data-flow #subscriber #queue #flows #synchronous #bounded

codas-flow

Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems

1 unstable release

0.1.0 Nov 6, 2024

#771 in Data structures

AGPL-3.0-only

24KB
343 lines

Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems, inspired by the LMAX Disruptor and built for codas.

What's Here

This crate provides the flow data structure: A "ring" buffer which concurrent, (a)synchronous tasks can publish data to and receive data from.

flows work kind of like the broadcast channels in Tokio, with some key differences:

  1. Zero-Copy Multicast Reads: Every data published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.

  2. Lock-Free by default: No locks or mutexes are used when publishing or receiving data from the flow.

    If the dynamic-sub feature is enabled, a single lock is used when publishing data to coordinate access to the set of subscribers. This lock may be replaced in a future update by a list supporting atomic updates.

  3. Broad Compatibility:

  • no-std by default.
  • async and synchronous versions of all APIs.
  • async functionality doesn't depend on a specific runtime or framework (not even futures!).

flows work wherever channels or queues would work, but they're built specifically for systems that need the same data processed concurrently (or in parallel) by multiple tasks.

Examples

Flows are created with Flow::new, which returns a tuple of (flow, subscribers):

use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32).unwrap();

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);

Data is published into a flow via Flow::try_next (or await Flow::next), which returns an UnpublishedData reference. Once this reference is published (via UnpublishedData::publish), or dropped, it becomes receivable by every subscriber.

Data is received from a flow via FlowSubscriber::try_next (or await FlowSubscriber::next), which returns a PublishedData reference.

Adding Subscribers

Using slice patterns, any number of subscribers can be returned by Flow::new:

use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32).unwrap();

Adding Subscribers Dynamically

When the dynamic-sub feature is enabled, new subscribers can be attached to a flow even after it's created:

use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and no subscribers.
let (mut flow, []) = Flow::<String>::new(32).unwrap();

// Attach a new subscriber.
let mut sub = flow.subscribe();

Optional Features

This crate offers the following optional features:

  • dynamic-sub: Enables APIs for adding new subscribers to existing Flows. This feature introduces spin-locks to some critical sections in order to safely add subscribers to active flows.

None of these features are enabled by default.

License

Copyright 2024 Alicorn Systems, Inc.

Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to the license file for more information.

If you have any questions, please reach out to [hello@alicorn.systems].

Dependencies

~0.3–0.8MB
~18K SLoC