4 releases
0.2.2 | Aug 26, 2024 |
---|---|
0.2.1 | Aug 26, 2024 |
0.2.0 | Aug 26, 2024 |
0.1.0 | Jan 26, 2024 |
#335 in Concurrency
21KB
411 lines
a scheduler of dag/pipeline computation graph.
- dag scheduler and nodes. each node of a dag graph will launch multi threads for computation.
- src/scheduler.rs
- scr/nodes.rs
- pipline scheduler and nodes. num of nodes means num of threads in pipeline
- pipeline/ppl_scheduler.rs
- pipeline/ppl_node.rs
pipeline_scheduler
#[derive(Debug)]
pub struct DummyMsg {
pub val: usize,
}
pub struct SourceNode{
counter: Arc<Mutex<usize>>
}
impl SourceNode {
pub fn new(num: usize) -> Vec<Box<dyn TPplNode<MsgType = DummyMsg> + Sync>> {
(0..num).into_iter()
.map(|_|
Box::new(SourceNode{counter: Arc::new(Mutex::new(2))})
as Box<dyn TPplNode<MsgType = DummyMsg> + Sync>)
.collect()
}
}
impl TPplNode for SourceNode {
type MsgType = DummyMsg;
fn work_fn(&mut self, inp_v: Option<Self::MsgType>) -> Option<Vec<Self::MsgType>> {
let mut v = self.counter.lock().unwrap();
if *v == 0 {
None
} else {
let old = *v;
*v -= 1;
Some(vec![DummyMsg { val: old}])
}
}
fn name(&self) -> &str {
"SourceNode"
}
}
pub struct MiddleNode {
}
impl MiddleNode {
pub fn new(num: usize) -> Vec<Box<dyn TPplNode<MsgType = DummyMsg> + Sync>> {
(0..num).into_iter()
.map(|_|
Box::new(MiddleNode{})
as Box<dyn TPplNode<MsgType = DummyMsg> + Sync>)
.collect()
}
}
impl TPplNode for MiddleNode{
type MsgType = DummyMsg;
fn work_fn(&mut self, inp_v: Option<Self::MsgType>) -> Option<Vec<Self::MsgType>> {
inp_v.and_then(|v| Some(vec![DummyMsg{val: v.val * 10}]))
}
fn name(&self) -> &str {
"MiddleNode"
}
}
pub struct SinkNode {
}
impl SinkNode {
pub fn new(num: usize) -> Vec<Box<dyn TPplNode<MsgType = DummyMsg> + Sync>> {
(0..num).into_iter()
.map(|_|
Box::new(SinkNode{})
as Box<dyn TPplNode<MsgType = DummyMsg> + Sync>)
.collect()
}
}
impl TPplNode for SinkNode {
type MsgType = DummyMsg ;
fn work_fn(&mut self, inp_v: Option<Self::MsgType>) -> Option<Vec<Self::MsgType>> {
println!("v:{:?}", inp_v.unwrap());
None
}
fn name(&self) -> &str {
"SinkNode"
}
}
let source_nodes = SourceNode::new(5);
let middle_nodes = MiddleNode::new(3);
let sink_nodes = SinkNode::new(4);
let mut pipeline = PplScheduler::new();
pipeline.add_single_worker_nodes(NodeType::Source(2), source_nodes);
pipeline.add_single_worker_nodes(NodeType::Middle(2), middle_nodes);
pipeline.add_single_worker_nodes(NodeType::Sink, sink_nodes);
join_all_nodes_handles(pipeline.start(None).0);
Dependencies
~485KB