ostd/sync/rcu/mod.rs
1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6 marker::PhantomData,
7 mem::ManuallyDrop,
8 ops::Deref,
9 ptr::NonNull,
10 sync::atomic::{
11 AtomicPtr,
12 Ordering::{AcqRel, Acquire},
13 },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::{
21 panic::PanicGuard,
22 task::{
23 DisabledPreemptGuard,
24 atomic_mode::{AsAtomicModeGuard, InAtomicMode},
25 disable_preempt,
26 },
27};
28
29mod monitor;
30pub mod non_null;
31
32/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
33///
34/// The pointer should be a non-null pointer with type `P`, which implements
35/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
36///
37/// # Overview
38///
39/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
40/// performance, low-latency read operations in concurrent systems. It allows
41/// multiple readers to access shared data simultaneously without contention,
42/// while writers can update the data safely in a way that does not disrupt
43/// ongoing reads. RCU is particularly suited for situations where reads are
44/// far more frequent than writes.
45///
46/// The original design and implementation of RCU is described in paper _The
47/// Read-Copy-Update Mechanism for Supporting Real-Time Applications on Shared-
48/// Memory Multiprocessor Systems with Linux_ published on IBM Systems Journal
49/// 47.2 (2008).
50///
51/// # Examples
52///
53/// ```
54/// use ostd::sync::Rcu;
55///
56/// let rcu = Rcu::new(Box::new(42));
57///
58/// let rcu_guard = rcu.read();
59///
60/// assert_eq!(*rcu_guard, Some(&42));
61///
62/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
63///
64/// let rcu_guard = rcu.read();
65///
66/// assert_eq!(*rcu_guard, Some(&43));
67/// ```
68pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
69
70/// A guard that allows access to the pointed data protected by a [`Rcu`].
71#[clippy::has_significant_drop]
72#[must_use]
73pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
74
75/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.
76///
77/// This is a variant of [`Rcu`] that allows the contained pointer to be null.
78/// So that it can implement `Rcu<Option<P>>` where `P` is not a nullable
79/// pointer. It is the same as [`Rcu`] in other aspects.
80///
81/// # Examples
82///
83/// ```
84/// use ostd::sync::RcuOption;
85///
86/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
87///
88/// assert!(RCU.read().is_none());
89///
90/// RCU.update(Box::new(42));
91///
92/// // Read the data protected by RCU
93/// {
94/// let rcu_guard = RCU.read().try_get().unwrap();
95/// assert_eq!(*rcu_guard, 42);
96/// }
97///
98/// // Update the data protected by RCU
99/// {
100/// let rcu_guard = RCU.read().try_get().unwrap();
101///
102/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
103///
104/// let rcu_guard = RCU.read().try_get().unwrap();
105/// assert_eq!(*rcu_guard, 43);
106/// }
107/// ```
108pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
109
110/// A guard that allows access to the pointed data protected by a [`RcuOption`].
111#[clippy::has_significant_drop]
112#[must_use]
113pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
114
115/// The inner implementation of both [`Rcu`] and [`RcuOption`].
116struct RcuInner<P: NonNullPtr> {
117 ptr: AtomicPtr<<P as NonNullPtr>::Target>,
118 // We want to implement Send and Sync explicitly.
119 // Having a pointer field prevents them from being implemented
120 // automatically by the compiler.
121 _marker: PhantomData<*const P::Target>,
122}
123
124// SAFETY: It is apparent that if `P` is `Send`, then `Rcu<P>` is `Send`.
125unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
126
127// SAFETY: To implement `Sync` for `Rcu<P>`, we need to meet two conditions:
128// 1. `P` must be `Sync` because `Rcu::get` allows concurrent access.
129// 2. `P` must be `Send` because `Rcu::update` may obtain an object
130// of `P` created on another thread.
131unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
132
133impl<P: NonNullPtr + Send> RcuInner<P> {
134 const fn new_none() -> Self {
135 Self {
136 ptr: AtomicPtr::new(core::ptr::null_mut()),
137 _marker: PhantomData,
138 }
139 }
140
141 fn new(pointer: P) -> Self {
142 let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
143 let ptr = AtomicPtr::new(ptr);
144 Self {
145 ptr,
146 _marker: PhantomData,
147 }
148 }
149
150 fn update(&self, new_ptr: Option<P>) {
151 let new_ptr = if let Some(new_ptr) = new_ptr {
152 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
153 } else {
154 core::ptr::null_mut()
155 };
156
157 let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
158
159 if let Some(p) = NonNull::new(old_raw_ptr) {
160 // SAFETY:
161 // 1. The pointer was previously returned by `into_raw`.
162 // 2. The pointer is removed from the RCU slot so that no one will
163 // use it after the end of the current grace period. The removal
164 // is done atomically, so it will only be dropped once.
165 unsafe { delay_drop::<P>(p) };
166 }
167 }
168
169 fn read(&self) -> RcuReadGuardInner<'_, P> {
170 let guard = disable_preempt();
171 RcuReadGuardInner {
172 obj_ptr: self.ptr.load(Acquire),
173 rcu: self,
174 _inner_guard: guard,
175 }
176 }
177
178 fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
179 let obj_ptr = self.ptr.load(Acquire);
180 if obj_ptr.is_null() {
181 return None;
182 }
183 // SAFETY:
184 // 1. This pointer is not NULL.
185 // 2. The `_guard` guarantees atomic mode for the duration of lifetime
186 // `'a`, the pointer is valid because other writers won't release the
187 // allocation until this task passes the quiescent state.
188 NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
189 }
190}
191
192impl<P: NonNullPtr> Drop for RcuInner<P> {
193 fn drop(&mut self) {
194 let ptr = self.ptr.load(Acquire);
195 if let Some(p) = NonNull::new(ptr) {
196 // SAFETY: It was previously returned by `into_raw` when creating
197 // the RCU primitive.
198 let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
199 // It is OK not to delay the drop because the RCU primitive is
200 // owned by nobody else.
201 drop(pointer);
202 }
203 }
204}
205
206/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
207struct RcuReadGuardInner<'a, P: NonNullPtr> {
208 obj_ptr: *mut <P as NonNullPtr>::Target,
209 rcu: &'a RcuInner<P>,
210 _inner_guard: DisabledPreemptGuard,
211}
212
213impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
214 fn get(&self) -> Option<P::Ref<'_>> {
215 // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
216 // outlives the lifetime of `&self`. Additionally, during this period,
217 // it is impossible to create a mutable reference to `P`.
218 NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
219 }
220
221 fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
222 let new_ptr = if let Some(new_ptr) = new_ptr {
223 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
224 } else {
225 core::ptr::null_mut()
226 };
227
228 if self
229 .rcu
230 .ptr
231 .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
232 .is_err()
233 {
234 let Some(new_ptr) = NonNull::new(new_ptr) else {
235 return Err(None);
236 };
237 // SAFETY:
238 // 1. It was previously returned by `into_raw`.
239 // 2. The `compare_exchange` fails so the pointer will not
240 // be used by other threads via reading the RCU primitive.
241 return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
242 }
243
244 if let Some(p) = NonNull::new(self.obj_ptr) {
245 // SAFETY:
246 // 1. The pointer was previously returned by `into_raw`.
247 // 2. The pointer is removed from the RCU slot so that no one will
248 // use it after the end of the current grace period. The removal
249 // is done atomically, so it will only be dropped once.
250 unsafe { delay_drop::<P>(p) };
251 }
252
253 Ok(())
254 }
255}
256
257impl<P: NonNullPtr + Send> Rcu<P> {
258 /// Creates a new RCU primitive with the given pointer.
259 pub fn new(pointer: P) -> Self {
260 Self(RcuInner::new(pointer))
261 }
262
263 /// Replaces the current pointer with a null pointer.
264 ///
265 /// This function updates the pointer to the new pointer regardless of the
266 /// original pointer. The original pointer will be dropped after the grace
267 /// period.
268 ///
269 /// Oftentimes this function is not recommended unless you have serialized
270 /// writes with locks. Otherwise, you can use [`Self::read`] and then
271 /// [`RcuReadGuard::compare_exchange`] to update the pointer.
272 pub fn update(&self, new_ptr: P) {
273 self.0.update(Some(new_ptr));
274 }
275
276 /// Retrieves a read guard for the RCU primitive.
277 ///
278 /// The guard allows read access to the data protected by RCU, as well
279 /// as the ability to do compare-and-exchange.
280 pub fn read(&self) -> RcuReadGuard<'_, P> {
281 RcuReadGuard(self.0.read())
282 }
283
284 /// Reads the RCU-protected value in an atomic mode.
285 ///
286 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
287 /// atomic mode. If we are already in an atomic mode, this function can
288 /// reduce the overhead of disabling preemption again.
289 ///
290 /// Unlike [`Self::read`], this function does not return a read guard, so
291 /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
292 /// writers. You may do it via a [`super::SpinLock`].
293 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
294 self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
295 }
296}
297
298impl<P: NonNullPtr + Send> RcuOption<P> {
299 /// Creates a new RCU primitive with the given pointer.
300 pub fn new(pointer: Option<P>) -> Self {
301 if let Some(pointer) = pointer {
302 Self(RcuInner::new(pointer))
303 } else {
304 Self(RcuInner::new_none())
305 }
306 }
307
308 /// Creates a new RCU primitive that contains nothing.
309 ///
310 /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
311 pub const fn new_none() -> Self {
312 Self(RcuInner::new_none())
313 }
314
315 /// Replaces the current pointer with a null pointer.
316 ///
317 /// This function updates the pointer to the new pointer regardless of the
318 /// original pointer. If the original pointer is not NULL, it will be
319 /// dropped after the grace period.
320 ///
321 /// Oftentimes this function is not recommended unless you have
322 /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
323 /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
324 pub fn update(&self, new_ptr: Option<P>) {
325 self.0.update(new_ptr);
326 }
327
328 /// Retrieves a read guard for the RCU primitive.
329 ///
330 /// The guard allows read access to the data protected by RCU, as well
331 /// as the ability to do compare-and-exchange.
332 ///
333 /// The contained pointer can be NULL and you can only get a reference
334 /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
335 pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
336 RcuOptionReadGuard(self.0.read())
337 }
338
339 /// Reads the RCU-protected value in an atomic mode.
340 ///
341 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
342 /// atomic mode. If we are already in an atomic mode, this function can
343 /// reduce the overhead of disabling preemption again.
344 ///
345 /// Unlike [`Self::read`], this function does not return a read guard, so
346 /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
347 /// writers. You may do it via a [`super::SpinLock`].
348 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
349 &'a self,
350 guard: &'a G,
351 ) -> Option<P::Ref<'a>> {
352 self.0.read_with(guard.as_atomic_mode_guard())
353 }
354}
355
356impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
357 /// Gets the reference of the protected data.
358 pub fn get(&self) -> P::Ref<'_> {
359 self.0.get().unwrap()
360 }
361
362 /// Tries to replace the already read pointer with a new pointer.
363 ///
364 /// If another thread has updated the pointer after the read, this
365 /// function will fail, and returns the given pointer back. Otherwise,
366 /// it will replace the pointer with the new one and drop the old pointer
367 /// after the grace period.
368 ///
369 /// If spinning on [`Rcu::read`] and this function, it is recommended
370 /// to relax the CPU or yield the task on failure. Otherwise contention
371 /// will occur.
372 ///
373 /// This API does not help to avoid
374 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
375 pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
376 self.0
377 .compare_exchange(Some(new_ptr))
378 .map_err(|err| err.unwrap())
379 }
380}
381
382impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
383 /// Gets the reference of the protected data.
384 ///
385 /// If the RCU primitive protects nothing, this function returns `None`.
386 pub fn get(&self) -> Option<P::Ref<'_>> {
387 self.0.get()
388 }
389
390 /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
391 pub fn is_none(&self) -> bool {
392 self.0.obj_ptr.is_null()
393 }
394
395 /// Tries to replace the already read pointer with a new pointer
396 /// (or none).
397 ///
398 /// If another thread has updated the pointer after the read, this
399 /// function will fail, and returns the given pointer back. Otherwise,
400 /// it will replace the pointer with the new one and drop the old pointer
401 /// after the grace period.
402 ///
403 /// If spinning on [`RcuOption::read`] and this function, it is recommended
404 /// to relax the CPU or yield the task on failure. Otherwise contention
405 /// will occur.
406 ///
407 /// This API does not help to avoid
408 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
409 pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
410 self.0.compare_exchange(new_ptr)
411 }
412}
413
414/// Delays the dropping of a [`NonNullPtr`] after the RCU grace period.
415///
416/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
417/// because we cannot alias a [`Box`]. Restoring `P` and use [`RcuDrop`] for it
418/// can lead to multiple [`Box`]es simultaneously pointing to the same
419/// content.
420///
421/// # Safety
422///
423/// The pointer must be previously returned by `into_raw`, will not be used
424/// after the end of the current grace period, and will only be dropped once.
425///
426/// [`Box`]: alloc::boxed::Box
427unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
428 struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
429 // SAFETY: Sending a raw pointer to another task is safe as long as
430 // the pointer access in another task is safe (guaranteed by the trait
431 // bound `P: Send`).
432 unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}
433
434 let pointer: ForceSend<P> = ForceSend(pointer);
435
436 let rcu_monitor = RCU_MONITOR.get().unwrap();
437 rcu_monitor.after_grace_period(move || {
438 // This is necessary to make the Rust compiler to move the entire
439 // `ForceSend` structure into the closure.
440 let pointer = pointer;
441
442 // SAFETY:
443 // 1. The pointer was previously returned by `into_raw`.
444 // 2. The pointer won't be used anymore since the grace period has
445 // finished and this is the only time the pointer gets dropped.
446 let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
447 drop(p);
448 });
449}
450
451/// A wrapper to delay calling destructor of `T` after the RCU grace period.
452///
453/// Upon dropping this structure, a callback will be registered to the global
454/// RCU monitor and the destructor of `T` will be delayed until the callback.
455///
456/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`. You can also
457/// access the inner value safely via [`RcuDrop<T>`].
458#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
459#[repr(transparent)]
460pub struct RcuDrop<T: Send + 'static> {
461 value: ManuallyDrop<T>,
462}
463
464impl<T: Send + 'static> RcuDrop<T> {
465 /// Creates a new [`RcuDrop`] instance that delays the dropping of `value`.
466 pub fn new(value: T) -> Self {
467 Self {
468 value: ManuallyDrop::new(value),
469 }
470 }
471
472 /// Extracts the value from the `RcuDrop` container.
473 ///
474 /// # Safety
475 ///
476 /// The caller must ensure that the returned value will be dropped after
477 /// all the threads cannot access it anymore. Specifically, dropping it
478 /// after the RCU grace period is guaranteed to be safe.
479 ///
480 /// Note that panic unwinding may cause the returned value to be dropped
481 /// immediately, which is not sound. Therefore, the caller must forget the
482 /// [`PanicGuard`] after it ensures that the value will be dropped at the
483 /// correct time.
484 pub(crate) unsafe fn into_inner(slot: RcuDrop<T>) -> (T, PanicGuard) {
485 let mut slot = ManuallyDrop::new(slot);
486 let panic_guard = PanicGuard::new();
487 // SAFETY: The `slot` will not be used after this point.
488 let val = unsafe { ManuallyDrop::take(&mut slot.value) };
489 (val, panic_guard)
490 }
491}
492
493impl<T: Send + 'static> Deref for RcuDrop<T> {
494 type Target = T;
495
496 fn deref(&self) -> &Self::Target {
497 &self.value
498 }
499}
500
501impl<T: Send + 'static> Drop for RcuDrop<T> {
502 fn drop(&mut self) {
503 // SAFETY: The `ManuallyDrop` will not be used after this point.
504 let taken = unsafe { ManuallyDrop::take(&mut self.value) };
505 let rcu_monitor = RCU_MONITOR.get().unwrap();
506 rcu_monitor.after_grace_period(|| {
507 drop(taken);
508 });
509 }
510}
511
512/// Finishes the current grace period.
513///
514/// This function is called when the current grace period on current CPU is
515/// finished. If this CPU is the last CPU to finish the current grace period,
516/// it takes all the current callbacks and invokes them.
517///
518/// # Safety
519///
520/// The caller must ensure that this CPU is not executing in a RCU read-side
521/// critical section.
522pub unsafe fn finish_grace_period() {
523 let rcu_monitor = RCU_MONITOR.get().unwrap();
524 // SAFETY: The caller ensures safety.
525 unsafe {
526 rcu_monitor.finish_grace_period();
527 }
528}
529
530static RCU_MONITOR: Once<RcuMonitor> = Once::new();
531
532pub fn init() {
533 RCU_MONITOR.call_once(RcuMonitor::new);
534}