#future #async #runtimes #async-task #waiter #top-level #points

nightly waker-waiter

Helps async runtimes interoperate with arbitrary futures

4 releases (2 breaking)

0.3.0 Jun 15, 2024
0.2.1 Sep 5, 2023
0.2.0 Jul 24, 2023
0.1.0 Mar 19, 2023

#1934 in Asynchronous

MIT license

28KB
511 lines

waker-waiter


lib.rs:

This library helps async runtimes support the execution of arbitrary futures by enabling futures to provide their own event polling logic. It is an attempt to implement the approach described in "Context reactor hook".

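There are two integration points:

- Futures that need their own event polling logic obtain the TopLevelPoller from the task context (get_poller) and register a WakerWaiter on it with set_waiter.
- Whatever polls the top-level future (the executor) implements TopLevelPoller, exposes it to futures through the task context (ContextBuilder::ext), and, when the future is pending, blocks on the registered WakerWaiter instead of parking the thread.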

Only one WakerWaiter can be registered on a TopLevelPoller. If more than one future relies on the same event polling logic, the futures should coordinate and share the same WakerWaiter.

Example of a future registering a WakerWaiter

#
// Process-wide slot for the shared reactor. It starts out empty (`None`) and
// is filled by `Reactor::current` the first time a reactor is requested.
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);

/// Example reactor that owns the `WakerWaiter` shared by all futures relying
/// on it.
struct Reactor {
    /// The waiter handed to the `TopLevelPoller`. It is wrapped in `Option`
    /// because the waiter holds a weak reference back to this reactor;
    /// `Reactor::current` always populates it before handing the reactor out.
    waiter: Option<WakerWaiter>,
}

impl Reactor {
    /// Returns the process-wide reactor, creating it on first use.
    ///
    /// The reactor owns its waiter while the waiter holds a `Weak` reference
    /// back to the reactor. `Arc::new_cyclic` builds both halves of that cycle
    /// in one step, so no `unsafe` mutation of an already-shared `Arc` is
    /// needed.
    ///
    /// # Panics
    ///
    /// Panics if the `REACTOR` mutex is poisoned.
    fn current() -> Arc<Reactor> {
        let mut reactor = REACTOR.lock().unwrap();

        // Initialise the slot only if it is still empty; otherwise reuse the
        // reactor that is already stored there.
        let shared = reactor.get_or_insert_with(|| {
            Arc::new_cyclic(|weak| Reactor {
                waiter: Some(
                    Arc::new(ReactorWaiter {
                        reactor: Weak::clone(weak),
                    })
                    .into(),
                ),
            })
        });

        Arc::clone(shared)
    }

    /// Borrows the waiter associated with this reactor.
    ///
    /// # Panics
    ///
    /// Panics if the waiter was never set; `current` sets it for every reactor
    /// it creates.
    fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
        self.waiter.as_ref().unwrap()
    }
}

/// Waiter implementation backed by a `Reactor`.
struct ReactorWaiter {
    /// Weak back-reference to the owning reactor, so the waiter does not keep
    /// the reactor alive on its own.
    reactor: Weak<Reactor>,
}

impl WakerWait for ReactorWaiter {
    /// Blocks the calling thread while polling for events.
    ///
    /// Left unimplemented in this example (`todo!()`).
    fn wait(self: &Arc<Self>) {
        // ... blocking poll for events ...
        todo!();
    }

    /// Returns a handle that can unblock a pending `wait` call.
    ///
    /// Left unimplemented in this example (`todo!()`).
    fn canceler(self: &Arc<Self>) -> WakerWaiterCanceler {
        // ... provide a way to unblock the above ...
        todo!();
    }
}

/// Example future that registers the reactor's waiter on the `TopLevelPoller`
/// each time it is polled.
struct MyFuture;

impl Future for MyFuture {
#
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let p = match get_poller(cx) {
            Some(p) => p,
            None => panic!("MyFuture requires context to provide TopLevelPoller"),
        };

        if p.set_waiter(Reactor::current().waiter()).is_err() {
            panic!("Incompatible waiter already assigned to TopLevelPoller");
        }

        // ... register waker, perform I/O, etc ...
    }
}

Example of an executor providing a TopLevelPoller

#![feature(local_waker)]
#![feature(context_ext)]
/// Waker that wakes the executor thread either by cancelling the configured
/// `WakerWaiter` or, when none is configured, by unparking the thread.
struct ThreadWaker {
    /// Handle to the executor thread: compared against the waking thread and
    /// unparked when no waiter is configured.
    thread: Thread,
    /// Slot shared with the `MyTopLevelPoller`; holds the waiter once a future
    /// has registered one.
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl Wake for ThreadWaker {
    /// Wakes the executor thread, choosing the mechanism that matches how the
    /// executor is currently blocked.
    fn wake(self: Arc<Self>) {
        // A wake issued from the execution thread itself is attributed to the
        // WakerWaiter, which hands control back without any extra signaling.
        let on_executor_thread = thread::current().id() == self.thread.id();
        if on_executor_thread {
            return;
        }

        // Snapshot the configured waiter; the lock guard is released at the
        // end of this statement, before anything is signaled.
        let configured = self.waiter.lock().unwrap().clone();

        match configured {
            // With a waiter configured, the execution thread is blocked inside
            // it, so cancel the wait to unblock it.
            Some(active) => {
                active.canceler().cancel();
            }
            // Without a waiter, the execution thread sleeps in a plain thread
            // park, so unpark it.
            None => {
                self.thread.unpark();
            }
        }
    }
}

/// Executor-side `TopLevelPoller` that stores the registered waiter in a slot
/// shared with the `ThreadWaker`.
#[derive(Clone)]
struct MyTopLevelPoller {
    /// `None` until a future registers a waiter through `set_waiter`.
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl TopLevelPoller for MyTopLevelPoller {
    /// Registers `waiter` as the waiter for this poller.
    ///
    /// Succeeds when no waiter is registered yet (a clone of `waiter` is
    /// stored) or when the registered waiter compares equal to `waiter`.
    /// Returns `SetWaiterError` when a different waiter is already registered.
    fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
        let mut slot = self.waiter.lock().unwrap();

        match slot.as_ref() {
            // already set to this waiter
            Some(existing) if existing == waiter => Ok(()),
            // already set to a different waiter
            Some(_) => Err(SetWaiterError),
            // nothing registered yet: remember this waiter
            None => {
                *slot = Some(waiter.clone());
                Ok(())
            }
        }
    }
}

// Slot shared by the waker and the poller; a polled future fills it through
// `TopLevelPoller::set_waiter`.
let waiter = Arc::new(Mutex::new(None));
let waker = Arc::new(ThreadWaker {
    thread: thread::current(),
    waiter: Arc::clone(&waiter),
}).into();
let mut poller = MyTopLevelPoller { waiter };

let mut fut = pin!(async { /* ... */ });

loop {
    let result = {
        // Build a fresh context for every poll so the future can reach the
        // poller through the context's extension slot.
        let mut a = Anyable::new(&mut poller as &mut dyn TopLevelPoller);
        let mut cx = ContextBuilder::from_waker(&waker).ext(a.as_any()).build();

        fut.as_mut().poll(&mut cx)
    };

    match result {
        Poll::Ready(res) => break res,
        Poll::Pending => {
            // Clone the waiter out of the slot so the lock is not held while
            // this thread is blocked.
            let waiter = poller.waiter.lock().unwrap().clone();

            // if a waiter is configured then block on it. else do a
            // standard thread park
            match waiter {
                Some(waiter) => waiter.wait(),
                None => thread::park(),
            }
        }
    }
}

No runtime deps