Skip to main content

ostd/sync/
wait.rs

1// SPDX-License-Identifier: MPL-2.0
2use vstd::atomic_ghost::*;
3use vstd::prelude::*;
4
5use alloc::{collections::VecDeque, sync::Arc};
6use core::intrinsics::atomic_cxchg;
7use core::sync::atomic::{/*AtomicBool,*/ Ordering};
8
9use super::{LocalIrqDisabled, SpinLock};
10use crate::task::{Task, scheduler};
11
12// # Explanation on the memory orders
13//
14// ```
15// [CPU 1 (the waker)]     [CPU 2 (the waiter)]
16// cond = true;
17// wake_up();
18//                         wait();
19//                         if cond { /* .. */ }
20// ```
21//
22// As soon as the waiter is woken up by the waker, it must see the true condition. This is
23// trivially satisfied if `wake_up()` and `wait()` synchronize with a lock. But if they synchronize
24// with an atomic variable, `wake_up()` must access the variable with `Ordering::Release` and
25// `wait()` must access the variable with `Ordering::Acquire`.
26//
27// Examples of `wake_up()`:
28//  - `WaitQueue::wake_one()`
29//  - `WaitQueue::wake_all()`
30//  - `Waker::wake_up()`
31//
32// Examples of `wait()`:
33//  - `WaitQueue::wait_until()`
34//  - `Waiter::wait()`
35//  - `Waiter::drop()`
36//
37// Note that dropping a waiter must be treated as a `wait()` with zero timeout, because we need to
38// make sure that the wake event isn't lost in this case.
39
40verus! {
41
42struct_with_invariants! {
43
44/// A wait queue.
45///
46/// One may wait on a wait queue to put its executing thread to sleep.
47/// Multiple threads may be the waiters of a wait queue.
48/// Other threads may invoke the `wake`-family methods of a wait queue to
49/// wake up one or many waiting threads.
50pub struct WaitQueue {
51    // A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`.
52    num_wakers: AtomicU32<_, (), _>,
53    wakers: SpinLock<VecDeque<Arc<Waker>>, LocalIrqDisabled>,
54}
55
56closed spec fn wf(self) -> bool {
57    invariant on num_wakers is (v: u32, g: ()) {
58        true
59    }
60}
61}
62
63impl WaitQueue {
64    #[verifier::type_invariant]
65    pub closed spec fn type_inv(self) -> bool {
66        self.wf()
67    }
68}
69
70impl WaitQueue {
71    /// Creates a new, empty wait queue.
72    pub const fn new() -> Self {
73        WaitQueue {
74            num_wakers: AtomicU32::new(Ghost(()), 0, Tracked(())),
75            wakers: SpinLock::new(VecDeque::new()),
76        }
77    }
78
79    /// Waits until some condition is met.
80    ///
81    /// This method takes a closure that tests a user-given condition.
82    /// The method only returns if the condition returns `Some(_)`.
83    /// A waker thread should first make the condition `Some(_)`, then invoke the
84    /// `wake`-family method. This ordering is important to ensure that waiter
85    /// threads do not lose any wakeup notifications.
86    ///
87    /// By taking a condition closure, this wait-wakeup mechanism becomes
88    /// more efficient and robust.
89    #[track_caller]
90    #[verus_spec(ret =>
91        requires
92            cond.requires(()),
93        ensures
94            cond.ensures((), Some(ret)),
95    )]
96    #[verifier::exec_allows_no_decreases_clause]
97    pub fn wait_until<F, R>(&self, mut cond: F) -> R where F: FnMut() -> Option<R> {
98        if let Some(res) = cond() {
99            return res;
100        }
101        let (waiter, _) = Waiter::new_pair();
102        #[verus_spec(invariant
103            cond.requires(()),
104        )]
105        loop {
106            self.enqueue(waiter.waker());
107            if let Some(res) = cond() {
108                assert(cond.ensures((), Some(res)));
109                proof! { admit(); }  // FIXME: https://github.com/verus-lang/verus/issues/2295
110                return res;
111            }
112            waiter.wait();
113        }
114    }
115
116    /// Wakes up one waiting thread, if there is one at the point of time when this method is
117    /// called, returning whether such a thread was woken up.
118    #[verifier::exec_allows_no_decreases_clause]
119    pub fn wake_one(&self) -> (r: bool) {
120        proof!{
121            use_type_invariant(self);
122        }
123
124        // Fast path
125        if self.is_empty() {
126            return false;
127        }
128        loop
129            invariant
130                self.wf(),
131        {
132            let mut wakers = self.wakers.lock();
133            let Some(waker) = wakers.pop_front() else {
134                return false;
135            };
136            atomic_with_ghost! {
137                self.num_wakers => fetch_sub(1);
138                update prev -> next;
139                ghost g => {
140                    assume(prev > 0);
141                }
142            };
143            // Avoid holding lock when calling `wake_up`
144            //drop(wakers);
145            wakers.drop();
146
147            if waker.wake_up() {
148                return true;
149            }
150        }
151    }
152
153    /// Wakes up all waiting threads, returning the number of threads that were woken up.
154    #[verifier::exec_allows_no_decreases_clause]
155    pub fn wake_all(&self) -> (r: usize) {
156        proof!{
157            use_type_invariant(self);
158        }
159
160        // Fast path
161        if self.is_empty() {
162            return 0;
163        }
164        let mut num_woken = 0;
165
166        loop
167            invariant
168                self.wf(),
169        {
170            let mut wakers = self.wakers.lock();
171            let Some(waker) = wakers.pop_front() else {
172                break;
173            };
174            atomic_with_ghost! {
175                self.num_wakers => fetch_sub(1);
176                update prev -> next;
177                ghost g => {
178                    assume(prev > 1);
179                }
180            };
181            // Avoid holding lock when calling `wake_up`
182            //drop(wakers);
183            wakers.drop();
184
185            if waker.wake_up() {
186                assume(num_woken < usize::MAX);
187                num_woken += 1;
188            }
189        }
190
191        num_woken
192    }
193
194    #[verifier::external_body]
195    fn is_empty(&self) -> bool {
196        self.num_wakers.load() == 0
197    }
198
199    /// Enqueues the input [`Waker`] to the wait queue.
200    #[doc(hidden)]
201    pub fn enqueue(&self, waker: Arc<Waker>) {
202        proof!{
203            use_type_invariant(self);
204        }
205        let mut wakers = self.wakers.lock();
206        wakers.push_back(waker);
207        atomic_with_ghost! {
208            self.num_wakers => fetch_add(1);
209            update prev -> next;
210            ghost g => {
211                assume(prev < u32::MAX);
212            }
213        };
214    }
215}
216
217impl Default for WaitQueue {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223/// A waiter that can put the current thread to sleep until it is woken up by the associated
224/// [`Waker`].
225///
226/// By definition, a waiter belongs to the current thread, so it cannot be sent to another thread
227/// and its reference cannot be shared between threads.
228pub struct Waiter {
229    waker: Arc<Waker>,
230}
231
232impl !Send for Waiter {
233
234}
235
236impl !Sync for Waiter {
237
238}
239
240impl Waiter {
241    /// Checks if the input waker is the associated waker of the current waiter.
242    pub closed spec fn rel_waker(self, waker: Arc<Waker>) -> bool {
243        self.waker == waker
244    }
245}
246
247struct_with_invariants! {
248/// A waker that can wake up the associated [`Waiter`].
249///
250/// A waker can be created by calling [`Waiter::new_pair`]. This method creates an `Arc<Waker>` that can
251/// be used across different threads.
252pub struct Waker {
253    has_woken: AtomicBool<_, (), _>, // It should attach a task-related token once we start to verify the scheduler, using () as a placeholder for now.
254    task: Arc<Task>,
255}
256
257closed spec fn wf(self) -> bool {
258    invariant on has_woken is (v: bool, g: ()) {
259        true
260    }
261}
262}
263
264impl Waker {
265    #[verifier::type_invariant]
266    pub closed spec fn type_inv(self) -> bool {
267        self.wf()
268    }
269}
270
271#[verus_verify]
272impl Waiter {
273    /// Creates a waiter and its associated [`Waker`].
274    #[verus_spec(ret =>
275        ensures
276            ret.0.rel_waker(ret.1),
277    )]
278    pub fn new_pair() -> (Self, Arc<Waker>) {
279        proof_decl! {
280            let ghost waker_id: int = arbitrary();
281        }
282        let waker = Arc::new(
283            Waker {
284                has_woken: AtomicBool::new(Ghost(()), false, Tracked(())),
285                // task: Task::current().unwrap().cloned(),
286                task: Arc::new(Task {  }),
287            },
288        );
289        let waiter = Self { waker: waker.clone() };
290        (waiter, waker)
291    }
292
293    /// Waits until the waiter is woken up by calling [`Waker::wake_up`] on the associated
294    /// [`Waker`].
295    ///
296    /// This method returns immediately if the waiter has been woken since the end of the last call
297    /// to this method (or since the waiter was created, if this method has not been called
298    /// before). Otherwise, it puts the current thread to sleep until the waiter is woken up.
299    #[track_caller]
300    pub fn wait(&self) {
301        self.waker.do_wait();
302    }
303
304    /// Waits until some condition is met or the cancel condition becomes true.
305    ///
306    /// This method will return `Ok(_)` if the condition returns `Some(_)`, and will stop waiting
307    /// if the cancel condition returns `Err(_)`. In this situation, this method will return the `Err(_)`
308    /// generated by the cancel condition.
309    #[verus_spec(ret =>
310        requires
311            cond.requires(()),
312            cancel_cond.requires(()),
313        ensures
314            match ret {
315                Ok(res) => cond.ensures((),Some(res)),
316                Err(e) => cancel_cond.ensures((), Err(e)),
317            },
318    )]
319    #[track_caller]
320    #[verifier::exec_allows_no_decreases_clause]
321    pub fn wait_until_or_cancelled<F, R, FCancel, E>(
322        &self,
323        mut cond: F,
324        cancel_cond: FCancel,
325    ) -> core::result::Result<R, E> where
326        F: FnMut() -> Option<R>,
327        FCancel: Fn() -> core::result::Result<(), E>,
328     {
329        let mut cond = cond;
330        #[verus_spec(invariant
331            cond.requires(()),
332            cancel_cond.requires(()),
333        )]
334        loop {
335            if let Some(res) = cond() {
336                assert(cond.ensures((), Some(res))) by {
337                    admit();
338                };  // FIXME:
339                proof! { admit(); }  // FIXME: https://github.com/verus-lang/verus/issues/2295
340                return Ok(res);
341            };
342            if let Err(e) = cancel_cond() {
343                // Close the waker and check again to avoid missing a wake event.
344                self.waker.close();
345                proof! { admit(); }  // FIXME: https://github.com/verus-lang/verus/issues/2295
346                return cond().ok_or(e);
347            }
348            self.wait();
349        }
350    }
351
352    /// Gets the associated [`Waker`] of the current waiter.
353    #[verus_spec(ret =>
354        ensures
355            self.rel_waker(ret),
356    )]
357    pub fn waker(&self) -> Arc<Waker> {
358        self.waker.clone()
359    }
360
361    /// Returns the task that the associated waker will attempt to wake up.
362    pub fn task(&self) -> &Arc<Task> {
363        &self.waker.task
364    }
365}
366
367/*impl Drop for Waiter {
368    #[verifier::external_body]
369    fn drop(&mut self)
370        opens_invariants none
371        no_unwind
372    {
373        // When dropping the waiter, we need to close the waker to ensure that if someone wants to
374        // wake up the waiter afterwards, they will perform a no-op.
375        self.waker.close();
376    }
377}*/
378
379impl Waiter {
380    /// VERUS LIMITATION: We implement `drop` and call it manually because Verus's support for `Drop` is incomplete for now.
381    pub fn drop(self) {
382        // When dropping the waiter, we need to close the waker to ensure that if someone wants to
383        // wake up the waiter afterwards, they will perform a no-op.
384        self.waker.close();
385    }
386}
387
388impl Waker {
389    /// Wakes up the associated [`Waiter`].
390    ///
391    /// This method returns `true` if the waiter is woken by this call. It returns `false` if the
392    /// waiter has already been woken by a previous call to the method, or if the waiter has been
393    /// dropped.
394    ///
395    /// Note that if this method returns `true`, it implies that the wake event will be properly
396    /// delivered, _or_ that the waiter will be dropped after being woken. It's up to the caller to
397    /// handle the latter case properly to avoid missing the wake event.
398    #[verifier::external_body]
399    pub fn wake_up(&self) -> bool {
400        /*if self.has_woken.swap(true, Ordering::Release) {
401            return false;
402        }
403        scheduler::unpark_target(self.task.clone());
404
405        true*/
406        unimplemented!()
407    }
408
409    #[track_caller]
410    #[verifier::external_body]
411    fn do_wait(&self) {
412        /*while !self.has_woken.swap(false, Ordering::Acquire) {
413            scheduler::park_current(|| self.has_woken.load(Ordering::Acquire));
414        }*/
415        unimplemented!()
416    }
417
418    fn close(&self) {
419        // This must use `Ordering::Acquire`, although we do not care about the return value. See
420        // the memory order explanation at the top of the file for details.
421        //let _ = self.has_woken.swap(true, Ordering::Acquire);
422        proof!{ use_type_invariant(self);}
423        let _ =
424            atomic_with_ghost!{
425            self.has_woken => swap(true);
426            update prev -> next;
427            ghost g => {}
428        };
429    }
430}
431
432} // verus!
433#[cfg(ktest)]
434mod test {
435    use super::*;
436    use crate::{prelude::*, task::TaskOptions};
437
438    fn queue_wake<F>(wake: F)
439    where
440        F: Fn(&WaitQueue) + Sync + Send + 'static,
441    {
442        let queue = Arc::new(WaitQueue::new());
443        let queue_cloned = queue.clone();
444
445        let cond = Arc::new(AtomicBool::new(false));
446        let cond_cloned = cond.clone();
447
448        TaskOptions::new(move || {
449            Task::yield_now();
450
451            cond_cloned.store(true, Ordering::Relaxed);
452            wake(&queue_cloned);
453        })
454        .data(())
455        .spawn()
456        .unwrap();
457
458        queue.wait_until(|| cond.load(Ordering::Relaxed).then_some(()));
459
460        assert!(cond.load(Ordering::Relaxed));
461    }
462
463    #[ktest]
464    fn queue_wake_one() {
465        queue_wake(|queue| {
466            queue.wake_one();
467        });
468    }
469
470    #[ktest]
471    fn queue_wake_all() {
472        queue_wake(|queue| {
473            queue.wake_all();
474        });
475    }
476
477    #[ktest]
478    fn waiter_wake_twice() {
479        let (_waiter, waker) = Waiter::new_pair();
480
481        assert!(waker.wake_up());
482        assert!(!waker.wake_up());
483    }
484
485    #[ktest]
486    fn waiter_wake_drop() {
487        let (waiter, waker) = Waiter::new_pair();
488
489        drop(waiter);
490        assert!(!waker.wake_up());
491    }
492
493    #[ktest]
494    fn waiter_wake_async() {
495        let (waiter, waker) = Waiter::new_pair();
496
497        let cond = Arc::new(AtomicBool::new(false));
498        let cond_cloned = cond.clone();
499
500        TaskOptions::new(move || {
501            Task::yield_now();
502
503            cond_cloned.store(true, Ordering::Relaxed);
504            assert!(waker.wake_up());
505        })
506        .data(())
507        .spawn()
508        .unwrap();
509
510        waiter.wait();
511
512        assert!(cond.load(Ordering::Relaxed));
513    }
514
515    #[ktest]
516    fn waiter_wake_reorder() {
517        let (waiter, waker) = Waiter::new_pair();
518
519        let cond = Arc::new(AtomicBool::new(false));
520        let cond_cloned = cond.clone();
521
522        let (waiter2, waker2) = Waiter::new_pair();
523
524        let cond2 = Arc::new(AtomicBool::new(false));
525        let cond2_cloned = cond2.clone();
526
527        TaskOptions::new(move || {
528            Task::yield_now();
529
530            cond2_cloned.store(true, Ordering::Relaxed);
531            assert!(waker2.wake_up());
532
533            Task::yield_now();
534
535            cond_cloned.store(true, Ordering::Relaxed);
536            assert!(waker.wake_up());
537        })
538        .data(())
539        .spawn()
540        .unwrap();
541
542        waiter.wait();
543        assert!(cond.load(Ordering::Relaxed));
544
545        waiter2.wait();
546        assert!(cond2.load(Ordering::Relaxed));
547    }
548}