ostd/sync/wait.rs
1// SPDX-License-Identifier: MPL-2.0
2use vstd::atomic_ghost::*;
3use vstd::prelude::*;
4
5use alloc::{collections::VecDeque, sync::Arc};
6use core::intrinsics::atomic_cxchg;
7use core::sync::atomic::{/*AtomicBool,*/ Ordering};
8
9use super::{LocalIrqDisabled, SpinLock};
10use crate::task::{Task, scheduler};
11
12// # Explanation on the memory orders
13//
14// ```
15// [CPU 1 (the waker)] [CPU 2 (the waiter)]
16// cond = true;
17// wake_up();
18// wait();
19// if cond { /* .. */ }
20// ```
21//
22// As soon as the waiter is woken up by the waker, it must see the true condition. This is
23// trivially satisfied if `wake_up()` and `wait()` synchronize with a lock. But if they synchronize
24// with an atomic variable, `wake_up()` must access the variable with `Ordering::Release` and
25// `wait()` must access the variable with `Ordering::Acquire`.
26//
27// Examples of `wake_up()`:
28// - `WaitQueue::wake_one()`
29// - `WaitQueue::wake_all()`
30// - `Waker::wake_up()`
31//
32// Examples of `wait()`:
33// - `WaitQueue::wait_until()`
34// - `Waiter::wait()`
35// - `Waiter::drop()`
36//
37// Note that dropping a waiter must be treated as a `wait()` with zero timeout, because we need to
38// make sure that the wake event isn't lost in this case.
39
40verus! {
41
42struct_with_invariants! {
43
44/// A wait queue.
45///
46/// One may wait on a wait queue to put its executing thread to sleep.
47/// Multiple threads may be the waiters of a wait queue.
48/// Other threads may invoke the `wake`-family methods of a wait queue to
49/// wake up one or many waiting threads.
50pub struct WaitQueue {
51 // A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`.
52 num_wakers: AtomicU32<_, (), _>,
53 wakers: SpinLock<VecDeque<Arc<Waker>>, LocalIrqDisabled>,
54}
55
56closed spec fn wf(self) -> bool {
57 invariant on num_wakers is (v: u32, g: ()) {
58 true
59 }
60}
61}
62
63impl WaitQueue {
64 #[verifier::type_invariant]
65 pub closed spec fn type_inv(self) -> bool {
66 self.wf()
67 }
68}
69
70impl WaitQueue {
71 /// Creates a new, empty wait queue.
72 pub const fn new() -> Self {
73 WaitQueue {
74 num_wakers: AtomicU32::new(Ghost(()), 0, Tracked(())),
75 wakers: SpinLock::new(VecDeque::new()),
76 }
77 }
78
79 /// Waits until some condition is met.
80 ///
81 /// This method takes a closure that tests a user-given condition.
82 /// The method only returns if the condition returns `Some(_)`.
83 /// A waker thread should first make the condition `Some(_)`, then invoke the
84 /// `wake`-family method. This ordering is important to ensure that waiter
85 /// threads do not lose any wakeup notifications.
86 ///
87 /// By taking a condition closure, this wait-wakeup mechanism becomes
88 /// more efficient and robust.
89 #[track_caller]
90 #[verus_spec(ret =>
91 requires
92 cond.requires(()),
93 ensures
94 cond.ensures((), Some(ret)),
95 )]
96 #[verifier::exec_allows_no_decreases_clause]
97 pub fn wait_until<F, R>(&self, mut cond: F) -> R where F: FnMut() -> Option<R> {
98 if let Some(res) = cond() {
99 return res;
100 }
101 let (waiter, _) = Waiter::new_pair();
102 #[verus_spec(invariant
103 cond.requires(()),
104 )]
105 loop {
106 self.enqueue(waiter.waker());
107 if let Some(res) = cond() {
108 assert(cond.ensures((), Some(res)));
109 proof! { admit(); } // FIXME: https://github.com/verus-lang/verus/issues/2295
110 return res;
111 }
112 waiter.wait();
113 }
114 }
115
116 /// Wakes up one waiting thread, if there is one at the point of time when this method is
117 /// called, returning whether such a thread was woken up.
118 #[verifier::exec_allows_no_decreases_clause]
119 pub fn wake_one(&self) -> (r: bool) {
120 proof!{
121 use_type_invariant(self);
122 }
123
124 // Fast path
125 if self.is_empty() {
126 return false;
127 }
128 loop
129 invariant
130 self.wf(),
131 {
132 let mut wakers = self.wakers.lock();
133 let Some(waker) = wakers.pop_front() else {
134 return false;
135 };
136 atomic_with_ghost! {
137 self.num_wakers => fetch_sub(1);
138 update prev -> next;
139 ghost g => {
140 assume(prev > 0);
141 }
142 };
143 // Avoid holding lock when calling `wake_up`
144 //drop(wakers);
145 wakers.drop();
146
147 if waker.wake_up() {
148 return true;
149 }
150 }
151 }
152
153 /// Wakes up all waiting threads, returning the number of threads that were woken up.
154 #[verifier::exec_allows_no_decreases_clause]
155 pub fn wake_all(&self) -> (r: usize) {
156 proof!{
157 use_type_invariant(self);
158 }
159
160 // Fast path
161 if self.is_empty() {
162 return 0;
163 }
164 let mut num_woken = 0;
165
166 loop
167 invariant
168 self.wf(),
169 {
170 let mut wakers = self.wakers.lock();
171 let Some(waker) = wakers.pop_front() else {
172 break;
173 };
174 atomic_with_ghost! {
175 self.num_wakers => fetch_sub(1);
176 update prev -> next;
177 ghost g => {
178 assume(prev > 1);
179 }
180 };
181 // Avoid holding lock when calling `wake_up`
182 //drop(wakers);
183 wakers.drop();
184
185 if waker.wake_up() {
186 assume(num_woken < usize::MAX);
187 num_woken += 1;
188 }
189 }
190
191 num_woken
192 }
193
194 #[verifier::external_body]
195 fn is_empty(&self) -> bool {
196 self.num_wakers.load() == 0
197 }
198
199 /// Enqueues the input [`Waker`] to the wait queue.
200 #[doc(hidden)]
201 pub fn enqueue(&self, waker: Arc<Waker>) {
202 proof!{
203 use_type_invariant(self);
204 }
205 let mut wakers = self.wakers.lock();
206 wakers.push_back(waker);
207 atomic_with_ghost! {
208 self.num_wakers => fetch_add(1);
209 update prev -> next;
210 ghost g => {
211 assume(prev < u32::MAX);
212 }
213 };
214 }
215}
216
217impl Default for WaitQueue {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223/// A waiter that can put the current thread to sleep until it is woken up by the associated
224/// [`Waker`].
225///
226/// By definition, a waiter belongs to the current thread, so it cannot be sent to another thread
227/// and its reference cannot be shared between threads.
228pub struct Waiter {
229 waker: Arc<Waker>,
230}
231
232impl !Send for Waiter {
233
234}
235
236impl !Sync for Waiter {
237
238}
239
240impl Waiter {
241 /// Checks if the input waker is the associated waker of the current waiter.
242 pub closed spec fn rel_waker(self, waker: Arc<Waker>) -> bool {
243 self.waker == waker
244 }
245}
246
247struct_with_invariants! {
248/// A waker that can wake up the associated [`Waiter`].
249///
250/// A waker can be created by calling [`Waiter::new_pair`]. This method creates an `Arc<Waker>` that can
251/// be used across different threads.
252pub struct Waker {
253 has_woken: AtomicBool<_, (), _>, // It should attach a task-related token once we start to verify the scheduler, using () as a placeholder for now.
254 task: Arc<Task>,
255}
256
257closed spec fn wf(self) -> bool {
258 invariant on has_woken is (v: bool, g: ()) {
259 true
260 }
261}
262}
263
264impl Waker {
265 #[verifier::type_invariant]
266 pub closed spec fn type_inv(self) -> bool {
267 self.wf()
268 }
269}
270
271#[verus_verify]
272impl Waiter {
273 /// Creates a waiter and its associated [`Waker`].
274 #[verus_spec(ret =>
275 ensures
276 ret.0.rel_waker(ret.1),
277 )]
278 pub fn new_pair() -> (Self, Arc<Waker>) {
279 proof_decl! {
280 let ghost waker_id: int = arbitrary();
281 }
282 let waker = Arc::new(
283 Waker {
284 has_woken: AtomicBool::new(Ghost(()), false, Tracked(())),
285 // task: Task::current().unwrap().cloned(),
286 task: Arc::new(Task { }),
287 },
288 );
289 let waiter = Self { waker: waker.clone() };
290 (waiter, waker)
291 }
292
293 /// Waits until the waiter is woken up by calling [`Waker::wake_up`] on the associated
294 /// [`Waker`].
295 ///
296 /// This method returns immediately if the waiter has been woken since the end of the last call
297 /// to this method (or since the waiter was created, if this method has not been called
298 /// before). Otherwise, it puts the current thread to sleep until the waiter is woken up.
299 #[track_caller]
300 pub fn wait(&self) {
301 self.waker.do_wait();
302 }
303
304 /// Waits until some condition is met or the cancel condition becomes true.
305 ///
306 /// This method will return `Ok(_)` if the condition returns `Some(_)`, and will stop waiting
307 /// if the cancel condition returns `Err(_)`. In this situation, this method will return the `Err(_)`
308 /// generated by the cancel condition.
309 #[verus_spec(ret =>
310 requires
311 cond.requires(()),
312 cancel_cond.requires(()),
313 ensures
314 match ret {
315 Ok(res) => cond.ensures((),Some(res)),
316 Err(e) => cancel_cond.ensures((), Err(e)),
317 },
318 )]
319 #[track_caller]
320 #[verifier::exec_allows_no_decreases_clause]
321 pub fn wait_until_or_cancelled<F, R, FCancel, E>(
322 &self,
323 mut cond: F,
324 cancel_cond: FCancel,
325 ) -> core::result::Result<R, E> where
326 F: FnMut() -> Option<R>,
327 FCancel: Fn() -> core::result::Result<(), E>,
328 {
329 let mut cond = cond;
330 #[verus_spec(invariant
331 cond.requires(()),
332 cancel_cond.requires(()),
333 )]
334 loop {
335 if let Some(res) = cond() {
336 assert(cond.ensures((), Some(res))) by {
337 admit();
338 }; // FIXME:
339 proof! { admit(); } // FIXME: https://github.com/verus-lang/verus/issues/2295
340 return Ok(res);
341 };
342 if let Err(e) = cancel_cond() {
343 // Close the waker and check again to avoid missing a wake event.
344 self.waker.close();
345 proof! { admit(); } // FIXME: https://github.com/verus-lang/verus/issues/2295
346 return cond().ok_or(e);
347 }
348 self.wait();
349 }
350 }
351
352 /// Gets the associated [`Waker`] of the current waiter.
353 #[verus_spec(ret =>
354 ensures
355 self.rel_waker(ret),
356 )]
357 pub fn waker(&self) -> Arc<Waker> {
358 self.waker.clone()
359 }
360
361 /// Returns the task that the associated waker will attempt to wake up.
362 pub fn task(&self) -> &Arc<Task> {
363 &self.waker.task
364 }
365}
366
367/*impl Drop for Waiter {
368 #[verifier::external_body]
369 fn drop(&mut self)
370 opens_invariants none
371 no_unwind
372 {
373 // When dropping the waiter, we need to close the waker to ensure that if someone wants to
374 // wake up the waiter afterwards, they will perform a no-op.
375 self.waker.close();
376 }
377}*/
378
379impl Waiter {
380 /// VERUS LIMITATION: We implement `drop` and call it manually because Verus's support for `Drop` is incomplete for now.
381 pub fn drop(self) {
382 // When dropping the waiter, we need to close the waker to ensure that if someone wants to
383 // wake up the waiter afterwards, they will perform a no-op.
384 self.waker.close();
385 }
386}
387
388impl Waker {
389 /// Wakes up the associated [`Waiter`].
390 ///
391 /// This method returns `true` if the waiter is woken by this call. It returns `false` if the
392 /// waiter has already been woken by a previous call to the method, or if the waiter has been
393 /// dropped.
394 ///
395 /// Note that if this method returns `true`, it implies that the wake event will be properly
396 /// delivered, _or_ that the waiter will be dropped after being woken. It's up to the caller to
397 /// handle the latter case properly to avoid missing the wake event.
398 #[verifier::external_body]
399 pub fn wake_up(&self) -> bool {
400 /*if self.has_woken.swap(true, Ordering::Release) {
401 return false;
402 }
403 scheduler::unpark_target(self.task.clone());
404
405 true*/
406 unimplemented!()
407 }
408
409 #[track_caller]
410 #[verifier::external_body]
411 fn do_wait(&self) {
412 /*while !self.has_woken.swap(false, Ordering::Acquire) {
413 scheduler::park_current(|| self.has_woken.load(Ordering::Acquire));
414 }*/
415 unimplemented!()
416 }
417
418 fn close(&self) {
419 // This must use `Ordering::Acquire`, although we do not care about the return value. See
420 // the memory order explanation at the top of the file for details.
421 //let _ = self.has_woken.swap(true, Ordering::Acquire);
422 proof!{ use_type_invariant(self);}
423 let _ =
424 atomic_with_ghost!{
425 self.has_woken => swap(true);
426 update prev -> next;
427 ghost g => {}
428 };
429 }
430}
431
432} // verus!
433#[cfg(ktest)]
434mod test {
435 use super::*;
436 use crate::{prelude::*, task::TaskOptions};
437
438 fn queue_wake<F>(wake: F)
439 where
440 F: Fn(&WaitQueue) + Sync + Send + 'static,
441 {
442 let queue = Arc::new(WaitQueue::new());
443 let queue_cloned = queue.clone();
444
445 let cond = Arc::new(AtomicBool::new(false));
446 let cond_cloned = cond.clone();
447
448 TaskOptions::new(move || {
449 Task::yield_now();
450
451 cond_cloned.store(true, Ordering::Relaxed);
452 wake(&queue_cloned);
453 })
454 .data(())
455 .spawn()
456 .unwrap();
457
458 queue.wait_until(|| cond.load(Ordering::Relaxed).then_some(()));
459
460 assert!(cond.load(Ordering::Relaxed));
461 }
462
463 #[ktest]
464 fn queue_wake_one() {
465 queue_wake(|queue| {
466 queue.wake_one();
467 });
468 }
469
470 #[ktest]
471 fn queue_wake_all() {
472 queue_wake(|queue| {
473 queue.wake_all();
474 });
475 }
476
477 #[ktest]
478 fn waiter_wake_twice() {
479 let (_waiter, waker) = Waiter::new_pair();
480
481 assert!(waker.wake_up());
482 assert!(!waker.wake_up());
483 }
484
485 #[ktest]
486 fn waiter_wake_drop() {
487 let (waiter, waker) = Waiter::new_pair();
488
489 drop(waiter);
490 assert!(!waker.wake_up());
491 }
492
493 #[ktest]
494 fn waiter_wake_async() {
495 let (waiter, waker) = Waiter::new_pair();
496
497 let cond = Arc::new(AtomicBool::new(false));
498 let cond_cloned = cond.clone();
499
500 TaskOptions::new(move || {
501 Task::yield_now();
502
503 cond_cloned.store(true, Ordering::Relaxed);
504 assert!(waker.wake_up());
505 })
506 .data(())
507 .spawn()
508 .unwrap();
509
510 waiter.wait();
511
512 assert!(cond.load(Ordering::Relaxed));
513 }
514
515 #[ktest]
516 fn waiter_wake_reorder() {
517 let (waiter, waker) = Waiter::new_pair();
518
519 let cond = Arc::new(AtomicBool::new(false));
520 let cond_cloned = cond.clone();
521
522 let (waiter2, waker2) = Waiter::new_pair();
523
524 let cond2 = Arc::new(AtomicBool::new(false));
525 let cond2_cloned = cond2.clone();
526
527 TaskOptions::new(move || {
528 Task::yield_now();
529
530 cond2_cloned.store(true, Ordering::Relaxed);
531 assert!(waker2.wake_up());
532
533 Task::yield_now();
534
535 cond_cloned.store(true, Ordering::Relaxed);
536 assert!(waker.wake_up());
537 })
538 .data(())
539 .spawn()
540 .unwrap();
541
542 waiter.wait();
543 assert!(cond.load(Ordering::Relaxed));
544
545 waiter2.wait();
546 assert!(cond2.load(Ordering::Relaxed));
547 }
548}