1 unstable release: 0.1.0 (Sep 22, 2024)
crdt-lite
1. Rust implementation of a generic CRDT.
This implementation includes:
- Logical Clock: To manage causality.
- CRDT Structure: Generic over key (K) and value (V) types.
- Record Management: Handling inserts, updates, deletes with tombstones.
- Conflict Resolution: Based on column versions, site IDs, and sequence numbers.
- Merging Mechanism: To synchronize state across nodes.
- Comprehensive Tests: Using Rust's #[test] framework to ensure correctness.
2. Detailed Explanation
2.1. LogicalClock
The LogicalClock maintains a monotonically increasing counter to preserve causality across events.
#[derive(Debug, Clone)]
struct LogicalClock {
    time: u64,
}
impl LogicalClock {
    fn new() -> Self { /* ... */ }
    fn tick(&mut self) -> u64 { /* ... */ }
    fn update(&mut self, received_time: u64) -> u64 { /* ... */ }
    fn current_time(&self) -> u64 { /* ... */ }
}
- tick(): Increments the clock for each local event.
- update(): Updates the clock based on incoming changes to maintain causality.
- current_time(): Retrieves the current clock time.
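The method bodies are elided above. A minimal sketch of how they could be filled in, assuming a Lamport-style rule where update() jumps to the larger of the local and received times and then steps past it (that rule is an assumption, not something the README states):

```rust
/// Logical clock sketch; the method bodies are assumed, not taken from the crate.
#[derive(Debug, Clone)]
struct LogicalClock {
    time: u64,
}

impl LogicalClock {
    fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advance the clock for a local event and return the new time.
    fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    /// Fold in a remote timestamp: take the larger value, then step past it
    /// so the local event that follows is ordered after the remote one.
    fn update(&mut self, received_time: u64) -> u64 {
        self.time = self.time.max(received_time) + 1;
        self.time
    }

    /// Read the current time without advancing it.
    fn current_time(&self) -> u64 {
        self.time
    }
}
```

With this rule the counter never moves backwards: a received time lower than the local one still advances the clock by one.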
2.2. ColumnVersion
ColumnVersion tracks the versioning information for each column within a record, enabling fine-grained conflict resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnVersion {
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}
impl ColumnVersion {
    fn new(col_version: u64, db_version: u64, site_id: u64, seq: u64) -> Self { /* ... */ }
}
- col_version: Number of times the column has been updated.
- db_version: Logical clock time when the column was last updated.
- site_id: Identifier of the node where the update originated.
- seq: Sequence number for ordering updates from the same node.
2.3. Record
Record represents an individual record in the CRDT, containing field values and their corresponding version information.
#[derive(Debug, Clone)]
struct Record<V> {
    fields: HashMap<String, V>,
    column_versions: HashMap<String, ColumnVersion>,
}
impl<V> Record<V> {
    fn new(fields: HashMap<String, V>, column_versions: HashMap<String, ColumnVersion>) -> Self { /* ... */ }
}
2.4. CRDT
The CRDT struct manages the overall state, including data records, tombstones for deletions, and methods to manipulate and merge data.
#[derive(Debug, Clone)]
struct CRDT<K, V>
where
    K: Eq + Hash + Clone,
{
    node_id: u64,
    clock: LogicalClock,
    data: HashMap<K, Record<V>>,
    tombstones: HashSet<K>,
}
- node_id: Unique identifier for the node (site).
- clock: Instance of LogicalClock.
- data: Stores records, keyed by their unique identifier.
- tombstones: Tracks deleted records to prevent resurrection.
2.5. Methods
2.5.1. insert
Inserts a new record if it's not already tombstoned.
fn insert(&mut self, record_id: K, fields: HashMap<String, V>) { /* ... */ }
- Checks: Prevents re-insertion of tombstoned records.
- Initialization: Sets col_version to 1 for all columns.
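The two bullets above can be sketched as follows. MiniCrdt is a hypothetical, stripped-down stand-in for the CRDT struct, with a bare u64 in place of LogicalClock. Starting seq at 0, stamping every column with the same db_version, and ignoring inserts for ids that already exist are assumptions.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnVersion {
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}

#[derive(Debug, Clone)]
struct Record<V> {
    fields: HashMap<String, V>,
    column_versions: HashMap<String, ColumnVersion>,
}

// Hypothetical stand-in for the CRDT struct; `clock` is a plain counter here.
struct MiniCrdt<K, V> {
    node_id: u64,
    clock: u64,
    data: HashMap<K, Record<V>>,
    tombstones: HashSet<K>,
}

impl<K: Eq + Hash + Clone, V> MiniCrdt<K, V> {
    fn new(node_id: u64) -> Self {
        MiniCrdt { node_id, clock: 0, data: HashMap::new(), tombstones: HashSet::new() }
    }

    fn insert(&mut self, record_id: K, fields: HashMap<String, V>) {
        // Tombstoned ids stay deleted; existing records are left alone (assumption).
        if self.tombstones.contains(&record_id) || self.data.contains_key(&record_id) {
            return;
        }
        self.clock += 1; // one local event per insert
        let db_version = self.clock;
        let site_id = self.node_id;
        // Every column of a fresh record starts at col_version 1.
        let column_versions: HashMap<String, ColumnVersion> = fields
            .keys()
            .map(|col| {
                let version = ColumnVersion { col_version: 1, db_version, site_id, seq: 0 };
                (col.clone(), version)
            })
            .collect();
        self.data.insert(record_id, Record { fields, column_versions });
    }
}
```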
2.5.2. update
Updates existing record fields if the record isn't tombstoned.
fn update(&mut self, record_id: &K, updates: HashMap<String, V>) { /* ... */ }
- Checks: Ignores updates on tombstoned or non-existent records.
- Versioning: Increments col_version and seq for updated columns.
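A minimal sketch of the checks and version bumps described above, written as a hypothetical free function rather than the crate's method. The extra parameters (data, tombstones, db_version, site_id), the bool return value, and the handling of a column that did not exist before are all assumptions made for illustration.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnVersion {
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}

#[derive(Debug, Clone)]
struct Record<V> {
    fields: HashMap<String, V>,
    column_versions: HashMap<String, ColumnVersion>,
}

/// Hypothetical free-function form of `update`; returns true if it was applied.
fn update<K: Eq + Hash, V>(
    data: &mut HashMap<K, Record<V>>,
    tombstones: &HashSet<K>,
    record_id: &K,
    updates: HashMap<String, V>,
    db_version: u64,
    site_id: u64,
) -> bool {
    // Updates to tombstoned or unknown records are ignored.
    if tombstones.contains(record_id) {
        return false;
    }
    let record = match data.get_mut(record_id) {
        Some(r) => r,
        None => return false,
    };
    for (col, value) in updates {
        record
            .column_versions
            .entry(col.clone())
            // Existing column: bump both counters and restamp it.
            .and_modify(|v| {
                v.col_version += 1;
                v.seq += 1;
                v.db_version = db_version;
                v.site_id = site_id;
            })
            // New column: start at version 1 (assumption).
            .or_insert(ColumnVersion { col_version: 1, db_version, site_id, seq: 0 });
        record.fields.insert(col, value);
    }
    true
}
```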
2.5.3. delete
Deletes a record by marking it as tombstoned.
fn delete(&mut self, record_id: &K) { /* ... */ }
- Checks: Prevents duplicate deletions.
- Tombstone: Inserts into tombstones and removes from data.
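As a sketch, the delete step could look like the hypothetical free function below. It relies on HashSet::insert returning false when the value is already present, which covers the duplicate-deletion check. Tombstoning an id that was never inserted is an assumption; the README does not say how that case is handled.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Hypothetical free-function form of `delete`.
/// Returns false when the record was already tombstoned.
fn delete<K: Eq + Hash + Clone, R>(
    data: &mut HashMap<K, R>,
    tombstones: &mut HashSet<K>,
    record_id: &K,
) -> bool {
    // `insert` returns false if the id is already in the set.
    if !tombstones.insert(record_id.clone()) {
        return false;
    }
    // Drop the live record, if there is one.
    data.remove(record_id);
    true
}
```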
2.5.4. get_changes_since
Retrieves all changes since a specified last_db_version.
fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, V>> { /* ... */ }
- Returns: A vector of Change structs representing the modifications.
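One way to picture this: walk every column of every live record and emit a Change for each column whose db_version is newer than the caller's cutoff. The sketch below uses a hypothetical free function, changes_since, and covers live columns only. Deletions are left out because the README does not say how tombstones are encoded as changes.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnVersion {
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}

#[derive(Debug, Clone)]
struct Record<V> {
    fields: HashMap<String, V>,
    column_versions: HashMap<String, ColumnVersion>,
}

#[derive(Debug, Clone)]
struct Change<K, V> {
    record_id: K,
    col_name: String,
    value: Option<V>,
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}

/// Hypothetical helper: collect column changes newer than `last_db_version`.
fn changes_since<K: Clone, V: Clone>(
    data: &HashMap<K, Record<V>>,
    last_db_version: u64,
) -> Vec<Change<K, V>> {
    let mut changes = Vec::new();
    for (record_id, record) in data {
        for (col_name, version) in &record.column_versions {
            // Strictly newer than the cutoff (assumption: the cutoff itself was already sent).
            if version.db_version > last_db_version {
                changes.push(Change {
                    record_id: record_id.clone(),
                    col_name: col_name.clone(),
                    value: record.fields.get(col_name).cloned(),
                    col_version: version.col_version,
                    db_version: version.db_version,
                    site_id: version.site_id,
                    seq: version.seq,
                });
            }
        }
    }
    changes
}
```

HashMap iteration order is unspecified, so a caller that needs a stable order would have to sort the result, for example by db_version.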
2.5.5. merge_changes
Merges incoming changes into the CRDT, resolving conflicts based on versioning and site IDs.
fn merge_changes(&mut self, changes: &[Change<K, V>]) { /* ... */ }
- Conflict Resolution:
  - Higher col_version: Accepts the change.
  - Equal col_version:
    - Deletion Prioritized: Deletions take precedence over insertions/updates.
    - Site ID & Seq: If both changes are deletions, use site_id and seq as tie-breakers.
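The conflict rules for merge_changes can be sketched as a single decision function. Candidate and remote_wins are hypothetical names. Two points are assumptions: that the larger (site_id, seq) pair wins a tie, and that the same tie-break applies when neither side is a deletion, a case the rules above do not cover.

```rust
use std::cmp::Ordering;

/// Hypothetical summary of the fields that matter for conflict resolution.
#[derive(Debug, Clone, Copy)]
struct Candidate {
    col_version: u64,
    site_id: u64,
    seq: u64,
    is_delete: bool,
}

/// Returns true if the incoming (remote) change should replace the local state.
fn remote_wins(local: &Candidate, remote: &Candidate) -> bool {
    match remote.col_version.cmp(&local.col_version) {
        // A higher column version is accepted, a lower one is dropped.
        Ordering::Greater => true,
        Ordering::Less => false,
        Ordering::Equal => match (remote.is_delete, local.is_delete) {
            // Deletions take precedence over inserts and updates.
            (true, false) => true,
            (false, true) => false,
            // Same kind on both sides: tie-break on (site_id, seq).
            // "Larger pair wins" is an assumption.
            _ => (remote.site_id, remote.seq) > (local.site_id, local.seq),
        },
    }
}
```

Because the function depends only on the two candidates, every node that compares the same pair reaches the same outcome, which is what lets replicas converge.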
2.5.6. print_data
Prints the current state for debugging.
fn print_data(&self) { /* ... */ }
2.6. Change Struct
Represents a single change in the CRDT.
#[derive(Debug, Clone)]
struct Change<K, V> {
    record_id: K,
    col_name: String,
    value: Option<V>,
    col_version: u64,
    db_version: u64,
    site_id: u64,
    seq: u64,
}
- record_id: Identifier of the record.
- col_name: Name of the column.
- value: New value for the column (None for deletions).
- col_version: Version of the column.
- db_version: Logical clock time of the change.
- site_id: Originating node's ID.
- seq: Sequence number for ordering.
2.7. Tests
Comprehensive tests ensure the correctness of the CRDT implementation. They cover scenarios like:
- Basic insertions and merges.
- Concurrent updates with conflict resolution.
- Deletions and tombstone handling.
- Preventing re-insertion after deletion.
- Logical clock updates.
Running Tests
- Ensure Rust is Installed: If not, install it from rustup.rs.
- Save the Code: Save the above code in a file named crdt.rs.
- Create a New Cargo Project (the --lib flag creates src/lib.rs; without it, cargo new generates a binary crate with src/main.rs):
    cargo new --lib crdt_project
    cd crdt_project
- Replace src/lib.rs with crdt.rs: Alternatively, place crdt.rs in src/ and adjust accordingly.
- Add Dependencies: Add uuid to Cargo.toml for generating unique identifiers.
    [dependencies]
    uuid = { version = "1.2", features = ["v4"] }
- Run Tests:
    cargo test
All tests should pass, indicating that the CRDT behaves as expected.
License
This project is licensed under the MIT License - see the LICENSE file for details.