ostd/sync/rcu/
mod.rs

1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6    marker::PhantomData,
7    mem::ManuallyDrop,
8    ops::Deref,
9    ptr::NonNull,
10    sync::atomic::{
11        AtomicPtr,
12        Ordering::{AcqRel, Acquire},
13    },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::{
21    panic::PanicGuard,
22    task::{
23        DisabledPreemptGuard,
24        atomic_mode::{AsAtomicModeGuard, InAtomicMode},
25        disable_preempt,
26    },
27};
28
29mod monitor;
30pub mod non_null;
31
32/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
33///
34/// The pointer should be a non-null pointer with type `P`, which implements
35/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
36///
37/// # Overview
38///
39/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
40/// performance, low-latency read operations in concurrent systems. It allows
41/// multiple readers to access shared data simultaneously without contention,
42/// while writers can update the data safely in a way that does not disrupt
43/// ongoing reads. RCU is particularly suited for situations where reads are
44/// far more frequent than writes.  
45///  
46/// The original design and implementation of RCU is described in paper _The
47/// Read-Copy-Update Mechanism for Supporting Real-Time Applications on Shared-
48/// Memory Multiprocessor Systems with Linux_ published on IBM Systems Journal
49/// 47.2 (2008).
50///
51/// # Examples
52///
53/// ```
54/// use ostd::sync::Rcu;
55///
56/// let rcu = Rcu::new(Box::new(42));
57///
58/// let rcu_guard = rcu.read();
59///
60/// assert_eq!(*rcu_guard, Some(&42));
61///
62/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
63///
64/// let rcu_guard = rcu.read();
65///
66/// assert_eq!(*rcu_guard, Some(&43));
67/// ```
68pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
69
70/// A guard that allows access to the pointed data protected by a [`Rcu`].
71#[clippy::has_significant_drop]
72#[must_use]
73pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
74
75/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.  
76///
77/// This is a variant of [`Rcu`] that allows the contained pointer to be null.
78/// So that it can implement `Rcu<Option<P>>` where `P` is not a nullable
79/// pointer. It is the same as [`Rcu`] in other aspects.
80///
81/// # Examples
82///
83/// ```
84/// use ostd::sync::RcuOption;
85///
86/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
87///
88/// assert!(RCU.read().is_none());
89///
90/// RCU.update(Box::new(42));
91///
92/// // Read the data protected by RCU
93/// {
94///     let rcu_guard = RCU.read().try_get().unwrap();
95///     assert_eq!(*rcu_guard, 42);
96/// }
97///
98/// // Update the data protected by RCU
99/// {
100///     let rcu_guard = RCU.read().try_get().unwrap();
101///
102///     rcu_guard.compare_exchange(Box::new(43)).unwrap();
103///
104///     let rcu_guard = RCU.read().try_get().unwrap();
105///     assert_eq!(*rcu_guard, 43);
106/// }
107/// ```
108pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
109
110/// A guard that allows access to the pointed data protected by a [`RcuOption`].
111#[clippy::has_significant_drop]
112#[must_use]
113pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
114
115/// The inner implementation of both [`Rcu`] and [`RcuOption`].
116struct RcuInner<P: NonNullPtr> {
117    ptr: AtomicPtr<<P as NonNullPtr>::Target>,
118    // We want to implement Send and Sync explicitly.
119    // Having a pointer field prevents them from being implemented
120    // automatically by the compiler.
121    _marker: PhantomData<*const P::Target>,
122}
123
124// SAFETY: It is apparent that if `P` is `Send`, then `Rcu<P>` is `Send`.
125unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
126
127// SAFETY: To implement `Sync` for `Rcu<P>`, we need to meet two conditions:
128//  1. `P` must be `Sync` because `Rcu::get` allows concurrent access.
129//  2. `P` must be `Send` because `Rcu::update` may obtain an object
130//     of `P` created on another thread.
131unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
132
133impl<P: NonNullPtr + Send> RcuInner<P> {
134    const fn new_none() -> Self {
135        Self {
136            ptr: AtomicPtr::new(core::ptr::null_mut()),
137            _marker: PhantomData,
138        }
139    }
140
141    fn new(pointer: P) -> Self {
142        let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
143        let ptr = AtomicPtr::new(ptr);
144        Self {
145            ptr,
146            _marker: PhantomData,
147        }
148    }
149
150    fn update(&self, new_ptr: Option<P>) {
151        let new_ptr = if let Some(new_ptr) = new_ptr {
152            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
153        } else {
154            core::ptr::null_mut()
155        };
156
157        let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
158
159        if let Some(p) = NonNull::new(old_raw_ptr) {
160            // SAFETY:
161            // 1. The pointer was previously returned by `into_raw`.
162            // 2. The pointer is removed from the RCU slot so that no one will
163            //    use it after the end of the current grace period. The removal
164            //    is done atomically, so it will only be dropped once.
165            unsafe { delay_drop::<P>(p) };
166        }
167    }
168
169    fn read(&self) -> RcuReadGuardInner<'_, P> {
170        let guard = disable_preempt();
171        RcuReadGuardInner {
172            obj_ptr: self.ptr.load(Acquire),
173            rcu: self,
174            _inner_guard: guard,
175        }
176    }
177
178    fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
179        let obj_ptr = self.ptr.load(Acquire);
180        if obj_ptr.is_null() {
181            return None;
182        }
183        // SAFETY:
184        // 1. This pointer is not NULL.
185        // 2. The `_guard` guarantees atomic mode for the duration of lifetime
186        //    `'a`, the pointer is valid because other writers won't release the
187        //    allocation until this task passes the quiescent state.
188        NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
189    }
190}
191
192impl<P: NonNullPtr> Drop for RcuInner<P> {
193    fn drop(&mut self) {
194        let ptr = self.ptr.load(Acquire);
195        if let Some(p) = NonNull::new(ptr) {
196            // SAFETY: It was previously returned by `into_raw` when creating
197            // the RCU primitive.
198            let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
199            // It is OK not to delay the drop because the RCU primitive is
200            // owned by nobody else.
201            drop(pointer);
202        }
203    }
204}
205
206/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
207struct RcuReadGuardInner<'a, P: NonNullPtr> {
208    obj_ptr: *mut <P as NonNullPtr>::Target,
209    rcu: &'a RcuInner<P>,
210    _inner_guard: DisabledPreemptGuard,
211}
212
213impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
214    fn get(&self) -> Option<P::Ref<'_>> {
215        // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
216        // outlives the lifetime of `&self`. Additionally, during this period,
217        // it is impossible to create a mutable reference to `P`.
218        NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
219    }
220
221    fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
222        let new_ptr = if let Some(new_ptr) = new_ptr {
223            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
224        } else {
225            core::ptr::null_mut()
226        };
227
228        if self
229            .rcu
230            .ptr
231            .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
232            .is_err()
233        {
234            let Some(new_ptr) = NonNull::new(new_ptr) else {
235                return Err(None);
236            };
237            // SAFETY:
238            // 1. It was previously returned by `into_raw`.
239            // 2. The `compare_exchange` fails so the pointer will not
240            //    be used by other threads via reading the RCU primitive.
241            return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
242        }
243
244        if let Some(p) = NonNull::new(self.obj_ptr) {
245            // SAFETY:
246            // 1. The pointer was previously returned by `into_raw`.
247            // 2. The pointer is removed from the RCU slot so that no one will
248            //    use it after the end of the current grace period. The removal
249            //    is done atomically, so it will only be dropped once.
250            unsafe { delay_drop::<P>(p) };
251        }
252
253        Ok(())
254    }
255}
256
257impl<P: NonNullPtr + Send> Rcu<P> {
258    /// Creates a new RCU primitive with the given pointer.
259    pub fn new(pointer: P) -> Self {
260        Self(RcuInner::new(pointer))
261    }
262
263    /// Replaces the current pointer with a null pointer.
264    ///
265    /// This function updates the pointer to the new pointer regardless of the
266    /// original pointer. The original pointer will be dropped after the grace
267    /// period.
268    ///
269    /// Oftentimes this function is not recommended unless you have serialized
270    /// writes with locks. Otherwise, you can use [`Self::read`] and then
271    /// [`RcuReadGuard::compare_exchange`] to update the pointer.
272    pub fn update(&self, new_ptr: P) {
273        self.0.update(Some(new_ptr));
274    }
275
276    /// Retrieves a read guard for the RCU primitive.
277    ///
278    /// The guard allows read access to the data protected by RCU, as well
279    /// as the ability to do compare-and-exchange.
280    pub fn read(&self) -> RcuReadGuard<'_, P> {
281        RcuReadGuard(self.0.read())
282    }
283
284    /// Reads the RCU-protected value in an atomic mode.
285    ///
286    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
287    /// atomic mode. If we are already in an atomic mode, this function can
288    /// reduce the overhead of disabling preemption again.
289    ///
290    /// Unlike [`Self::read`], this function does not return a read guard, so
291    /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
292    /// writers. You may do it via a [`super::SpinLock`].
293    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
294        self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
295    }
296}
297
298impl<P: NonNullPtr + Send> RcuOption<P> {
299    /// Creates a new RCU primitive with the given pointer.
300    pub fn new(pointer: Option<P>) -> Self {
301        if let Some(pointer) = pointer {
302            Self(RcuInner::new(pointer))
303        } else {
304            Self(RcuInner::new_none())
305        }
306    }
307
308    /// Creates a new RCU primitive that contains nothing.
309    ///
310    /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
311    pub const fn new_none() -> Self {
312        Self(RcuInner::new_none())
313    }
314
315    /// Replaces the current pointer with a null pointer.
316    ///
317    /// This function updates the pointer to the new pointer regardless of the
318    /// original pointer. If the original pointer is not NULL, it will be
319    /// dropped after the grace period.
320    ///
321    /// Oftentimes this function is not recommended unless you have
322    /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
323    /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
324    pub fn update(&self, new_ptr: Option<P>) {
325        self.0.update(new_ptr);
326    }
327
328    /// Retrieves a read guard for the RCU primitive.
329    ///
330    /// The guard allows read access to the data protected by RCU, as well
331    /// as the ability to do compare-and-exchange.
332    ///
333    /// The contained pointer can be NULL and you can only get a reference
334    /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
335    pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
336        RcuOptionReadGuard(self.0.read())
337    }
338
339    /// Reads the RCU-protected value in an atomic mode.
340    ///
341    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
342    /// atomic mode. If we are already in an atomic mode, this function can
343    /// reduce the overhead of disabling preemption again.
344    ///
345    /// Unlike [`Self::read`], this function does not return a read guard, so
346    /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
347    /// writers. You may do it via a [`super::SpinLock`].
348    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
349        &'a self,
350        guard: &'a G,
351    ) -> Option<P::Ref<'a>> {
352        self.0.read_with(guard.as_atomic_mode_guard())
353    }
354}
355
356impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
357    /// Gets the reference of the protected data.
358    pub fn get(&self) -> P::Ref<'_> {
359        self.0.get().unwrap()
360    }
361
362    /// Tries to replace the already read pointer with a new pointer.
363    ///
364    /// If another thread has updated the pointer after the read, this
365    /// function will fail, and returns the given pointer back. Otherwise,
366    /// it will replace the pointer with the new one and drop the old pointer
367    /// after the grace period.
368    ///
369    /// If spinning on [`Rcu::read`] and this function, it is recommended
370    /// to relax the CPU or yield the task on failure. Otherwise contention
371    /// will occur.
372    ///
373    /// This API does not help to avoid
374    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
375    pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
376        self.0
377            .compare_exchange(Some(new_ptr))
378            .map_err(|err| err.unwrap())
379    }
380}
381
382impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
383    /// Gets the reference of the protected data.
384    ///
385    /// If the RCU primitive protects nothing, this function returns `None`.
386    pub fn get(&self) -> Option<P::Ref<'_>> {
387        self.0.get()
388    }
389
390    /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
391    pub fn is_none(&self) -> bool {
392        self.0.obj_ptr.is_null()
393    }
394
395    /// Tries to replace the already read pointer with a new pointer
396    /// (or none).
397    ///
398    /// If another thread has updated the pointer after the read, this
399    /// function will fail, and returns the given pointer back. Otherwise,
400    /// it will replace the pointer with the new one and drop the old pointer
401    /// after the grace period.
402    ///
403    /// If spinning on [`RcuOption::read`] and this function, it is recommended
404    /// to relax the CPU or yield the task on failure. Otherwise contention
405    /// will occur.
406    ///
407    /// This API does not help to avoid
408    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
409    pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
410        self.0.compare_exchange(new_ptr)
411    }
412}
413
414/// Delays the dropping of a [`NonNullPtr`] after the RCU grace period.
415///
416/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
417/// because we cannot alias a [`Box`]. Restoring `P` and use [`RcuDrop`] for it
418/// can lead to multiple [`Box`]es simultaneously pointing to the same
419/// content.
420///
421/// # Safety
422///
423/// The pointer must be previously returned by `into_raw`, will not be used
424/// after the end of the current grace period, and will only be dropped once.
425///
426/// [`Box`]: alloc::boxed::Box
427unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
428    struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
429    // SAFETY: Sending a raw pointer to another task is safe as long as
430    // the pointer access in another task is safe (guaranteed by the trait
431    // bound `P: Send`).
432    unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}
433
434    let pointer: ForceSend<P> = ForceSend(pointer);
435
436    let rcu_monitor = RCU_MONITOR.get().unwrap();
437    rcu_monitor.after_grace_period(move || {
438        // This is necessary to make the Rust compiler to move the entire
439        // `ForceSend` structure into the closure.
440        let pointer = pointer;
441
442        // SAFETY:
443        // 1. The pointer was previously returned by `into_raw`.
444        // 2. The pointer won't be used anymore since the grace period has
445        //    finished and this is the only time the pointer gets dropped.
446        let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
447        drop(p);
448    });
449}
450
/// A wrapper that postpones the destructor of `T` until after the RCU grace
/// period.
///
/// When this structure is dropped, it registers a callback with the global
/// RCU monitor, and the destructor of `T` runs inside that callback.
///
/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`. The inner
/// value can be accessed safely by dereferencing the [`RcuDrop<T>`].
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct RcuDrop<T>
where
    T: Send + 'static,
{
    value: ManuallyDrop<T>,
}
463
464impl<T: Send + 'static> RcuDrop<T> {
465    /// Creates a new [`RcuDrop`] instance that delays the dropping of `value`.
466    pub fn new(value: T) -> Self {
467        Self {
468            value: ManuallyDrop::new(value),
469        }
470    }
471
472    /// Extracts the value from the `RcuDrop` container.
473    ///
474    /// # Safety
475    ///
476    /// The caller must ensure that the returned value will be dropped after
477    /// all the threads cannot access it anymore. Specifically, dropping it
478    /// after the RCU grace period is guaranteed to be safe.
479    ///
480    /// Note that panic unwinding may cause the returned value to be dropped
481    /// immediately, which is not sound. Therefore, the caller must forget the
482    /// [`PanicGuard`] after it ensures that the value will be dropped at the
483    /// correct time.
484    pub(crate) unsafe fn into_inner(slot: RcuDrop<T>) -> (T, PanicGuard) {
485        let mut slot = ManuallyDrop::new(slot);
486        let panic_guard = PanicGuard::new();
487        // SAFETY: The `slot` will not be used after this point.
488        let val = unsafe { ManuallyDrop::take(&mut slot.value) };
489        (val, panic_guard)
490    }
491}
492
493impl<T: Send + 'static> Deref for RcuDrop<T> {
494    type Target = T;
495
496    fn deref(&self) -> &Self::Target {
497        &self.value
498    }
499}
500
501impl<T: Send + 'static> Drop for RcuDrop<T> {
502    fn drop(&mut self) {
503        // SAFETY: The `ManuallyDrop` will not be used after this point.
504        let taken = unsafe { ManuallyDrop::take(&mut self.value) };
505        let rcu_monitor = RCU_MONITOR.get().unwrap();
506        rcu_monitor.after_grace_period(|| {
507            drop(taken);
508        });
509    }
510}
511
512/// Finishes the current grace period.
513///
514/// This function is called when the current grace period on current CPU is
515/// finished. If this CPU is the last CPU to finish the current grace period,
516/// it takes all the current callbacks and invokes them.
517///
518/// # Safety
519///
520/// The caller must ensure that this CPU is not executing in a RCU read-side
521/// critical section.
522pub unsafe fn finish_grace_period() {
523    let rcu_monitor = RCU_MONITOR.get().unwrap();
524    // SAFETY: The caller ensures safety.
525    unsafe {
526        rcu_monitor.finish_grace_period();
527    }
528}
529
/// The global RCU monitor used by the deferred-drop paths in this module.
///
/// It is populated by [`init`]. `delay_drop`, the destructor of [`RcuDrop`]
/// and [`finish_grace_period`] all unwrap it, so they panic if they run
/// before [`init`] has been called.
static RCU_MONITOR: Once<RcuMonitor> = Once::new();
531
/// Initializes the global RCU monitor.
///
/// The monitor is constructed with `RcuMonitor::new` through
/// `Once::call_once`.
///
/// NOTE(review): this presumably has to run before any RCU-protected pointer
/// is replaced, any [`RcuDrop`] is dropped, or [`finish_grace_period`] is
/// called, because those paths unwrap the monitor — confirm the call order
/// at the boot site.
pub fn init() {
    RCU_MONITOR.call_once(RcuMonitor::new);
}