ostd/sync/rcu/
mod.rs

1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6    marker::PhantomData,
7    mem::ManuallyDrop,
8    ops::Deref,
9    ptr::NonNull,
10    sync::atomic::{
11        AtomicPtr,
12        Ordering::{AcqRel, Acquire},
13    },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::{
21    panic::PanicGuard,
22    task::{
23        DisabledPreemptGuard,
24        atomic_mode::{AsAtomicModeGuard, InAtomicMode},
25        disable_preempt,
26    },
27};
28
29mod monitor;
30pub mod non_null;
31
// NOTE(review): the example below assumes `P::Ref` (defined in `non_null`)
// dereferences to the pointed-to value — confirm before enabling doctests.
/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
///
/// The pointer should be a non-null pointer with type `P`, which implements
/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
///
/// # Overview
///
/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
/// performance, low-latency read operations in concurrent systems. It allows
/// multiple readers to access shared data simultaneously without contention,
/// while writers can update the data safely in a way that does not disrupt
/// ongoing reads. RCU is particularly suited for situations where reads are
/// far more frequent than writes.
///
/// The original design and implementation of RCU is described in the paper
/// _The Read-Copy-Update Mechanism for Supporting Real-Time Applications on
/// Shared-Memory Multiprocessor Systems with Linux_, published in IBM Systems
/// Journal 47.2 (2008).
///
/// # Examples
///
/// ```
/// use ostd::sync::Rcu;
///
/// let rcu = Rcu::new(Box::new(42));
///
/// let rcu_guard = rcu.read();
///
/// assert_eq!(*rcu_guard.get(), 42);
///
/// // `compare_exchange` consumes the guard that observed the old pointer.
/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
///
/// let rcu_guard = rcu.read();
///
/// assert_eq!(*rcu_guard.get(), 43);
/// ```
pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
69
/// A guard that allows access to the pointed data protected by a [`Rcu`].
///
/// It is returned by [`Rcu::read`]. Use [`RcuReadGuard::get`] to borrow the
/// protected data, or [`RcuReadGuard::compare_exchange`] to consume the guard
/// and try to replace the pointer that was observed by the read.
#[clippy::has_significant_drop]
#[must_use]
pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
74
// NOTE(review): the example below assumes `P::Ref` (defined in `non_null`)
// dereferences to the pointed-to value — confirm before enabling doctests.
/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.
///
/// This is a variant of [`Rcu`] that allows the contained pointer to be null,
/// so it behaves like an `Rcu<Option<P>>` even though `P` itself is not a
/// nullable pointer. It is the same as [`Rcu`] in other aspects.
///
/// # Examples
///
/// ```
/// use ostd::sync::RcuOption;
///
/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
///
/// assert!(RCU.read().is_none());
///
/// RCU.update(Some(Box::new(42)));
///
/// // Read the data protected by RCU
/// {
///     let rcu_guard = RCU.read();
///     assert_eq!(*rcu_guard.get().unwrap(), 42);
/// }
///
/// // Update the data protected by RCU
/// {
///     let rcu_guard = RCU.read();
///
///     // `compare_exchange` consumes the guard that observed the old pointer.
///     rcu_guard.compare_exchange(Some(Box::new(43))).unwrap();
///
///     let rcu_guard = RCU.read();
///     assert_eq!(*rcu_guard.get().unwrap(), 43);
/// }
/// ```
pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
109
/// A guard that allows access to the pointed data protected by a [`RcuOption`].
///
/// It is returned by [`RcuOption::read`]. The pointer observed by the read
/// may be null; check with [`RcuOptionReadGuard::is_none`] or borrow the data
/// through [`RcuOptionReadGuard::get`], which returns `None` in that case.
#[clippy::has_significant_drop]
#[must_use]
pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
114
/// The inner implementation of both [`Rcu`] and [`RcuOption`].
///
/// The slot is a single atomic raw pointer. A null value means that nothing
/// is stored; a non-null value is a pointer obtained from
/// [`NonNullPtr::into_raw`].
struct RcuInner<P: NonNullPtr> {
    // The currently published pointer, or null if the cell is empty.
    ptr: AtomicPtr<<P as NonNullPtr>::Target>,
    // We want to implement `Send` and `Sync` explicitly.
    // Having a raw pointer field prevents them from being implemented
    // automatically by the compiler.
    _marker: PhantomData<*const P::Target>,
}
123
// SAFETY: The cell only stores a pointer converted from an owned `P` (via
// `into_raw`), so moving the cell to another thread amounts to moving that
// `P`. Hence, if `P` is `Send`, then `RcuInner<P>` is `Send`.
unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
126
// SAFETY: To implement `Sync` for `RcuInner<P>`, we need to meet two
// conditions:
//  1. `P` must be `Sync` because `read` and `read_with` allow concurrent
//     access to the pointed data from multiple threads.
//  2. `P` must be `Send` because `update` (or a successful
//     `compare_exchange`) may take over an object of `P` that was created on
//     another thread and schedule it to be dropped.
unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
132
133impl<P: NonNullPtr + Send> RcuInner<P> {
134    const fn new_none() -> Self {
135        Self {
136            ptr: AtomicPtr::new(core::ptr::null_mut()),
137            _marker: PhantomData,
138        }
139    }
140
141    fn new(pointer: P) -> Self {
142        let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
143        let ptr = AtomicPtr::new(ptr);
144        Self {
145            ptr,
146            _marker: PhantomData,
147        }
148    }
149
150    fn update(&self, new_ptr: Option<P>) {
151        let new_ptr = if let Some(new_ptr) = new_ptr {
152            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
153        } else {
154            core::ptr::null_mut()
155        };
156
157        let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
158
159        if let Some(p) = NonNull::new(old_raw_ptr) {
160            // SAFETY:
161            // 1. The pointer was previously returned by `into_raw`.
162            // 2. The pointer is removed from the RCU slot so that no one will
163            //    use it after the end of the current grace period. The removal
164            //    is done atomically, so it will only be dropped once.
165            unsafe { delay_drop::<P>(p) };
166        }
167    }
168
169    fn read(&self) -> RcuReadGuardInner<'_, P> {
170        let guard = disable_preempt();
171        RcuReadGuardInner {
172            obj_ptr: self.ptr.load(Acquire),
173            rcu: self,
174            inner_guard: guard,
175        }
176    }
177
178    fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
179        let obj_ptr = self.ptr.load(Acquire);
180        if obj_ptr.is_null() {
181            return None;
182        }
183        // SAFETY:
184        // 1. This pointer is not NULL.
185        // 2. The `_guard` guarantees atomic mode for the duration of lifetime
186        //    `'a`, the pointer is valid because other writers won't release the
187        //    allocation until this task passes the quiescent state.
188        NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
189    }
190}
191
192impl<P: NonNullPtr> Drop for RcuInner<P> {
193    fn drop(&mut self) {
194        let ptr = self.ptr.load(Acquire);
195        if let Some(p) = NonNull::new(ptr) {
196            // SAFETY: It was previously returned by `into_raw` when creating
197            // the RCU primitive.
198            let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
199            // It is OK not to delay the drop because the RCU primitive is
200            // owned by nobody else.
201            drop(pointer);
202        }
203    }
204}
205
/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
struct RcuReadGuardInner<'a, P: NonNullPtr> {
    // The pointer value that was loaded from the slot when the guard was
    // created; it may be null.
    obj_ptr: *mut <P as NonNullPtr>::Target,
    // The cell that the pointer was read from, kept for `compare_exchange`.
    rcu: &'a RcuInner<P>,
    // Keeps preemption disabled for as long as the guard is alive.
    inner_guard: DisabledPreemptGuard,
}
212
213impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
214    fn get(&self) -> Option<P::Ref<'_>> {
215        // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
216        // outlives the lifetime of `&self`. Additionally, during this period,
217        // it is impossible to create a mutable reference to `P`.
218        NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
219    }
220
221    fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
222        let new_ptr = if let Some(new_ptr) = new_ptr {
223            <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
224        } else {
225            core::ptr::null_mut()
226        };
227
228        if self
229            .rcu
230            .ptr
231            .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
232            .is_err()
233        {
234            let Some(new_ptr) = NonNull::new(new_ptr) else {
235                return Err(None);
236            };
237            // SAFETY:
238            // 1. It was previously returned by `into_raw`.
239            // 2. The `compare_exchange` fails so the pointer will not
240            //    be used by other threads via reading the RCU primitive.
241            return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
242        }
243
244        if let Some(p) = NonNull::new(self.obj_ptr) {
245            // SAFETY:
246            // 1. The pointer was previously returned by `into_raw`.
247            // 2. The pointer is removed from the RCU slot so that no one will
248            //    use it after the end of the current grace period. The removal
249            //    is done atomically, so it will only be dropped once.
250            unsafe { delay_drop::<P>(p) };
251        }
252
253        Ok(())
254    }
255}
256
257impl<P: NonNullPtr + Send> Rcu<P> {
258    /// Creates a new RCU primitive with the given pointer.
259    pub fn new(pointer: P) -> Self {
260        Self(RcuInner::new(pointer))
261    }
262
263    /// Replaces the current pointer with a null pointer.
264    ///
265    /// This function updates the pointer to the new pointer regardless of the
266    /// original pointer. The original pointer will be dropped after the grace
267    /// period.
268    ///
269    /// Oftentimes this function is not recommended unless you have serialized
270    /// writes with locks. Otherwise, you can use [`Self::read`] and then
271    /// [`RcuReadGuard::compare_exchange`] to update the pointer.
272    pub fn update(&self, new_ptr: P) {
273        self.0.update(Some(new_ptr));
274    }
275
276    /// Retrieves a read guard for the RCU primitive.
277    ///
278    /// The guard allows read access to the data protected by RCU, as well
279    /// as the ability to do compare-and-exchange.
280    pub fn read(&self) -> RcuReadGuard<'_, P> {
281        RcuReadGuard(self.0.read())
282    }
283
284    /// Reads the RCU-protected value in an atomic mode.
285    ///
286    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
287    /// atomic mode. If we are already in an atomic mode, this function can
288    /// reduce the overhead of disabling preemption again.
289    ///
290    /// Unlike [`Self::read`], this function does not return a read guard, so
291    /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
292    /// writers. You may do it via a [`super::SpinLock`].
293    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
294        self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
295    }
296}
297
298impl<P: NonNullPtr + Send> RcuOption<P> {
299    /// Creates a new RCU primitive with the given pointer.
300    pub fn new(pointer: Option<P>) -> Self {
301        if let Some(pointer) = pointer {
302            Self(RcuInner::new(pointer))
303        } else {
304            Self(RcuInner::new_none())
305        }
306    }
307
308    /// Creates a new RCU primitive that contains nothing.
309    ///
310    /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
311    pub const fn new_none() -> Self {
312        Self(RcuInner::new_none())
313    }
314
315    /// Replaces the current pointer with a null pointer.
316    ///
317    /// This function updates the pointer to the new pointer regardless of the
318    /// original pointer. If the original pointer is not NULL, it will be
319    /// dropped after the grace period.
320    ///
321    /// Oftentimes this function is not recommended unless you have
322    /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
323    /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
324    pub fn update(&self, new_ptr: Option<P>) {
325        self.0.update(new_ptr);
326    }
327
328    /// Retrieves a read guard for the RCU primitive.
329    ///
330    /// The guard allows read access to the data protected by RCU, as well
331    /// as the ability to do compare-and-exchange.
332    ///
333    /// The contained pointer can be NULL and you can only get a reference
334    /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
335    pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
336        RcuOptionReadGuard(self.0.read())
337    }
338
339    /// Reads the RCU-protected value in an atomic mode.
340    ///
341    /// The RCU mechanism protects reads ([`Self::read`]) by entering an
342    /// atomic mode. If we are already in an atomic mode, this function can
343    /// reduce the overhead of disabling preemption again.
344    ///
345    /// Unlike [`Self::read`], this function does not return a read guard, so
346    /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
347    /// writers. You may do it via a [`super::SpinLock`].
348    pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
349        &'a self,
350        guard: &'a G,
351    ) -> Option<P::Ref<'a>> {
352        self.0.read_with(guard.as_atomic_mode_guard())
353    }
354}
355
356impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
357    /// Gets the reference of the protected data.
358    pub fn get(&self) -> P::Ref<'_> {
359        self.0.get().unwrap()
360    }
361
362    /// Tries to replace the already read pointer with a new pointer.
363    ///
364    /// If another thread has updated the pointer after the read, this
365    /// function will fail, and returns the given pointer back. Otherwise,
366    /// it will replace the pointer with the new one and drop the old pointer
367    /// after the grace period.
368    ///
369    /// If spinning on [`Rcu::read`] and this function, it is recommended
370    /// to relax the CPU or yield the task on failure. Otherwise contention
371    /// will occur.
372    ///
373    /// This API does not help to avoid
374    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
375    pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
376        self.0
377            .compare_exchange(Some(new_ptr))
378            .map_err(|err| err.unwrap())
379    }
380}
381
382impl<P: NonNullPtr> AsAtomicModeGuard for RcuReadGuard<'_, P> {
383    fn as_atomic_mode_guard(&self) -> &dyn InAtomicMode {
384        self.0.inner_guard.as_atomic_mode_guard()
385    }
386}
387
388impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
389    /// Gets the reference of the protected data.
390    ///
391    /// If the RCU primitive protects nothing, this function returns `None`.
392    pub fn get(&self) -> Option<P::Ref<'_>> {
393        self.0.get()
394    }
395
396    /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
397    pub fn is_none(&self) -> bool {
398        self.0.obj_ptr.is_null()
399    }
400
401    /// Tries to replace the already read pointer with a new pointer
402    /// (or none).
403    ///
404    /// If another thread has updated the pointer after the read, this
405    /// function will fail, and returns the given pointer back. Otherwise,
406    /// it will replace the pointer with the new one and drop the old pointer
407    /// after the grace period.
408    ///
409    /// If spinning on [`RcuOption::read`] and this function, it is recommended
410    /// to relax the CPU or yield the task on failure. Otherwise contention
411    /// will occur.
412    ///
413    /// This API does not help to avoid
414    /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
415    pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
416        self.0.compare_exchange(new_ptr)
417    }
418}
419
420impl<P: NonNullPtr> AsAtomicModeGuard for RcuOptionReadGuard<'_, P> {
421    fn as_atomic_mode_guard(&self) -> &dyn InAtomicMode {
422        self.0.inner_guard.as_atomic_mode_guard()
423    }
424}
425
/// Delays the dropping of a [`NonNullPtr`] until after the RCU grace period.
///
/// The raw pointer is handed to the global RCU monitor inside a callback;
/// only when that callback runs is the pointer converted back into a `P` and
/// dropped.
///
/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
/// because we cannot alias a [`Box`]. Restoring `P` and using [`RcuDrop`] for
/// it could lead to multiple [`Box`]es simultaneously pointing to the same
/// content.
///
/// # Safety
///
/// The pointer must be previously returned by `into_raw`, will not be used
/// after the end of the current grace period, and will only be dropped once.
///
/// # Panics
///
/// Panics if the global RCU monitor has not been initialized.
///
/// [`Box`]: alloc::boxed::Box
unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
    // A wrapper that lets the raw pointer be captured by the callback below.
    struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
    // SAFETY: Sending a raw pointer to another task is safe as long as
    // the pointer access in another task is safe (guaranteed by the trait
    // bound `P: Send`).
    unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}

    let pointer: ForceSend<P> = ForceSend(pointer);

    let rcu_monitor = RCU_MONITOR.get().unwrap();
    rcu_monitor.after_grace_period(move || {
        // This is necessary to make the Rust compiler move the entire
        // `ForceSend` structure into the closure, rather than only its
        // inner field.
        let pointer = pointer;

        // SAFETY:
        // 1. The pointer was previously returned by `into_raw`.
        // 2. The pointer won't be used anymore since the grace period has
        //    finished and this is the only time the pointer gets dropped.
        let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
        drop(p);
    });
}
462
/// A wrapper that delays calling the destructor of `T` until after the RCU
/// grace period.
///
/// Upon dropping this structure, a callback will be registered to the global
/// RCU monitor and the destructor of `T` will be delayed until the callback.
///
/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`
/// (`#[repr(transparent)]`). You can also access the inner value safely by
/// dereferencing a [`RcuDrop<T>`].
#[repr(transparent)]
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RcuDrop<T: Send + 'static> {
    // Wrapped in `ManuallyDrop` so that the `Drop` implementation decides
    // when the destructor of `T` actually runs.
    value: ManuallyDrop<T>,
}
475
476impl<T: Send + 'static> RcuDrop<T> {
477    /// Creates a new [`RcuDrop`] instance that delays the dropping of `value`.
478    pub fn new(value: T) -> Self {
479        Self {
480            value: ManuallyDrop::new(value),
481        }
482    }
483
484    /// Extracts the value from the `RcuDrop` container.
485    ///
486    /// # Safety
487    ///
488    /// The caller must ensure that the returned value will be dropped after
489    /// all the threads cannot access it anymore. Specifically, dropping it
490    /// after the RCU grace period is guaranteed to be safe.
491    ///
492    /// Note that panic unwinding may cause the returned value to be dropped
493    /// immediately, which is not sound. Therefore, the caller must forget the
494    /// [`PanicGuard`] after it ensures that the value will be dropped at the
495    /// correct time.
496    pub(crate) unsafe fn into_inner(slot: RcuDrop<T>) -> (T, PanicGuard) {
497        let mut slot = ManuallyDrop::new(slot);
498        let panic_guard = PanicGuard::new();
499        // SAFETY: The `slot` will not be used after this point.
500        let val = unsafe { ManuallyDrop::take(&mut slot.value) };
501        (val, panic_guard)
502    }
503}
504
505impl<T: Send + 'static> Deref for RcuDrop<T> {
506    type Target = T;
507
508    fn deref(&self) -> &Self::Target {
509        &self.value
510    }
511}
512
513impl<T: Send + 'static> Drop for RcuDrop<T> {
514    fn drop(&mut self) {
515        // SAFETY: The `ManuallyDrop` will not be used after this point.
516        let taken = unsafe { ManuallyDrop::take(&mut self.value) };
517        let rcu_monitor = RCU_MONITOR.get().unwrap();
518        rcu_monitor.after_grace_period(|| {
519            drop(taken);
520        });
521    }
522}
523
524/// Finishes the current grace period.
525///
526/// This function is called when the current grace period on current CPU is
527/// finished. If this CPU is the last CPU to finish the current grace period,
528/// it takes all the current callbacks and invokes them.
529///
530/// # Safety
531///
532/// The caller must ensure that this CPU is not executing in a RCU read-side
533/// critical section.
534pub unsafe fn finish_grace_period() {
535    let rcu_monitor = RCU_MONITOR.get().unwrap();
536    // SAFETY: The caller ensures safety.
537    unsafe {
538        rcu_monitor.finish_grace_period();
539    }
540}
541
/// The global RCU monitor, populated by [`init`].
///
/// Code in this module accesses it via `RCU_MONITOR.get().unwrap()`, which
/// panics if [`init`] has not been called yet.
static RCU_MONITOR: Once<RcuMonitor> = Once::new();
543
/// Initializes the global RCU monitor.
///
/// This must run before anything in this module needs the monitor: delayed
/// drops (from [`Rcu`], [`RcuOption`] and [`RcuDrop`]) and
/// [`finish_grace_period`] all unwrap the monitor and would panic otherwise.
pub fn init() {
    // Initialization goes through `Once::call_once`.
    RCU_MONITOR.call_once(RcuMonitor::new);
}