1use vstd::atomic_ghost::*;
3use vstd::prelude::*;
4
5use alloc::{collections::VecDeque, sync::Arc};
6use core::sync::atomic::{AtomicBool, Ordering};
7
8use super::{LocalIrqDisabled, SpinLock};
9use crate::task::{scheduler, Task};
10
11verus! {
40
/// Ghost state attached to the atomic waker counter of a [`WaitQueue`].
///
/// Declared `tracked`, i.e. it is proof-mode data used by the verifier.
pub tracked struct WaitQueueGhost {
    /// Ghost sequence of waker ids: `WaitQueue::enqueue` pushes `waker.id()` at the
    /// back, and the wake paths remove the first element (`drop_first`).
    pub queued_wakers: Seq<int>,
}
44
struct_with_invariants! {

// A queue of wakers together with an atomic count of how many are queued.
pub struct WaitQueue {
    // Atomic counter of queued wakers; it carries a `WaitQueueGhost` as ghost state.
    num_wakers: AtomicU32<_, WaitQueueGhost, _>,
    // The queued wakers themselves, guarded by a `SpinLock` parameterised with
    // `LocalIrqDisabled`.
    wakers: SpinLock<VecDeque<Arc<Waker>>, LocalIrqDisabled>,
}

// Well-formedness predicate: the length of the ghost sequence of waker ids always
// equals the value stored in the atomic counter.
closed spec fn wf(self) -> bool {
    invariant on num_wakers is (v: u32, g: WaitQueueGhost) {
        &&& g.queued_wakers.len() == v as int
    }
}
}
65
impl WaitQueue {
    /// Type invariant of `WaitQueue`: every value of this type satisfies `wf()`.
    #[verifier::type_invariant]
    pub closed spec fn type_inv(self) -> bool {
        self.wf()
    }
}
72
73impl WaitQueue {
74 #[verifier::external_body]
76 pub const fn new() -> Self {
77 WaitQueue {
78 num_wakers: AtomicU32::new(
79 Ghost(()),
80 0,
81 Tracked(WaitQueueGhost { queued_wakers: seq![] }),
82 ),
83 wakers: SpinLock::new(VecDeque::new()),
84 }
85 }
86
87 #[track_caller]
98 #[verus_spec(ret =>
99 requires
100 cond.requires(()),
101 ensures
102 cond.ensures((), Some(ret)),
103 )]
104 pub fn wait_until<F, R>(&self, mut cond: F) -> R where F: FnMut() -> Option<R> {
105 if let Some(res) = cond() {
106 return res;
107 }
108 let (waiter, _) = Waiter::new_pair();
109 let cond = ||
110 {
111 self.enqueue(waiter.waker());
112 cond()
113 };
114 proof!{
115 admit(); }
117 waiter.wait_until_or_cancelled(cond, || -> (ret:Result<(), ()>) ensures ret == Ok::<(),()>(()) { Ok::<(), ()>(()) }).unwrap()
118 }
119
120 #[verifier::external_body]
123 pub fn wake_one(&self) -> (r: bool)
124 {
125 if self.is_empty() {
127 return false;
128 }
129 loop {
130 let mut wakers = self.wakers.lock();
131 let Some(waker) = wakers.pop_front() else {
132 return false;
133 };
134 atomic_with_ghost! {
135 self.num_wakers => fetch_sub(1);
136 update prev -> next;
137 ghost g => {
138 g = WaitQueueGhost { queued_wakers: g.queued_wakers.drop_first() };
139 }
140 };
141 drop(wakers);
143
144 if waker.wake_up() {
145 return true;
146 }
147 }
148 }
149
150 #[verifier::external_body]
152 pub fn wake_all(&self) -> (r: usize)
153 {
154 if self.is_empty() {
156 return 0;
157 }
158 let mut num_woken = 0;
159
160 loop {
161 let mut wakers = self.wakers.lock();
162 let Some(waker) = wakers.pop_front() else {
163 break ;
164 };
165 atomic_with_ghost! {
166 self.num_wakers => fetch_sub(1);
167 update prev -> next;
168 ghost g => {
169 g = WaitQueueGhost { queued_wakers: g.queued_wakers.drop_first() };
170 }
171 };
172 drop(wakers);
174
175 if waker.wake_up() {
176 num_woken += 1;
177 }
178 }
179
180 num_woken
181 }
182
183 #[verifier::external_body]
184 fn is_empty(&self) -> bool {
185 self.num_wakers.load() == 0
186 }
187
188 #[doc(hidden)]
190 #[verifier::external_body]
191 pub fn enqueue(&self, waker: Arc<Waker>)
192 {
193 let mut wakers = self.wakers.lock();
194 wakers.push_back(waker);
195 atomic_with_ghost! {
196 self.num_wakers => fetch_add(1);
197 update prev -> next;
198 ghost g => {
199 g = WaitQueueGhost { queued_wakers: g.queued_wakers.push(waker.id()) };
200 }
201 };
202 }
203}
204
205impl Default for WaitQueue {
206 #[verifier::external_body]
207 fn default() -> Self {
208 Self::new()
209 }
210}
211
/// The waiting half of a waiter/waker pair (see `Waiter::new_pair`).
pub struct Waiter {
    /// The waker paired with this waiter, shared through an `Arc`.
    waker: Arc<Waker>,
}
220
// Negative impl: `Waiter` is explicitly not `Send`.
impl !Send for Waiter {

}
224
// Negative impl: `Waiter` is explicitly not `Sync`.
impl !Sync for Waiter {

}
228
impl Waiter {
    /// Spec predicate: holds when `waker` is equal to the waker stored in this waiter.
    pub closed spec fn rel_waker(self, waker: Arc<Waker>) -> bool {
        self.waker == waker
    }

