7 releases (4 stable)
1.2.0 | Jul 27, 2021 |
---|---|
1.1.0 | Jun 23, 2020 |
0.1.2 | Sep 11, 2019 |
0.1.1 | Aug 27, 2019 |
0.1.0 | Jul 15, 2019 |
#474 in Algorithms
84 downloads per month
Used in 3 crates
79KB
1.5K SLoC
Rust library of batching algorithm implementations.
Batching works by accumulating items and later automatically flushing them all together when the batch has reached a limit. All items collected in a single batch are available at once for further processing (e.g. batch insert into a database).
These implementations will construct batches based on:
- limit of the number of items collected in a batch,
- limit of time duration since the first item was appended to the batch,
- calling one of the batch consuming methods,
- sending flush command between batch items (channel-based implementations).
See documentation of available algorithms.
Example
Collect batches of items from two streams by reaching different individual batch limits and by using the `Flush` command.
use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;
// Example: batch items from two streams (stream keys `0` and `1`) and observe batches being flushed by
// the size limit, the duration limit, an explicit `Flush` command, and finally by the producer exiting.
//
// Create a producer thread with a channel-based, multi-stream batching implementation configured with a maximum size
// of 4 items (for each stream) and a maximum batch duration of 200 ms since the first received item.
// NOTE(review): the third argument (`10`) is presumably the channel buffer size — confirm against the API docs.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
// Send a sequence of `Append` commands, each carrying an integer stream key and an item value
sender.send(Append(1, 1)).unwrap();
sender.send(Append(0, 1)).unwrap();
sender.send(Append(1, 2)).unwrap();
sender.send(Append(0, 2)).unwrap();
sender.send(Append(1, 3)).unwrap();
sender.send(Append(0, 3)).unwrap();
sender.send(Append(1, 4)).unwrap();
// At this point the batch with stream key `1` should have reached its capacity of 4 items
sender.send(Append(0, 4)).unwrap();
// At this point the batch with stream key `0` should have reached its capacity of 4 items
// Send some more items to buffer up for the next batches (two per stream, below the size limit)
sender.send(Append(0, 5)).unwrap();
sender.send(Append(1, 5)).unwrap();
sender.send(Append(1, 6)).unwrap();
sender.send(Append(0, 6)).unwrap();
// Sleep for 400 ms, longer than the 200 ms maximum batch duration, to trigger the duration timeout
std::thread::sleep(Duration::from_millis(400));
// Send items that will be flushed by the `Flush` command
sender.send(Append(0, 7)).unwrap();
sender.send(Append(1, 7)).unwrap();
sender.send(Append(1, 8)).unwrap();
sender.send(Append(0, 8)).unwrap();
// Flush outstanding items for the batch with stream key `1` first, then for stream key `0`
sender.send(Flush(1)).unwrap();
sender.send(Flush(0)).unwrap();
// The last buffered up items will be flushed automatically when this thread exits
sender.send(Append(0, 9)).unwrap();
sender.send(Append(1, 9)).unwrap();
sender.send(Append(1, 10)).unwrap();
sender.send(Append(0, 10)).unwrap();
// Exiting the closure will shut down the producer thread
});
// Consumer side: each `batch.next()` is expected to yield `Ok((stream_key, drain))`, where `drain` iterates
// over the items of the flushed batch.
// Batches flushed due to the individual batch size limit (stream `1` filled up first, then stream `0`)
assert_matches!(batch.next(), Ok((1, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);
assert_matches!(batch.next(), Ok((0, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);
// Batches flushed due to the duration limit
// NOTE(review): stream `0` is expected first, presumably because its first item (5) was appended before
// stream `1`'s — confirm against the API docs.
assert_matches!(batch.next(), Ok((0, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);
assert_matches!(batch.next(), Ok((1, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);
// Batches flushed by sending the `Flush` command, starting from the batch with stream key `1`
assert_matches!(batch.next(), Ok((1, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);
assert_matches!(batch.next(), Ok((0, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);
// Batches flushed by dropping the sender (producer thread exit)
assert_matches!(batch.next(), Ok((0, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);
assert_matches!(batch.next(), Ok((1, drain)) =>
assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);
Dependencies
~395KB