ostd/sync/rcu/
mod.rs

1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6    marker::PhantomData,
7    mem::ManuallyDrop,
8    ops::Deref,
9    ptr::NonNull,
10    sync::atomic::{
11        AtomicPtr,
12        Ordering::{AcqRel, Acquire},
13    },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::task::{
21    DisabledPreemptGuard,
22    atomic_mode::{AsAtomicModeGuard, InAtomicMode},
23    disable_preempt,
24};
25
26mod monitor;
27pub mod non_null;
28
/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
///
/// The pointer should be a non-null pointer with type `P`, which implements
/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
///
/// # Overview
///
/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
/// performance, low-latency read operations in concurrent systems. It allows
/// multiple readers to access shared data simultaneously without contention,
/// while writers can update the data safely in a way that does not disrupt
/// ongoing reads. RCU is particularly suited for situations where reads are
/// far more frequent than writes.
///
/// The original design and implementation of RCU is described in the paper
/// _The Read-Copy-Update Mechanism for Supporting Real-Time Applications on
/// Shared-Memory Multiprocessor Systems with Linux_ published in IBM Systems
/// Journal 47.2 (2008).
///
/// # Examples
///
/// ```
/// use ostd::sync::Rcu;
///
/// let rcu = Rcu::new(Box::new(42));
///
/// // Read the data protected by RCU.
/// {
///     let rcu_guard = rcu.read();
///     // `get` returns a `P::Ref<'_>` that borrows from the guard. Its exact
///     // type is defined by the `NonNullPtr` implementation of `P`.
///     let _value = rcu_guard.get();
/// }
///
/// // Update the data protected by RCU. The guard is consumed by the call,
/// // which fails if the pointer has changed since the guard was created.
/// let rcu_guard = rcu.read();
/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
/// ```
pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
66
/// A guard that allows access to the pointed data protected by a [`Rcu`].
///
/// The guard is created by [`Rcu::read`]. It wraps the pointer value that was
/// loaded at that moment together with a [`DisabledPreemptGuard`], so
/// preemption stays disabled for as long as the guard is alive.
#[clippy::has_significant_drop]
#[must_use]
pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
71
/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.
///
/// This is a variant of [`Rcu`] that allows the contained pointer to be null,
/// so it plays the role of an `Rcu<Option<P>>` where `P` itself is not a
/// nullable pointer. It is the same as [`Rcu`] in other aspects.
///
/// # Examples
///
/// ```
/// use ostd::sync::RcuOption;
///
/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
///
/// assert!(RCU.read().is_none());
///
/// RCU.update(Some(Box::new(42)));
///
/// // Read the data protected by RCU
/// {
///     let rcu_guard = RCU.read();
///     // `get` returns `Some(P::Ref<'_>)` when the cell is not empty.
///     assert!(rcu_guard.get().is_some());
/// }
///
/// // Update the data protected by RCU
/// {
///     let rcu_guard = RCU.read();
///
///     // The guard is consumed by the call, which fails if the pointer has
///     // changed since the guard was created.
///     rcu_guard.compare_exchange(Some(Box::new(43))).unwrap();
///
///     assert!(!RCU.read().is_none());
/// }
/// ```
pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
106
/// A guard that allows access to the pointed data protected by a [`RcuOption`].
///
/// The guard is created by [`RcuOption::read`]. It wraps the (possibly null)
/// pointer value that was loaded at that moment together with a
/// [`DisabledPreemptGuard`], so preemption stays disabled for as long as the
/// guard is alive.
#[clippy::has_significant_drop]
#[must_use]
pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
111
/// The inner implementation of both [`Rcu`] and [`RcuOption`].
///
/// The cell is a single atomic slot. The slot holds either a null pointer or
/// a raw pointer that was produced by [`NonNullPtr::into_raw`].
struct RcuInner<P: NonNullPtr> {
    /// The RCU slot: null, or a raw pointer returned by `P::into_raw`.
    ptr: AtomicPtr<<P as NonNullPtr>::Target>,
    // We want to implement Send and Sync explicitly.
    // Having a pointer field prevents them from being implemented
    // automatically by the compiler.
    _marker: PhantomData<*const P::Target>,
}
120
// SAFETY: `RcuInner<P>` owns at most one `P` (stored as a raw pointer), so
// moving the cell to another thread is equivalent to moving that `P`. Hence
// if `P` is `Send`, then `RcuInner<P>` is `Send`.
unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
123
// SAFETY: To implement `Sync` for `RcuInner<P>`, we need to meet two
// conditions:
//  1. `P` must be `Sync` because `read` and `read_with` allow concurrent
//     access to the pointed data through a shared reference to the cell.
//  2. `P` must be `Send` because `update` (and `compare_exchange` on a read
//     guard) may take out an object of `P` that was created on another
//     thread and schedule it to be dropped.
unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
129
130impl<P: NonNullPtr + Send> RcuInner<P> {
131    const fn new_none() -> Self {
132        Self {
133            ptr: AtomicPtr::new(core::ptr::null_mut()),
134            _marker: PhantomData,
135        }
136    }
137
138    fn new(pointer: P) -> Self {
139        let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
140        let ptr = AtomicPtr::new(ptr);
141        Self {
142            ptr,
143            _marker: PhantomData,
144        }
145    }
146
147    fn update(&self, new_ptr: Option<P>) {
148        let new_ptr = if let Some(new_ptr) = new_ptr {
149            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
150        } else {
151            core::ptr::null_mut()
152        };
153
154        let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
155
156        if let Some(p) = NonNull::new(old_raw_ptr) {
157            // SAFETY:
158            // 1. The pointer was previously returned by `into_raw`.
159            // 2. The pointer is removed from the RCU slot so that no one will
160            //    use it after the end of the current grace period. The removal
161            //    is done atomically, so it will only be dropped once.
162            unsafe { delay_drop::<P>(p) };
163        }
164    }
165
166    fn read(&self) -> RcuReadGuardInner<'_, P> {
167        let guard = disable_preempt();
168        RcuReadGuardInner {
169            obj_ptr: self.ptr.load(Acquire),
170            rcu: self,
171            _inner_guard: guard,
172        }
173    }
174
175    fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
176        let obj_ptr = self.ptr.load(Acquire);
177        if obj_ptr.is_null() {
178            return None;
179        }
180        // SAFETY:
181        // 1. This pointer is not NULL.
182        // 2. The `_guard` guarantees atomic mode for the duration of lifetime
183        //    `'a`, the pointer is valid because other writers won't release the
184        //    allocation until this task passes the quiescent state.
185        NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
186    }
187}
188
189impl<P: NonNullPtr> Drop for RcuInner<P> {
190    fn drop(&mut self) {
191        let ptr = self.ptr.load(Acquire);
192        if let Some(p) = NonNull::new(ptr) {
193            // SAFETY: It was previously returned by `into_raw` when creating
194            // the RCU primitive.
195            let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
196            // It is OK not to delay the drop because the RCU primitive is
197            // owned by nobody else.
198            drop(pointer);
199        }
200    }
201}
202
/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
struct RcuReadGuardInner<'a, P: NonNullPtr> {
    /// The pointer value loaded from the RCU slot when the guard was created.
    /// It may be null if the slot was empty at that time.
    obj_ptr: *mut <P as NonNullPtr>::Target,
    /// The cell that the pointer was loaded from; used by `compare_exchange`.
    rcu: &'a RcuInner<P>,
    /// Keeps preemption disabled for as long as the guard is alive.
    _inner_guard: DisabledPreemptGuard,
}
209
210impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
211    fn get(&self) -> Option<P::Ref<'_>> {
212        // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
213        // outlives the lifetime of `&self`. Additionally, during this period,
214        // it is impossible to create a mutable reference to `P`.
215        NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
216    }
217
218    fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
219        let new_ptr = if let Some(new_ptr) = new_ptr {
220            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
221        } else {
222            core::ptr::null_mut()
223        };
224
225        if self
226            .rcu
227            .ptr
228            .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
229            .is_err()
230        {
231            let Some(new_ptr) = NonNull::new(new_ptr) else {
232                return Err(None);
233            };
234            // SAFETY:
235            // 1. It was previously returned by `into_raw`.
236            // 2. The `compare_exchange` fails so the pointer will not
237            //    be used by other threads via reading the RCU primitive.
238            return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
239        }
240
241        if let Some(p) = NonNull::new(self.obj_ptr) {
242            // SAFETY:
243            // 1. The pointer was previously returned by `into_raw`.
244            // 2. The pointer is removed from the RCU slot so that no one will
245            //    use it after the end of the current grace period. The removal
246            //    is done atomically, so it will only be dropped once.
247            unsafe { delay_drop::<P>(p) };
248        }
249
250        Ok(())
251    }
252}
253
254impl<P: NonNullPtr + Send> Rcu<P> {
255    /// Creates a new RCU primitive with the given pointer.
256    pub fn new(pointer: P) -> Self {
257        Self(RcuInner::new(pointer))
258    }
259
260    /// Replaces the current pointer with a null pointer.
261    ///
262    /// This function updates the pointer to the new pointer regardless of the
263    /// original pointer. The original pointer will be dropped after the grace
264    /// period.
265    ///
266    /// Oftentimes this function is not recommended unless you have serialized
267    /// writes with locks. Otherwise, you can use [`Self::read`] and then
268    /// [`RcuReadGuard::compare_exchange`] to update the pointer.
269    pub fn update(&self, new_ptr: P) {
270        self.0.update(Some(new_ptr));
271    }
272
273    /// Retrieves a read guard for the RCU primitive.
274    ///
275    /// The guard allows read access to the data protected by RCU, as well
276    /// as the ability to do compare-and-exchange.
277    pub fn read(&self) -> RcuReadGuard<'_, P> {
278        RcuReadGuard(self.0.read())
279    }
280
281    /// Reads the RCU-protected value in an atomic mode.
282    ///
283    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
284    /// atomic mode. If we are already in an atomic mode, this function can
285    /// reduce the overhead of disabling preemption again.
286    ///
287    /// Unlike [`Self::read`], this function does not return a read guard, so
288    /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
289    /// writers. You may do it via a [`super::SpinLock`].
290    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
291        self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
292    }
293}
294
295impl<P: NonNullPtr + Send> RcuOption<P> {
296    /// Creates a new RCU primitive with the given pointer.
297    pub fn new(pointer: Option<P>) -> Self {
298        if let Some(pointer) = pointer {
299            Self(RcuInner::new(pointer))
300        } else {
301            Self(RcuInner::new_none())
302        }
303    }
304
305    /// Creates a new RCU primitive that contains nothing.
306    ///
307    /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
308    pub const fn new_none() -> Self {
309        Self(RcuInner::new_none())
310    }
311
312    /// Replaces the current pointer with a null pointer.
313    ///
314    /// This function updates the pointer to the new pointer regardless of the
315    /// original pointer. If the original pointer is not NULL, it will be
316    /// dropped after the grace period.
317    ///
318    /// Oftentimes this function is not recommended unless you have
319    /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
320    /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
321    pub fn update(&self, new_ptr: Option<P>) {
322        self.0.update(new_ptr);
323    }
324
325    /// Retrieves a read guard for the RCU primitive.
326    ///
327    /// The guard allows read access to the data protected by RCU, as well
328    /// as the ability to do compare-and-exchange.
329    ///
330    /// The contained pointer can be NULL and you can only get a reference
331    /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
332    pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
333        RcuOptionReadGuard(self.0.read())
334    }
335
336    /// Reads the RCU-protected value in an atomic mode.
337    ///
338    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
339    /// atomic mode. If we are already in an atomic mode, this function can
340    /// reduce the overhead of disabling preemption again.
341    ///
342    /// Unlike [`Self::read`], this function does not return a read guard, so
343    /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
344    /// writers. You may do it via a [`super::SpinLock`].
345    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
346        &'a self,
347        guard: &'a G,
348    ) -> Option<P::Ref<'a>> {
349        self.0.read_with(guard.as_atomic_mode_guard())
350    }
351}
352
353impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
354    /// Gets the reference of the protected data.
355    pub fn get(&self) -> P::Ref<'_> {
356        self.0.get().unwrap()
357    }
358
359    /// Tries to replace the already read pointer with a new pointer.
360    ///
361    /// If another thread has updated the pointer after the read, this
362    /// function will fail, and returns the given pointer back. Otherwise,
363    /// it will replace the pointer with the new one and drop the old pointer
364    /// after the grace period.
365    ///
366    /// If spinning on [`Rcu::read`] and this function, it is recommended
367    /// to relax the CPU or yield the task on failure. Otherwise contention
368    /// will occur.
369    ///
370    /// This API does not help to avoid
371    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
372    pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
373        self.0
374            .compare_exchange(Some(new_ptr))
375            .map_err(|err| err.unwrap())
376    }
377}
378
379impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
380    /// Gets the reference of the protected data.
381    ///
382    /// If the RCU primitive protects nothing, this function returns `None`.
383    pub fn get(&self) -> Option<P::Ref<'_>> {
384        self.0.get()
385    }
386
387    /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
388    pub fn is_none(&self) -> bool {
389        self.0.obj_ptr.is_null()
390    }
391
392    /// Tries to replace the already read pointer with a new pointer
393    /// (or none).
394    ///
395    /// If another thread has updated the pointer after the read, this
396    /// function will fail, and returns the given pointer back. Otherwise,
397    /// it will replace the pointer with the new one and drop the old pointer
398    /// after the grace period.
399    ///
400    /// If spinning on [`RcuOption::read`] and this function, it is recommended
401    /// to relax the CPU or yield the task on failure. Otherwise contention
402    /// will occur.
403    ///
404    /// This API does not help to avoid
405    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
406    pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
407        self.0.compare_exchange(new_ptr)
408    }
409}
410
/// Delays the dropping of a [`NonNullPtr`] until after the RCU grace period.
///
/// The raw pointer is handed to the global RCU monitor inside a callback.
/// The callback converts it back into a `P` via `from_raw` and drops that
/// `P`.
///
/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
/// because we cannot alias a [`Box`]. Restoring `P` and using [`RcuDrop`] for
/// it can lead to multiple [`Box`]es simultaneously pointing to the same
/// content.
///
/// # Panics
///
/// Panics if the global RCU monitor has not been initialized (see [`init`]).
///
/// # Safety
///
/// The pointer must be previously returned by `into_raw`, will not be used
/// after the end of the current grace period, and will only be dropped once.
///
/// [`Box`]: alloc::boxed::Box
unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
    // A wrapper that lets the raw pointer be moved into the callback.
    struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
    // SAFETY: Sending a raw pointer to another task is safe as long as
    // the pointer access in another task is safe (guaranteed by the trait
    // bound `P: Send`).
    unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}

    let pointer: ForceSend<P> = ForceSend(pointer);

    let rcu_monitor = RCU_MONITOR.get().unwrap();
    rcu_monitor.after_grace_period(move || {
        // This is necessary to make the Rust compiler move the entire
        // `ForceSend` structure into the closure, rather than only its
        // inner field.
        let pointer = pointer;

        // SAFETY:
        // 1. The pointer was previously returned by `into_raw`.
        // 2. The pointer won't be used anymore since the grace period has
        //    finished and this is the only time the pointer gets dropped.
        let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
        drop(p);
    });
}
447
/// A wrapper to delay calling the destructor of `T` until after the RCU
/// grace period.
///
/// Upon dropping this structure, a callback will be registered to the global
/// RCU monitor and the destructor of `T` will be delayed until the callback.
///
/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`
/// (`#[repr(transparent)]`). The inner value can be accessed safely through
/// the [`Deref`] implementation of [`RcuDrop<T>`].
///
/// # Panics
///
/// Dropping an [`RcuDrop<T>`] panics if the global RCU monitor has not been
/// initialized (see [`init`]).
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct RcuDrop<T: Send + 'static> {
    // Wrapped in `ManuallyDrop` so that the `Drop` implementation can take
    // the value out and hand it over to the RCU monitor.
    value: ManuallyDrop<T>,
}
460
461impl<T: Send + 'static> RcuDrop<T> {
462    /// Creates a new [`RcuDrop`] instance that delays the dropping of `value`.
463    pub fn new(value: T) -> Self {
464        Self {
465            value: ManuallyDrop::new(value),
466        }
467    }
468}
469
470impl<T: Send + 'static> Deref for RcuDrop<T> {
471    type Target = T;
472
473    fn deref(&self) -> &Self::Target {
474        &self.value
475    }
476}
477
478impl<T: Send + 'static> Drop for RcuDrop<T> {
479    fn drop(&mut self) {
480        // SAFETY: The `ManuallyDrop` will not be used after this point.
481        let taken = unsafe { ManuallyDrop::take(&mut self.value) };
482        let rcu_monitor = RCU_MONITOR.get().unwrap();
483        rcu_monitor.after_grace_period(|| {
484            drop(taken);
485        });
486    }
487}
488
489/// Finishes the current grace period.
490///
491/// This function is called when the current grace period on current CPU is
492/// finished. If this CPU is the last CPU to finish the current grace period,
493/// it takes all the current callbacks and invokes them.
494///
495/// # Safety
496///
497/// The caller must ensure that this CPU is not executing in a RCU read-side
498/// critical section.
499pub unsafe fn finish_grace_period() {
500    let rcu_monitor = RCU_MONITOR.get().unwrap();
501    // SAFETY: The caller ensures safety.
502    unsafe {
503        rcu_monitor.finish_grace_period();
504    }
505}
506
/// The global RCU monitor.
///
/// It is initialized by [`init`]. The users in this file obtain it with
/// `get().unwrap()`, so they panic if it has not been initialized yet.
static RCU_MONITOR: Once<RcuMonitor> = Once::new();
508
/// Initializes the global RCU monitor.
///
/// This must run before any code that relies on the monitor, i.e. before
/// dropping an [`RcuDrop`], before replacing a pointer held by an [`Rcu`] or
/// [`RcuOption`], and before calling [`finish_grace_period`]. Those paths
/// unwrap the monitor and would panic otherwise.
pub fn init() {
    // NOTE(review): `Once::call_once` is expected to run the initializer only
    // on the first call, making repeated calls harmless; confirm against the
    // `spin` version in use.
    RCU_MONITOR.call_once(RcuMonitor::new);
}