ostd/sync/
wait.rs

1// SPDX-License-Identifier: MPL-2.0
2use vstd::atomic_ghost::*;
3use vstd::prelude::*;
4
5use alloc::{collections::VecDeque, sync::Arc};
6use core::sync::atomic::{AtomicBool, Ordering};
7
8use super::{LocalIrqDisabled, SpinLock};
9use crate::task::{scheduler, Task};
10
11// # Explanation on the memory orders
12//
13// ```
14// [CPU 1 (the waker)]     [CPU 2 (the waiter)]
15// cond = true;
16// wake_up();
17//                         wait();
18//                         if cond { /* .. */ }
19// ```
20//
21// As soon as the waiter is woken up by the waker, it must see the true condition. This is
22// trivially satisfied if `wake_up()` and `wait()` synchronize with a lock. But if they synchronize
23// with an atomic variable, `wake_up()` must access the variable with `Ordering::Release` and
24// `wait()` must access the variable with `Ordering::Acquire`.
25//
26// Examples of `wake_up()`:
27//  - `WaitQueue::wake_one()`
28//  - `WaitQueue::wake_all()`
29//  - `Waker::wake_up()`
30//
31// Examples of `wait()`:
32//  - `WaitQueue::wait_until()`
33//  - `Waiter::wait()`
34//  - `Waiter::drop()`
35//
36// Note that dropping a waiter must be treated as a `wait()` with zero timeout, because we need to
37// make sure that the wake event isn't lost in this case.
38
39verus! {
40
41pub tracked struct WaitQueueGhost {
42    pub queued_wakers: Seq<int>,
43}
44
45struct_with_invariants! {
46
47/// A wait queue.
48///
49/// One may wait on a wait queue to put its executing thread to sleep.
50/// Multiple threads may be the waiters of a wait queue.
51/// Other threads may invoke the `wake`-family methods of a wait queue to
52/// wake up one or many waiting threads.
53pub struct WaitQueue {
54    // A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`.
55    num_wakers: AtomicU32<_, WaitQueueGhost, _>,
56    wakers: SpinLock<VecDeque<Arc<Waker>>, LocalIrqDisabled>,
57}
58
59closed spec fn wf(self) -> bool {
60    invariant on num_wakers is (v: u32, g: WaitQueueGhost) {
61        &&& g.queued_wakers.len() == v as int
62    }
63}
64}
65
66impl WaitQueue {
67    #[verifier::type_invariant]
68    pub closed spec fn type_inv(self) -> bool {
69        self.wf()
70    }
71}
72
73impl WaitQueue {
74    /// Creates a new, empty wait queue.
75    #[verifier::external_body]
76    pub const fn new() -> Self {
77        WaitQueue {
78            num_wakers: AtomicU32::new(
79                Ghost(()),
80                0,
81                Tracked(WaitQueueGhost { queued_wakers: seq![] }),
82            ),
83            wakers: SpinLock::new(VecDeque::new()),
84        }
85    }
86
87    /// Waits until some condition is met.
88    ///
89    /// This method takes a closure that tests a user-given condition.
90    /// The method only returns if the condition returns `Some(_)`.
91    /// A waker thread should first make the condition `Some(_)`, then invoke the
92    /// `wake`-family method. This ordering is important to ensure that waiter
93    /// threads do not lose any wakeup notifications.
94    ///
95    /// By taking a condition closure, this wait-wakeup mechanism becomes
96    /// more efficient and robust.
97    #[track_caller]
98    #[verus_spec(ret =>
99        requires
100            cond.requires(()),
101        ensures
102            cond.ensures((), Some(ret)),
103    )]
104    pub fn wait_until<F, R>(&self, mut cond: F) -> R where F: FnMut() -> Option<R> {
105        if let Some(res) = cond() {
106            return res;
107        }
108        let (waiter, _) = Waiter::new_pair();
109        let cond = ||
110            {
111                self.enqueue(waiter.waker());
112                cond()
113            };
114        proof!{
115            admit(); // FIXME: The Verus type inference of the closure is wrong if we add an `ensures` clause to the closure.
116        }
117        waiter.wait_until_or_cancelled(cond, || -> (ret:Result<(), ()>) ensures ret == Ok::<(),()>(()) { Ok::<(), ()>(()) }).unwrap()
118    }
119
120    /// Wakes up one waiting thread, if there is one at the point of time when this method is
121    /// called, returning whether such a thread was woken up.
122    #[verifier::external_body]
123    pub fn wake_one(&self) -> (r: bool)
124    {
125        // Fast path
126        if self.is_empty() {
127            return false;
128        }
129        loop {
130            let mut wakers = self.wakers.lock();
131            let Some(waker) = wakers.pop_front() else {
132                return false;
133            };
134            atomic_with_ghost! {
135                self.num_wakers => fetch_sub(1);
136                update prev -> next;
137                ghost g => {
138                    g = WaitQueueGhost { queued_wakers: g.queued_wakers.drop_first() };
139                }
140            };
141            // Avoid holding lock when calling `wake_up`
142            drop(wakers);
143
144            if waker.wake_up() {
145                return true;
146            }
147        }
148    }
149
150    /// Wakes up all waiting threads, returning the number of threads that were woken up.
151    #[verifier::external_body]
152    pub fn wake_all(&self) -> (r: usize)
153    {
154        // Fast path
155        if self.is_empty() {
156            return 0;
157        }
158        let mut num_woken = 0;
159
160        loop {
161            let mut wakers = self.wakers.lock();
162            let Some(waker) = wakers.pop_front() else {
163                break ;
164            };
165            atomic_with_ghost! {
166                self.num_wakers => fetch_sub(1);
167                update prev -> next;
168                ghost g => {
169                    g = WaitQueueGhost { queued_wakers: g.queued_wakers.drop_first() };
170                }
171            };
172            // Avoid holding lock when calling `wake_up`
173            drop(wakers);
174
175            if waker.wake_up() {
176                num_woken += 1;
177            }
178        }
179
180        num_woken
181    }
182
183    #[verifier::external_body]
184    fn is_empty(&self) -> bool {
185        self.num_wakers.load() == 0
186    }
187
188    /// Enqueues the input [`Waker`] to the wait queue.
189    #[doc(hidden)]
190    #[verifier::external_body]
191    pub fn enqueue(&self, waker: Arc<Waker>)
192    {
193        let mut wakers = self.wakers.lock();
194        wakers.push_back(waker);
195        atomic_with_ghost! {
196            self.num_wakers => fetch_add(1);
197            update prev -> next;
198            ghost g => {
199                g = WaitQueueGhost { queued_wakers: g.queued_wakers.push(waker.id()) };
200            }
201        };
202    }
203}
204
205impl Default for WaitQueue {
206    #[verifier::external_body]
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
212/// A waiter that can put the current thread to sleep until it is woken up by the associated
213/// [`Waker`].
214///
215/// By definition, a waiter belongs to the current thread, so it cannot be sent to another thread
216/// and its reference cannot be shared between threads.
217pub struct Waiter {
218    waker: Arc<Waker>,
219}
220
221impl !Send for Waiter {
222
223}
224
225impl !Sync for Waiter {
226
227}
228
229impl Waiter {
230    /// Checks if the input waker is the associated waker of the current waiter.
231    pub closed spec fn rel_waker(self, waker: Arc<Waker>) -> bool {
232        self.waker == waker
233    }
234
235    /// Abstract identity of the paired waker.
236    pub closed spec fn waker_id(self) -> int {
237        self.waker.id()
238    }
239}
240
241/// A waker that can wake up the associated [`Waiter`].
242///
243/// A waker can be created by calling [`Waiter::new_pair`]. This method creates an `Arc<Waker>` that can
244/// be used across different threads.
245pub struct Waker {
246    has_woken: AtomicBool,
247    task: Arc<Task>,
248    v_id: Ghost<int>,
249}
250
251impl Waker {
252    /// Abstract identity used by the queue model.
253    pub closed spec fn id(self) -> int {
254        self.v_id@
255    }
256}
257
258#[verus_verify]
259impl Waiter {
260    /// Creates a waiter and its associated [`Waker`].
261    #[verus_spec(ret =>
262        ensures
263            ret.0.rel_waker(ret.1),
264    )]
265    pub fn new_pair() -> (Self, Arc<Waker>) {
266        proof_decl! {
267            let ghost waker_id: int = arbitrary();
268        }
269        let waker = Arc::new(
270            Waker {
271                has_woken: AtomicBool::new(false),
272                // task: Task::current().unwrap().cloned(),
273                task: Arc::new(Task {  }),
274                v_id: Ghost(waker_id),
275            },
276        );
277        let waiter = Self { waker: waker.clone() };
278        (waiter, waker)
279    }
280
281    /// Waits until the waiter is woken up by calling [`Waker::wake_up`] on the associated
282    /// [`Waker`].
283    ///
284    /// This method returns immediately if the waiter has been woken since the end of the last call
285    /// to this method (or since the waiter was created, if this method has not been called
286    /// before). Otherwise, it puts the current thread to sleep until the waiter is woken up.
287    #[track_caller]
288    pub fn wait(&self) {
289        self.waker.do_wait();
290    }
291
292    /// Waits until some condition is met or the cancel condition becomes true.
293    ///
294    /// This method will return `Ok(_)` if the condition returns `Some(_)`, and will stop waiting
295    /// if the cancel condition returns `Err(_)`. In this situation, this method will return the `Err(_)`
296    /// generated by the cancel condition.
297    #[verus_spec(ret =>
298        requires
299            cond.requires(()),
300            cancel_cond.requires(()),
301        ensures
302            match ret {
303                Ok(res) => cond.ensures((),Some(res)),
304                Err(e) => cancel_cond.ensures((), Err(e)),
305            },
306    )]
307    #[track_caller]
308    #[verifier::exec_allows_no_decreases_clause]
309    pub fn wait_until_or_cancelled<F, R, FCancel, E>(
310        &self,
311        mut cond: F,
312        cancel_cond: FCancel,
313    ) -> core::result::Result<R, E> where
314        F: FnMut() -> Option<R>,
315        FCancel: Fn() -> core::result::Result<(), E>,
316     {
317        let mut cond = cond;
318        #[verus_spec(invariant
319            cond.requires(()),
320            cancel_cond.requires(()),
321        )]
322        loop {
323            if let Some(res) = cond() {
324                assert(cond.ensures((), Some(res)));
325                proof! { admit(); } // FIXME: https://github.com/verus-lang/verus/issues/2295
326                return Ok(res);
327            };
328            if let Err(e) = cancel_cond() {
329                // Close the waker and check again to avoid missing a wake event.
330                self.waker.close();
331                proof! { admit(); } // FIXME: https://github.com/verus-lang/verus/issues/2295
332                return cond().ok_or(e);
333            }
334            self.wait();
335        }
336    }
337
338    /// Gets the associated [`Waker`] of the current waiter.
339    #[verus_spec(ret =>
340        ensures
341            self.rel_waker(ret),
342    )]
343    pub fn waker(&self) -> Arc<Waker> {
344        self.waker.clone()
345    }
346
347    /// Returns the task that the associated waker will attempt to wake up.
348    pub fn task(&self) -> &Arc<Task> {
349        &self.waker.task
350    }
351}
352
353impl Drop for Waiter {
354    #[verifier::external_body]
355    fn drop(&mut self)
356        opens_invariants none
357        no_unwind
358    {
359        // When dropping the waiter, we need to close the waker to ensure that if someone wants to
360        // wake up the waiter afterwards, they will perform a no-op.
361        self.waker.close();
362    }
363}
364
365impl Waker {
366    /// Wakes up the associated [`Waiter`].
367    ///
368    /// This method returns `true` if the waiter is woken by this call. It returns `false` if the
369    /// waiter has already been woken by a previous call to the method, or if the waiter has been
370    /// dropped.
371    ///
372    /// Note that if this method returns `true`, it implies that the wake event will be properly
373    /// delivered, _or_ that the waiter will be dropped after being woken. It's up to the caller to
374    /// handle the latter case properly to avoid missing the wake event.
375    #[verifier::external_body]
376    pub fn wake_up(&self) -> bool {
377        if self.has_woken.swap(true, Ordering::Release) {
378            return false;
379        }
380        scheduler::unpark_target(self.task.clone());
381
382        true
383    }
384
385    #[track_caller]
386    #[verifier::external_body]
387    fn do_wait(&self) {
388        while !self.has_woken.swap(false, Ordering::Acquire) {
389            scheduler::park_current(|| self.has_woken.load(Ordering::Acquire));
390        }
391    }
392
393    #[verifier::external_body]
394    fn close(&self) {
395        // This must use `Ordering::Acquire`, although we do not care about the return value. See
396        // the memory order explanation at the top of the file for details.
397        let _ = self.has_woken.swap(true, Ordering::Acquire);
398    }
399}
400
401} // verus!
402#[cfg(ktest)]
403mod test {
404    use super::*;
405    use crate::{prelude::*, task::TaskOptions};
406
407    fn queue_wake<F>(wake: F)
408    where
409        F: Fn(&WaitQueue) + Sync + Send + 'static,
410    {
411        let queue = Arc::new(WaitQueue::new());
412        let queue_cloned = queue.clone();
413
414        let cond = Arc::new(AtomicBool::new(false));
415        let cond_cloned = cond.clone();
416
417        TaskOptions::new(move || {
418            Task::yield_now();
419
420            cond_cloned.store(true, Ordering::Relaxed);
421            wake(&queue_cloned);
422        })
423        .data(())
424        .spawn()
425        .unwrap();
426
427        queue.wait_until(|| cond.load(Ordering::Relaxed).then_some(()));
428
429        assert!(cond.load(Ordering::Relaxed));
430    }
431
432    #[ktest]
433    fn queue_wake_one() {
434        queue_wake(|queue| {
435            queue.wake_one();
436        });
437    }
438
439    #[ktest]
440    fn queue_wake_all() {
441        queue_wake(|queue| {
442            queue.wake_all();
443        });
444    }
445
446    #[ktest]
447    fn waiter_wake_twice() {
448        let (_waiter, waker) = Waiter::new_pair();
449
450        assert!(waker.wake_up());
451        assert!(!waker.wake_up());
452    }
453
454    #[ktest]
455    fn waiter_wake_drop() {
456        let (waiter, waker) = Waiter::new_pair();
457
458        drop(waiter);
459        assert!(!waker.wake_up());
460    }
461
462    #[ktest]
463    fn waiter_wake_async() {
464        let (waiter, waker) = Waiter::new_pair();
465
466        let cond = Arc::new(AtomicBool::new(false));
467        let cond_cloned = cond.clone();
468
469        TaskOptions::new(move || {
470            Task::yield_now();
471
472            cond_cloned.store(true, Ordering::Relaxed);
473            assert!(waker.wake_up());
474        })
475        .data(())
476        .spawn()
477        .unwrap();
478
479        waiter.wait();
480
481        assert!(cond.load(Ordering::Relaxed));
482    }
483
484    #[ktest]
485    fn waiter_wake_reorder() {
486        let (waiter, waker) = Waiter::new_pair();
487
488        let cond = Arc::new(AtomicBool::new(false));
489        let cond_cloned = cond.clone();
490
491        let (waiter2, waker2) = Waiter::new_pair();
492
493        let cond2 = Arc::new(AtomicBool::new(false));
494        let cond2_cloned = cond2.clone();
495
496        TaskOptions::new(move || {
497            Task::yield_now();
498
499            cond2_cloned.store(true, Ordering::Relaxed);
500            assert!(waker2.wake_up());
501
502            Task::yield_now();
503
504            cond_cloned.store(true, Ordering::Relaxed);
505            assert!(waker.wake_up());
506        })
507        .data(())
508        .spawn()
509        .unwrap();
510
511        waiter.wait();
512        assert!(cond.load(Ordering::Relaxed));
513
514        waiter2.wait();
515        assert!(cond2.load(Ordering::Relaxed));
516    }
517}