46 stable releases
Uses new Rust 2024
new 2.0.0 | Apr 15, 2025 |
---|---|
1.30.0 | Feb 8, 2025 |
1.29.1 | Jan 29, 2025 |
1.29.0 | Dec 19, 2024 |
1.22.1 | Jul 14, 2024 |
#138 in Concurrency
102 downloads per month
Used in 2 crates
240KB
4.5K
SLoC
orx-concurrent-iter
A thread-safe and ergonomic concurrent iterator trait and efficient lock-free implementations.
This crate focuses on enabling ergonomic concurrent programs without sacrificing efficiency.
A. Ergonomics
A ConcurrentIter
can be safely shared among threads using a shared reference; and multiple threads can iterate over it concurrently.
As expected, it will yield each element only once and in order.
A.1. next and while let
Just like a regular iterator, a concurrent iterator has a next
method returning and Option
of the element type.
use orx_concurrent_iter::*;
use core::time::Duration;
let vec = vec!['a', 'b', 'c'];
let con_iter = vec.con_iter(); // concurrent iterator
std::thread::scope(|s| {
s.spawn(|| { // 1st thread
let first = con_iter.next();
assert_eq!(first, Some(&'a'));
// make sure that 2nd thread pulls 'b' and 'c'
std::thread::sleep(Duration::from_secs(2));
assert_eq!(con_iter.next(), None);
});
s.spawn(|| { // 2nd thread
// make sure that 1st thread pulls 'a' beforehand
std::thread::sleep(Duration::from_secs(1));
let second = con_iter.next();
assert_eq!(second, Some(&'b'));
let third = con_iter.next();
assert_eq!(third, Some(&'c'));
assert_eq!(con_iter.next(), None);
});
});
The signature of the next
method allows convenient while let
loops.
The following example demonstrates a straightforward parallel processing implementation, where 100 elements of the input data will be processed in parallel by 4 threads.
Note that the code does not look different than its sequential counterpart, except for the spawn
calls.
use orx_concurrent_iter::*;
let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();
let process = |_x: &String| { /* assume actual work */ };
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
while let Some(value) = con_iter.next() {
process(value);
}
});
}
});
A.2. Pullers (within-thread Iterators) and for
Although rust's while let
loops are already very convenient, we cannot use for
loops because a concurrent iterator is not a regular Iterator
.
However, we can create one or more item pullers from a concurrent iterator within multiple threads, and the ItemPuller
implements Iterator
.
Although we use a puller similar to a regular iterator; it is created from, connected to and pulls elements from a ConcurrentIter
.
This trick first enables for
loops.
use orx_concurrent_iter::*;
let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();
let process = |_x: &String| { /* assume actual work */ };
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for value in con_iter.item_puller() {
process(value);
}
});
}
});
The actual benefit, however, is to enable the convenient and composable iterator methods in concurrent programs.
Consider the following parallel_reduce operation, which is a simple yet efficient parallelization of the reduce
method.
Notice that:
- The entire implementation is a chain of Iterator methods fitting in four lines.
- We compute the reduction in two folds:
- We map each thread to the reduction of the elements that it pulls.
- We ignore (filter_map) the results that are None; this can be observed when a thread cannot find any elements to pull.
- Then, we reduce the reduced results returned by each thread to get the final result.
use orx_concurrent_iter::*;
fn parallel_reduce<T, F>(
num_threads: usize,
con_iter: impl ConcurrentIter<Item = T>,
reduce: F,
) -> Option<T>
where
T: Send + Sync,
F: Fn(T, T) -> T + Send + Sync,
{
std::thread::scope(|s| {
(0..num_threads)
.map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
.filter_map(|x| x.join().unwrap()) // join threads, ignore None's
.reduce(&reduce) // reduce thread results to final result
})
}
// test
let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
assert_eq!(sum, None);
let sum = parallel_reduce(8, (0..3).into_con_iter(), |a, b| a + b);
assert_eq!(sum, Some(3));
let n = 10_000;
let data: Vec<_> = (0..n).collect();
let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
assert_eq!(sum, Some(n * (n - 1) / 2));
B. Efficiency
The concurrent iterator trait definition and lock-free implementations are designed to achieve high performance concurrent programs.
B.1. Iteration in Chunks
A major difference of a concurrent iterator from a sequential iterator is the concurrent state to be maintained by atomic variables. Every time elements are pulled from the concurrent iterator, this state is updated. These updates can be considered as the overhead of the concurrency.
Whenever the task to be performed over each element of the iterator is large enough, the overhead will be negligible. In other cases, it is important to reduce the number of state updates in order to improve performance.
A straightforward way to achieve this is by pulling in chunks rather than one element at a time. This can be done by a ChunkPuller
.
Note that puling in chunks does not mean cloning or moving the elements; it only refers to reserving multiple elements at once for a thread that calls the pull.
Consider the parallel processing example above, in which 4 threads would have pulled 100 elements in a total of 100 next
calls. This means that the concurrent state would have been updated 100 times. The following version, on the other hand, will reach the con_iter
only 10 times and at each access the pulling thread will reserve 10 consecutive elements.
use orx_concurrent_iter::*;
let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();
let process = |_x: &String| { /* assume actual work */ };
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
let mut puller = con_iter.chunk_puller(10);
while let Some(chunk) = puller.pull() {
for value in chunk {
process(value);
}
}
});
}
});
This is a simple but a very important optimization technique in performance critical programs where the process
is considerably small.
However, notice that we now need to have nested iterators. This is the explicit version and gives us the ability to operate on the chunk
whenever needed. When we only need to perform on individual elements, we can flatten the chunk puller. Similar to the ItemPuller
, a FlattenedChunkPuller
also implements Iterator, and hence, brings benefits of iterator composability to concurrent programs.
The following parallel_reduce implementation adds the iteration-by-chunks optimization to the previous version; while keeping the implementation simple, still fitting to four lines by flattening the chunk iterator to a regular Iterator.
use orx_concurrent_iter::*;
fn parallel_reduce<T, F>(
num_threads: usize,
chunk: usize,
con_iter: impl ConcurrentIter<Item = T>,
reduce: F,
) -> Option<T>
where
T: Send + Sync,
F: Fn(T, T) -> T + Send + Sync,
{
std::thread::scope(|s| {
(0..num_threads)
.map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
.filter_map(|x| x.join().unwrap()) // join threads, ignore None's
.reduce(&reduce) // reduce thread results to final result
})
}
let n = 10_000;
let data: Vec<_> = (0..n).collect();
let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
assert_eq!(sum, Some(n * (n - 1) / 2));
B.2. Early Exit
We do not always want to iterate over all elements. In certain programs, we would like to abort iteration as soon as we achieve a certain goal. Sequential iterator's find
method is an example use case where we stop iteration as soon as an item satisfies a predicate.
In order to achieve this in a concurrent iteration, we can conveniently use the skip_to_end
method. After any of the threads calls this method, succeeding pull calls will receive None, allowing the calling threads to early-exit.
This is demonstrated in the following parallel_find implementation:
- The thread that pulls "33" will immediately send feedback to the concurrent iterator to terminate.
- After this point, all pull or next calls from the concurrent iterator will return None. This will allow all other threads to immediately return None.
It is possible that a couple of other threads pull "34" and "35, while the winner thread is testing "33" against the predicate. However, once they complete testing these elements, they will not find a new item and exit early.
use orx_concurrent_iter::*;
fn parallel_find<T, F>(
num_threads: usize,
con_iter: impl ConcurrentIter<Item = T>,
predicate: F,
) -> Option<T>
where
T: Send + Sync,
F: Fn(&T) -> bool + Send + Sync,
{
std::thread::scope(|s| {
(0..num_threads)
.map(|_| {
s.spawn(|| {
con_iter
.item_puller()
.find(&predicate)
// once found, immediately jump to end
.inspect(|_| con_iter.skip_to_end())
})
})
.filter_map(|x| x.join().unwrap())
.next()
})
}
let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
assert_eq!(value, Some(&33.to_string()));
C. Any Iterator as a Concurrent Iterator
The IterIntoConcurrentIter
trait allows to convert any generic sequential Iterator into a ConcurrentIter
.
This makes it very convenient to safely share any iterator across threads using a shared reference as demonstrated below.
use orx_concurrent_iter::*;
let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
// an arbitrary iterator
let iter = data
.into_iter()
.filter(|x| !x.starts_with('3'))
.map(|x| format!("{x}!"));
// converted into a concurrent iterator and shared with multiple threads
let con_iter = iter.iter_into_con_iter();
let process = |_x: String| { /* assume actual work */ };
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
while let Some(value) = con_iter.next() {
assert!(!value.starts_with('3'));
assert!(value.ends_with('!'));
process(value);
}
});
}
});
However, due to the fact that the provider is any iterator type (we lack any further useful information to optimize), pulling of elements from the iterator is synchronized. This means that:
- Whenever possible, it is always more efficient to create the concurrent iterator from the concrete type which does not require the abovementioned synchronization.
- When the
process
is large enough; i.e., when the heavy computation load is on the processing side rather than on pulling of the items, the impact of synchronization diminishes and we benefit from parallelization.
Benchmarks benches/con_iter_of_iter.rs and benches/con_iter_of_iter_short.rs aim to test the performance of concurrent iterators created from generic iterators. We observe that significant performance improvements can be achieved through parallelization by concurrent iterators despite the impact of synchronization.
Process | Input Size | Method | Computation Time (ms) |
---|---|---|---|
short process |
65536 | sequential | 4.65 |
rayon | 15.71 | ||
ConcurrentIter (chunk_size = 1) | 15.72 | ||
ConcurrentIter (chunk_size = 64) | 1.80 | ||
long process |
65536 | sequential | 24.30 |
rayon | 23.08 | ||
ConcurrentIter (chunk_size = 1) | 19.09 | ||
ConcurrentIter (chunk_size = 64) | 7.27 |
Default parallel iterator of rayon created by the par_bridge is used which might use available threads. For the concurrent iterator tests, 8 threads are spawned for each test case.
D. Implementations
The following implementations of concurrent iterators are provided in this crate.
Type | ConcurrentIterable con_iter |
IntoConcurrentIter into_con_iter |
IterIntoConcurrentIter iter_into_con_iter |
---|---|---|---|
I: Iterator<Item = T> |
T |
||
&[T] |
&T |
&T |
|
Vec<T> |
&T |
T |
|
Range<T> |
T |
T |
The following are in progress:
- Implementing concurrent iterators for remaining standard collection types in this crate.
- Implementing concurrent iterators for other collection types in the respective crates.
Relation to orx_parallel
Any collection or generator type that can create a ConcurrentIter
can be efficiently parallelized using orx_parallel
.
Notice that straightforward implementations of several parallel computations are provided as examples throughout the documentation. They demonstrate that ConcurrentIter
establishes the input side of parallel computation. There are two additional required pieces:
- Concurrent collections to write the results to.
- Consider, for instance, the parallelized map operation followed by a collect. Each element of the concurrent iterator must be mapped to a value which must be then safely and efficiently written to the output.
- A parallel runner.
- Concurrent iterators allow to set the chunk size, or even, pull chunks with dynamic chunks sizes that can be changed during the iteration depending on observations.
- Similarly, degree of parallelization can be determined taking the input data and computation into account.
- These decisions must be taken by a parallel runner.
Building on top of concurrent iterators, these missing pieces are implemented in the orx_parallel crate which allows for high performance and configurable parallel computations expressed as compositions of parallel iterator methods.
E. Creating Concurrent Iterators
We can create concurrent iterators using two methods con_iter
and into_con_iter
. These methods belong to three traits which are explained below.
In addition, a concurrent iterator can be created from a generic iterator using
iter_into_con_iter
; however, this is explained separately above and made explicit.
E.1. IntoConcurrentIter
This trait represents all types that can be consumed and converted into a concurrent iterator.
Concurrent counterpart of the standard IntoIterator
trait.
For instance, Vec<T>
implements IntoIterator<Item = T>
; and it also implements IntoConcurrentIter<Item = T>
method.
vec.into_iter()
consumesvec
and returns an iterator yielding items ofT
.vec.into_con_iter()
consumesvec
and returns a concurrent iterator yielding items ofT
.
E.2. ConcurrentIterable
This trait represents all types that can repeatedly create concurrent iterators for its elements, without consuming the type.
Concurrent counterpart of the Iterable
trait of the orx_iterable create.
For instance, &Range<usize>
implements Iterable<Item = usize>
; and it also implements ConcurrentIterable<Item = usize>
method.
range.iter()
returns an iterator yielding items ofusize
without consumingrange
.range.con_iter()
returns a concurrent iterator yielding items ofusize
without consumingrange
.
E.3. ConcurrentCollection
This trait represents all types that can both repeatedly create concurrent iterators for references of its elements, and also converted into a concurrent iterator of its elements.
Concurrent counterpart of the Collection
trait of the orx_iterable create.
Vec<T>
both implements Collection<Item = T>
and ConcurrentCollection<Item = T>
.
- Create iterators of references without consuming the
vec
.vec.iter()
returns an iterator yielding items of&T
.vec.con_iter()
returns a concurrent iterator yielding items of&T
.
- Consume
vec
and convert it into an iterator.vec.into_iter()
consumesvec
and returns an iterator yielding items ofT
.vec.into_con_iter()
consumesvec
and returns a concurrent iterator yielding items ofT
.
Contributing
Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an issue or create a PR.
License
Dual-licensed under Apache 2.0 or MIT.
Dependencies
~150KB