    /// Spec function: the ghost id of the waker stored in this waiter.
    pub closed spec fn waker_id(self) -> int {
        self.waker.id()
    }
}
240
/// The waking half of a waiter/waker pair.
pub struct Waker {
    /// Wake flag: set to `true` by `wake_up` and `close`, and swapped back to `false`
    /// by `do_wait`.
    has_woken: AtomicBool,
    /// The task handed to `scheduler::unpark_target` by `wake_up`.
    task: Arc<Task>,
    /// Ghost identifier of this waker, exposed in specs through `id()`.
    v_id: Ghost<int>,
}
250
impl Waker {
    /// Spec function: the ghost identifier stored in `v_id`.
    pub closed spec fn id(self) -> int {
        self.v_id@
    }
}
257
#[verus_verify]
impl Waiter {
    /// Creates a waiter together with its waker.
    ///
    /// Both returned values refer to the same `Arc<Waker>`; the postcondition states
    /// that the returned waiter is related to the returned waker via `rel_waker`.
    #[verus_spec(ret =>
        ensures
            ret.0.rel_waker(ret.1),
    )]
    pub fn new_pair() -> (Self, Arc<Waker>) {
        // The ghost id is left unconstrained (`arbitrary()`).
        proof_decl! {
            let ghost waker_id: int = arbitrary();
        }
        let waker = Arc::new(
            Waker {
                has_woken: AtomicBool::new(false),
                // NOTE(review): a fresh empty `Task {}` is constructed here; confirm
                // whether this is meant to stand in for the current task.
                task: Arc::new(Task { }),
                v_id: Ghost(waker_id),
            },
        );
        let waiter = Self { waker: waker.clone() };
        (waiter, waker)
    }

    /// Waits by delegating to the paired waker's `do_wait`.
    #[track_caller]
    pub fn wait(&self) {
        self.waker.do_wait();
    }

    /// Waits until `cond` yields `Some(res)` or `cancel_cond` reports an error.
    ///
    /// Each loop iteration:
    /// 1. evaluates `cond`; on `Some(res)` returns `Ok(res)`;
    /// 2. otherwise evaluates `cancel_cond`; on `Err(e)` it closes the waker, evaluates
    ///    `cond` one more time and returns `Ok(res)` if that yields `Some(res)`, else
    ///    `Err(e)`;
    /// 3. otherwise calls `wait()` and repeats.
    ///
    /// The loop has no `decreases` clause (see the attribute below), and both return
    /// paths assume their proof obligations with `admit()`.
    #[verus_spec(ret =>
        requires
            cond.requires(()),
            cancel_cond.requires(()),
        ensures
            match ret {
                Ok(res) => cond.ensures((),Some(res)),
                Err(e) => cancel_cond.ensures((), Err(e)),
            },
    )]
    #[track_caller]
    #[verifier::exec_allows_no_decreases_clause]
    pub fn wait_until_or_cancelled<F, R, FCancel, E>(
        &self,
        mut cond: F,
        cancel_cond: FCancel,
    ) -> core::result::Result<R, E> where
        F: FnMut() -> Option<R>,
        FCancel: Fn() -> core::result::Result<(), E>,
    {
        let mut cond = cond;
        #[verus_spec(invariant
            cond.requires(()),
            cancel_cond.requires(()),
        )]
        loop {
            if let Some(res) = cond() {
                assert(cond.ensures((), Some(res)));
                // NOTE: remaining obligations on this return path are assumed.
                proof! { admit(); }
                return Ok(res);
            };
            if let Err(e) = cancel_cond() {
                // Close the waker first, then re-check the condition once before
                // reporting the cancellation error.
                self.waker.close();
                // NOTE: remaining obligations on this return path are assumed.
                proof! { admit(); }
                return cond().ok_or(e);
            }
            self.wait();
        }
    }

    /// Returns a new `Arc` handle to the paired waker; the postcondition states that
    /// the result satisfies `rel_waker`.
    #[verus_spec(ret =>
        ensures
            self.rel_waker(ret),
    )]
    pub fn waker(&self) -> Arc<Waker> {
        self.waker.clone()
    }

    /// Returns a reference to the task stored in the paired waker.
    pub fn task(&self) -> &Arc<Task> {
        &self.waker.task
    }
}
352
impl Drop for Waiter {
    /// Closes the paired waker when the waiter is dropped.
    ///
    /// `close` sets the waker's wake flag, so a later `wake_up` on the paired waker
    /// finds the flag already set and returns `false`.
    #[verifier::external_body]
    fn drop(&mut self)
        opens_invariants none
        no_unwind
    {
        self.waker.close();
    }
}
364
365impl Waker {
366 #[verifier::external_body]
376 pub fn wake_up(&self) -> bool {
377 if self.has_woken.swap(true, Ordering::Release) {
378 return false;
379 }
380 scheduler::unpark_target(self.task.clone());
381
382 true
383 }
384
385 #[track_caller]
386 #[verifier::external_body]
387 fn do_wait(&self) {
388 while !self.has_woken.swap(false, Ordering::Acquire) {
389 scheduler::park_current(|| self.has_woken.load(Ordering::Acquire));
390 }
391 }
392
393 #[verifier::external_body]
394 fn close(&self) {
395 let _ = self.has_woken.swap(true, Ordering::Acquire);
398 }
399}
400
}

