ostd/sync/rcu/mod.rs
1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6 marker::PhantomData,
7 mem::ManuallyDrop,
8 ops::Deref,
9 ptr::NonNull,
10 sync::atomic::{
11 AtomicPtr,
12 Ordering::{AcqRel, Acquire},
13 },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::{
21 panic::PanicGuard,
22 task::{
23 DisabledPreemptGuard,
24 atomic_mode::{AsAtomicModeGuard, InAtomicMode},
25 disable_preempt,
26 },
27};
28
29mod monitor;
30pub mod non_null;
31
32/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
33///
34/// The pointer should be a non-null pointer with type `P`, which implements
35/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
36///
37/// # Overview
38///
39/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
40/// performance, low-latency read operations in concurrent systems. It allows
41/// multiple readers to access shared data simultaneously without contention,
42/// while writers can update the data safely in a way that does not disrupt
43/// ongoing reads. RCU is particularly suited for situations where reads are
44/// far more frequent than writes.
45///
46/// The original design and implementation of RCU is described in paper _The
47/// Read-Copy-Update Mechanism for Supporting Real-Time Applications on Shared-
48/// Memory Multiprocessor Systems with Linux_ published on IBM Systems Journal
49/// 47.2 (2008).
50///
51/// # Examples
52///
53/// ```
54/// use ostd::sync::Rcu;
55///
56/// let rcu = Rcu::new(Box::new(42));
57///
58/// let rcu_guard = rcu.read();
59///
60/// assert_eq!(*rcu_guard, Some(&42));
61///
62/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
63///
64/// let rcu_guard = rcu.read();
65///
66/// assert_eq!(*rcu_guard, Some(&43));
67/// ```
68pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
69
70/// A guard that allows access to the pointed data protected by a [`Rcu`].
71#[clippy::has_significant_drop]
72#[must_use]
73pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
74
75/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.
76///
77/// This is a variant of [`Rcu`] that allows the contained pointer to be null.
78/// So that it can implement `Rcu<Option<P>>` where `P` is not a nullable
79/// pointer. It is the same as [`Rcu`] in other aspects.
80///
81/// # Examples
82///
83/// ```
84/// use ostd::sync::RcuOption;
85///
86/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
87///
88/// assert!(RCU.read().is_none());
89///
90/// RCU.update(Box::new(42));
91///
92/// // Read the data protected by RCU
93/// {
94/// let rcu_guard = RCU.read().try_get().unwrap();
95/// assert_eq!(*rcu_guard, 42);
96/// }
97///
98/// // Update the data protected by RCU
99/// {
100/// let rcu_guard = RCU.read().try_get().unwrap();
101///
102/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
103///
104/// let rcu_guard = RCU.read().try_get().unwrap();
105/// assert_eq!(*rcu_guard, 43);
106/// }
107/// ```
108pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
109
110/// A guard that allows access to the pointed data protected by a [`RcuOption`].
111#[clippy::has_significant_drop]
112#[must_use]
113pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
114
115/// The inner implementation of both [`Rcu`] and [`RcuOption`].
116struct RcuInner<P: NonNullPtr> {
117 ptr: AtomicPtr<<P as NonNullPtr>::Target>,
118 // We want to implement Send and Sync explicitly.
119 // Having a pointer field prevents them from being implemented
120 // automatically by the compiler.
121 _marker: PhantomData<*const P::Target>,
122}
123
124// SAFETY: It is apparent that if `P` is `Send`, then `Rcu<P>` is `Send`.
125unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
126
127// SAFETY: To implement `Sync` for `Rcu<P>`, we need to meet two conditions:
128// 1. `P` must be `Sync` because `Rcu::get` allows concurrent access.
129// 2. `P` must be `Send` because `Rcu::update` may obtain an object
130// of `P` created on another thread.
131unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
132
133impl<P: NonNullPtr + Send> RcuInner<P> {
134 const fn new_none() -> Self {
135 Self {
136 ptr: AtomicPtr::new(core::ptr::null_mut()),
137 _marker: PhantomData,
138 }
139 }
140
141 fn new(pointer: P) -> Self {
142 let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
143 let ptr = AtomicPtr::new(ptr);
144 Self {
145 ptr,
146 _marker: PhantomData,
147 }
148 }
149
150 fn update(&self, new_ptr: Option<P>) {
151 let new_ptr = if let Some(new_ptr) = new_ptr {
152 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
153 } else {
154 core::ptr::null_mut()
155 };
156
157 let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
158
159 if let Some(p) = NonNull::new(old_raw_ptr) {
160 // SAFETY:
161 // 1. The pointer was previously returned by `into_raw`.
162 // 2. The pointer is removed from the RCU slot so that no one will
163 // use it after the end of the current grace period. The removal
164 // is done atomically, so it will only be dropped once.
165 unsafe { delay_drop::<P>(p) };
166 }
167 }
168
169 fn read(&self) -> RcuReadGuardInner<'_, P> {
170 let guard = disable_preempt();
171 RcuReadGuardInner {
172 obj_ptr: self.ptr.load(Acquire),
173 rcu: self,
174 inner_guard: guard,
175 }
176 }
177
178 fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
179 let obj_ptr = self.ptr.load(Acquire);
180 if obj_ptr.is_null() {
181 return None;
182 }
183 // SAFETY:
184 // 1. This pointer is not NULL.
185 // 2. The `_guard` guarantees atomic mode for the duration of lifetime
186 // `'a`, the pointer is valid because other writers won't release the
187 // allocation until this task passes the quiescent state.
188 NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
189 }
190}
191
192impl<P: NonNullPtr> Drop for RcuInner<P> {
193 fn drop(&mut self) {
194 let ptr = self.ptr.load(Acquire);
195 if let Some(p) = NonNull::new(ptr) {
196 // SAFETY: It was previously returned by `into_raw` when creating
197 // the RCU primitive.
198 let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
199 // It is OK not to delay the drop because the RCU primitive is
200 // owned by nobody else.
201 drop(pointer);
202 }
203 }
204}
205
206/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
207struct RcuReadGuardInner<'a, P: NonNullPtr> {
208 obj_ptr: *mut <P as NonNullPtr>::Target,
209 rcu: &'a RcuInner<P>,
210 inner_guard: DisabledPreemptGuard,
211}
212
213impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
214 fn get(&self) -> Option<P::Ref<'_>> {
215 // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
216 // outlives the lifetime of `&self`. Additionally, during this period,
217 // it is impossible to create a mutable reference to `P`.
218 NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
219 }
220
221 fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
222 let new_ptr = if let Some(new_ptr) = new_ptr {
223 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
224 } else {
225 core::ptr::null_mut()
226 };
227
228 if self
229 .rcu
230 .ptr
231 .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
232 .is_err()
233 {
234 let Some(new_ptr) = NonNull::new(new_ptr) else {
235 return Err(None);
236 };
237 // SAFETY:
238 // 1. It was previously returned by `into_raw`.
239 // 2. The `compare_exchange` fails so the pointer will not
240 // be used by other threads via reading the RCU primitive.
241 return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
242 }
243
244 if let Some(p) = NonNull::new(self.obj_ptr) {
245 // SAFETY:
246 // 1. The pointer was previously returned by `into_raw`.
247 // 2. The pointer is removed from the RCU slot so that no one will
248 // use it after the end of the current grace period. The removal
249 // is done atomically, so it will only be dropped once.
250 unsafe { delay_drop::<P>(p) };
251 }
252
253 Ok(())
254 }
255}
256
257impl<P: NonNullPtr + Send> Rcu<P> {
258 /// Creates a new RCU primitive with the given pointer.
259 pub fn new(pointer: P) -> Self {
260 Self(RcuInner::new(pointer))
261 }
262
263 /// Replaces the current pointer with a null pointer.
264 ///
265 /// This function updates the pointer to the new pointer regardless of the
266 /// original pointer. The original pointer will be dropped after the grace
267 /// period.
268 ///
269 /// Oftentimes this function is not recommended unless you have serialized
270 /// writes with locks. Otherwise, you can use [`Self::read`] and then
271 /// [`RcuReadGuard::compare_exchange`] to update the pointer.
272 pub fn update(&self, new_ptr: P) {
273 self.0.update(Some(new_ptr));
274 }
275
276 /// Retrieves a read guard for the RCU primitive.
277 ///
278 /// The guard allows read access to the data protected by RCU, as well
279 /// as the ability to do compare-and-exchange.
280 pub fn read(&self) -> RcuReadGuard<'_, P> {
281 RcuReadGuard(self.0.read())
282 }
283
284 /// Reads the RCU-protected value in an atomic mode.
285 ///
286 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
287 /// atomic mode. If we are already in an atomic mode, this function can
288 /// reduce the overhead of disabling preemption again.
289 ///
290 /// Unlike [`Self::read`], this function does not return a read guard, so
291 /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
292 /// writers. You may do it via a [`super::SpinLock`].
293 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
294 self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
295 }
296}
297
298impl<P: NonNullPtr + Send> RcuOption<P> {
299 /// Creates a new RCU primitive with the given pointer.
300 pub fn new(pointer: Option<P>) -> Self {
301 if let Some(pointer) = pointer {
302 Self(RcuInner::new(pointer))
303 } else {
304 Self(RcuInner::new_none())
305 }
306 }
307
308 /// Creates a new RCU primitive that contains nothing.
309 ///
310 /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
311 pub const fn new_none() -> Self {
312 Self(RcuInner::new_none())
313 }
314
315 /// Replaces the current pointer with a null pointer.
316 ///
317 /// This function updates the pointer to the new pointer regardless of the
318 /// original pointer. If the original pointer is not NULL, it will be
319 /// dropped after the grace period.
320 ///
321 /// Oftentimes this function is not recommended unless you have
322 /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
323 /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
324 pub fn update(&self, new_ptr: Option<P>) {
325 self.0.update(new_ptr);
326 }
327
328 /// Retrieves a read guard for the RCU primitive.
329 ///
330 /// The guard allows read access to the data protected by RCU, as well
331 /// as the ability to do compare-and-exchange.
332 ///
333 /// The contained pointer can be NULL and you can only get a reference
334 /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
335 pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
336 RcuOptionReadGuard(self.0.read())
337 }
338
339 /// Reads the RCU-protected value in an atomic mode.
340 ///
341 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
342 /// atomic mode. If we are already in an atomic mode, this function can
343 /// reduce the overhead of disabling preemption again.
344 ///
345 /// Unlike [`Self::read`], this function does not return a read guard, so
346 /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
347 /// writers. You may do it via a [`super::SpinLock`].
348 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
349 &'a self,
350 guard: &'a G,
351 ) -> Option<P::Ref<'a>> {
352 self.0.read_with(guard.as_atomic_mode_guard())
353 }
354}
355
356impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
357 /// Gets the reference of the protected data.
358 pub fn get(&self) -> P::Ref<'_> {
359 self.0.get().unwrap()
360 }
361
362 /// Tries to replace the already read pointer with a new pointer.
363 ///
364 /// If another thread has updated the pointer after the read, this
365 /// function will fail, and returns the given pointer back. Otherwise,
366 /// it will replace the pointer with the new one and drop the old pointer
367 /// after the grace period.
368 ///
369 /// If spinning on [`Rcu::read`] and this function, it is recommended
370 /// to relax the CPU or yield the task on failure. Otherwise contention
371 /// will occur.
372 ///
373 /// This API does not help to avoid
374 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
375 pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
376 self.0
377 .compare_exchange(Some(new_ptr))
378 .map_err(|err| err.unwrap())
379 }
380}
381
382impl<P: NonNullPtr> AsAtomicModeGuard for RcuReadGuard<'_, P> {
383 fn as_atomic_mode_guard(&self) -> &dyn InAtomicMode {
384 self.0.inner_guard.as_atomic_mode_guard()
385 }
386}
387
388impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
389 /// Gets the reference of the protected data.
390 ///
391 /// If the RCU primitive protects nothing, this function returns `None`.
392 pub fn get(&self) -> Option<P::Ref<'_>> {
393 self.0.get()
394 }
395
396 /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
397 pub fn is_none(&self) -> bool {
398 self.0.obj_ptr.is_null()
399 }
400
401 /// Tries to replace the already read pointer with a new pointer
402 /// (or none).
403 ///
404 /// If another thread has updated the pointer after the read, this
405 /// function will fail, and returns the given pointer back. Otherwise,
406 /// it will replace the pointer with the new one and drop the old pointer
407 /// after the grace period.
408 ///
409 /// If spinning on [`RcuOption::read`] and this function, it is recommended
410 /// to relax the CPU or yield the task on failure. Otherwise contention
411 /// will occur.
412 ///
413 /// This API does not help to avoid
414 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
415 pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
416 self.0.compare_exchange(new_ptr)
417 }
418}
419
420impl<P: NonNullPtr> AsAtomicModeGuard for RcuOptionReadGuard<'_, P> {
421 fn as_atomic_mode_guard(&self) -> &dyn InAtomicMode {
422 self.0.inner_guard.as_atomic_mode_guard()
423 }
424}
425
426/// Delays the dropping of a [`NonNullPtr`] after the RCU grace period.
427///
428/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
429/// because we cannot alias a [`Box`]. Restoring `P` and use [`RcuDrop`] for it
430/// can lead to multiple [`Box`]es simultaneously pointing to the same
431/// content.
432///
433/// # Safety
434///
435/// The pointer must be previously returned by `into_raw`, will not be used
436/// after the end of the current grace period, and will only be dropped once.
437///
438/// [`Box`]: alloc::boxed::Box
439unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
440 struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
441 // SAFETY: Sending a raw pointer to another task is safe as long as
442 // the pointer access in another task is safe (guaranteed by the trait
443 // bound `P: Send`).
444 unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}
445
446 let pointer: ForceSend<P> = ForceSend(pointer);
447
448 let rcu_monitor = RCU_MONITOR.get().unwrap();
449 rcu_monitor.after_grace_period(move || {
450 // This is necessary to make the Rust compiler to move the entire
451 // `ForceSend` structure into the closure.
452 let pointer = pointer;
453
454 // SAFETY:
455 // 1. The pointer was previously returned by `into_raw`.
456 // 2. The pointer won't be used anymore since the grace period has
457 // finished and this is the only time the pointer gets dropped.
458 let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
459 drop(p);
460 });
461}
462
463/// A wrapper to delay calling destructor of `T` after the RCU grace period.
464///
465/// Upon dropping this structure, a callback will be registered to the global
466/// RCU monitor and the destructor of `T` will be delayed until the callback.
467///
468/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`. You can also
469/// access the inner value safely via [`RcuDrop<T>`].
470#[repr(transparent)]
471#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
472pub struct RcuDrop<T: Send + 'static> {
473 value: ManuallyDrop<T>,
474}
475
476impl<T: Send + 'static> RcuDrop<T> {
477 /// Creates a new [`RcuDrop`] instance that delays the dropping of `value`.
478 pub fn new(value: T) -> Self {
479 Self {
480 value: ManuallyDrop::new(value),
481 }
482 }
483
484 /// Extracts the value from the `RcuDrop` container.
485 ///
486 /// # Safety
487 ///
488 /// The caller must ensure that the returned value will be dropped after
489 /// all the threads cannot access it anymore. Specifically, dropping it
490 /// after the RCU grace period is guaranteed to be safe.
491 ///
492 /// Note that panic unwinding may cause the returned value to be dropped
493 /// immediately, which is not sound. Therefore, the caller must forget the
494 /// [`PanicGuard`] after it ensures that the value will be dropped at the
495 /// correct time.
496 pub(crate) unsafe fn into_inner(slot: RcuDrop<T>) -> (T, PanicGuard) {
497 let mut slot = ManuallyDrop::new(slot);
498 let panic_guard = PanicGuard::new();
499 // SAFETY: The `slot` will not be used after this point.
500 let val = unsafe { ManuallyDrop::take(&mut slot.value) };
501 (val, panic_guard)
502 }
503}
504
505impl<T: Send + 'static> Deref for RcuDrop<T> {
506 type Target = T;
507
508 fn deref(&self) -> &Self::Target {
509 &self.value
510 }
511}
512
513impl<T: Send + 'static> Drop for RcuDrop<T> {
514 fn drop(&mut self) {
515 // SAFETY: The `ManuallyDrop` will not be used after this point.
516 let taken = unsafe { ManuallyDrop::take(&mut self.value) };
517 let rcu_monitor = RCU_MONITOR.get().unwrap();
518 rcu_monitor.after_grace_period(|| {
519 drop(taken);
520 });
521 }
522}
523
524/// Finishes the current grace period.
525///
526/// This function is called when the current grace period on current CPU is
527/// finished. If this CPU is the last CPU to finish the current grace period,
528/// it takes all the current callbacks and invokes them.
529///
530/// # Safety
531///
532/// The caller must ensure that this CPU is not executing in a RCU read-side
533/// critical section.
534pub unsafe fn finish_grace_period() {
535 let rcu_monitor = RCU_MONITOR.get().unwrap();
536 // SAFETY: The caller ensures safety.
537 unsafe {
538 rcu_monitor.finish_grace_period();
539 }
540}
541
542static RCU_MONITOR: Once<RcuMonitor> = Once::new();
543
544pub fn init() {
545 RCU_MONITOR.call_once(RcuMonitor::new);
546}