// Kernel-test module; only compiled when the `ktest` cfg is set.
#[cfg(ktest)]
mod test {
    use super::*;
    use crate::{prelude::*, task::TaskOptions};

    // Shared driver: spawns a task that yields, sets a flag and then runs `wake` on the
    // queue, while the calling task waits on the queue until the flag is set.
    fn queue_wake<F>(wake: F)
    where
        F: Fn(&WaitQueue) + Sync + Send + 'static,
    {
        let queue = Arc::new(WaitQueue::new());
        let queue_cloned = queue.clone();

        let cond = Arc::new(AtomicBool::new(false));
        let cond_cloned = cond.clone();

        TaskOptions::new(move || {
            Task::yield_now();

            cond_cloned.store(true, Ordering::Relaxed);
            wake(&queue_cloned);
        })
        .data(())
        .spawn()
        .unwrap();

        queue.wait_until(|| cond.load(Ordering::Relaxed).then_some(()));

        assert!(cond.load(Ordering::Relaxed));
    }

    // `wake_one` is enough to release a single waiter.
    #[ktest]
    fn queue_wake_one() {
        queue_wake(|queue| {
            queue.wake_one();
        });
    }

    // `wake_all` releases the waiter as well.
    #[ktest]
    fn queue_wake_all() {
        queue_wake(|queue| {
            queue.wake_all();
        });
    }

    // Only the first `wake_up` on a waker reports success.
    #[ktest]
    fn waiter_wake_twice() {
        let (_waiter, waker) = Waiter::new_pair();

        assert!(waker.wake_up());
        assert!(!waker.wake_up());
    }

    // After the waiter is dropped, `wake_up` on its waker reports failure.
    #[ktest]
    fn waiter_wake_drop() {
        let (waiter, waker) = Waiter::new_pair();

        drop(waiter);
        assert!(!waker.wake_up());
    }

    // A waiter blocked in `wait` is released by `wake_up` from another task, and the
    // flag stored before the wake-up is visible afterwards.
    #[ktest]
    fn waiter_wake_async() {
        let (waiter, waker) = Waiter::new_pair();

        let cond = Arc::new(AtomicBool::new(false));
        let cond_cloned = cond.clone();

        TaskOptions::new(move || {
            Task::yield_now();

            cond_cloned.store(true, Ordering::Relaxed);
            assert!(waker.wake_up());
        })
        .data(())
        .spawn()
        .unwrap();

        waiter.wait();

        assert!(cond.load(Ordering::Relaxed));
    }

    // Two waiters are woken in the opposite order to the one in which they wait:
    // `waker2` fires first, yet the test waits on `waiter` before `waiter2`.
    #[ktest]
    fn waiter_wake_reorder() {
        let (waiter, waker) = Waiter::new_pair();

        let cond = Arc::new(AtomicBool::new(false));
        let cond_cloned = cond.clone();

        let (waiter2, waker2) = Waiter::new_pair();

        let cond2 = Arc::new(AtomicBool::new(false));
        let cond2_cloned = cond2.clone();

        TaskOptions::new(move || {
            Task::yield_now();

            cond2_cloned.store(true, Ordering::Relaxed);
            assert!(waker2.wake_up());

            Task::yield_now();

            cond_cloned.store(true, Ordering::Relaxed);
            assert!(waker.wake_up());
        })
        .data(())
        .spawn()
        .unwrap();

        waiter.wait();
        assert!(cond.load(Ordering::Relaxed));

        waiter2.wait();
        assert!(cond2.load(Ordering::Relaxed));
    }
}