ostd/sync/rcu/mod.rs
1// SPDX-License-Identifier: MPL-2.0
2
3//! Read-copy update (RCU).
4
5use core::{
6 marker::PhantomData,
7 mem::ManuallyDrop,
8 ops::Deref,
9 ptr::NonNull,
10 sync::atomic::{
11 AtomicPtr,
12 Ordering::{AcqRel, Acquire},
13 },
14};
15
16use non_null::NonNullPtr;
17use spin::once::Once;
18
19use self::monitor::RcuMonitor;
20use crate::task::{
21 DisabledPreemptGuard,
22 atomic_mode::{AsAtomicModeGuard, InAtomicMode},
23 disable_preempt,
24};
25
26mod monitor;
27pub mod non_null;
28
29/// A Read-Copy Update (RCU) cell for sharing a pointer between threads.
30///
31/// The pointer should be a non-null pointer with type `P`, which implements
32/// [`NonNullPtr`]. For example, `P` can be `Box<T>` or `Arc<T>`.
33///
34/// # Overview
35///
36/// Read-Copy-Update (RCU) is a synchronization mechanism designed for high-
37/// performance, low-latency read operations in concurrent systems. It allows
38/// multiple readers to access shared data simultaneously without contention,
39/// while writers can update the data safely in a way that does not disrupt
40/// ongoing reads. RCU is particularly suited for situations where reads are
41/// far more frequent than writes.
42///
43/// The original design and implementation of RCU is described in paper _The
44/// Read-Copy-Update Mechanism for Supporting Real-Time Applications on Shared-
45/// Memory Multiprocessor Systems with Linux_ published on IBM Systems Journal
46/// 47.2 (2008).
47///
48/// # Examples
49///
50/// ```
51/// use ostd::sync::Rcu;
52///
53/// let rcu = Rcu::new(Box::new(42));
54///
55/// let rcu_guard = rcu.read();
56///
57/// assert_eq!(*rcu_guard, Some(&42));
58///
59/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
60///
61/// let rcu_guard = rcu.read();
62///
63/// assert_eq!(*rcu_guard, Some(&43));
64/// ```
65pub struct Rcu<P: NonNullPtr>(RcuInner<P>);
66
67/// A guard that allows access to the pointed data protected by a [`Rcu`].
68#[clippy::has_significant_drop]
69#[must_use]
70pub struct RcuReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
71
72/// A Read-Copy Update (RCU) cell for sharing a _nullable_ pointer.
73///
74/// This is a variant of [`Rcu`] that allows the contained pointer to be null.
75/// So that it can implement `Rcu<Option<P>>` where `P` is not a nullable
76/// pointer. It is the same as [`Rcu`] in other aspects.
77///
78/// # Examples
79///
80/// ```
81/// use ostd::sync::RcuOption;
82///
83/// static RCU: RcuOption<Box<usize>> = RcuOption::new_none();
84///
85/// assert!(RCU.read().is_none());
86///
87/// RCU.update(Box::new(42));
88///
89/// // Read the data protected by RCU
90/// {
91/// let rcu_guard = RCU.read().try_get().unwrap();
92/// assert_eq!(*rcu_guard, 42);
93/// }
94///
95/// // Update the data protected by RCU
96/// {
97/// let rcu_guard = RCU.read().try_get().unwrap();
98///
99/// rcu_guard.compare_exchange(Box::new(43)).unwrap();
100///
101/// let rcu_guard = RCU.read().try_get().unwrap();
102/// assert_eq!(*rcu_guard, 43);
103/// }
104/// ```
105pub struct RcuOption<P: NonNullPtr>(RcuInner<P>);
106
107/// A guard that allows access to the pointed data protected by a [`RcuOption`].
108#[clippy::has_significant_drop]
109#[must_use]
110pub struct RcuOptionReadGuard<'a, P: NonNullPtr>(RcuReadGuardInner<'a, P>);
111
112/// The inner implementation of both [`Rcu`] and [`RcuOption`].
113struct RcuInner<P: NonNullPtr> {
114 ptr: AtomicPtr<<P as NonNullPtr>::Target>,
115 // We want to implement Send and Sync explicitly.
116 // Having a pointer field prevents them from being implemented
117 // automatically by the compiler.
118 _marker: PhantomData<*const P::Target>,
119}
120
121// SAFETY: It is apparent that if `P` is `Send`, then `Rcu<P>` is `Send`.
122unsafe impl<P: NonNullPtr> Send for RcuInner<P> where P: Send {}
123
124// SAFETY: To implement `Sync` for `Rcu<P>`, we need to meet two conditions:
125// 1. `P` must be `Sync` because `Rcu::get` allows concurrent access.
126// 2. `P` must be `Send` because `Rcu::update` may obtain an object
127// of `P` created on another thread.
128unsafe impl<P: NonNullPtr> Sync for RcuInner<P> where P: Send + Sync {}
129
130impl<P: NonNullPtr + Send> RcuInner<P> {
131 const fn new_none() -> Self {
132 Self {
133 ptr: AtomicPtr::new(core::ptr::null_mut()),
134 _marker: PhantomData,
135 }
136 }
137
138 fn new(pointer: P) -> Self {
139 let ptr = <P as NonNullPtr>::into_raw(pointer).as_ptr();
140 let ptr = AtomicPtr::new(ptr);
141 Self {
142 ptr,
143 _marker: PhantomData,
144 }
145 }
146
147 fn update(&self, new_ptr: Option<P>) {
148 let new_ptr = if let Some(new_ptr) = new_ptr {
149 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
150 } else {
151 core::ptr::null_mut()
152 };
153
154 let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
155
156 if let Some(p) = NonNull::new(old_raw_ptr) {
157 // SAFETY:
158 // 1. The pointer was previously returned by `into_raw`.
159 // 2. The pointer is removed from the RCU slot so that no one will
160 // use it after the end of the current grace period. The removal
161 // is done atomically, so it will only be dropped once.
162 unsafe { delay_drop::<P>(p) };
163 }
164 }
165
166 fn read(&self) -> RcuReadGuardInner<'_, P> {
167 let guard = disable_preempt();
168 RcuReadGuardInner {
169 obj_ptr: self.ptr.load(Acquire),
170 rcu: self,
171 _inner_guard: guard,
172 }
173 }
174
175 fn read_with<'a>(&'a self, _guard: &'a dyn InAtomicMode) -> Option<P::Ref<'a>> {
176 let obj_ptr = self.ptr.load(Acquire);
177 if obj_ptr.is_null() {
178 return None;
179 }
180 // SAFETY:
181 // 1. This pointer is not NULL.
182 // 2. The `_guard` guarantees atomic mode for the duration of lifetime
183 // `'a`, the pointer is valid because other writers won't release the
184 // allocation until this task passes the quiescent state.
185 NonNull::new(obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
186 }
187}
188
189impl<P: NonNullPtr> Drop for RcuInner<P> {
190 fn drop(&mut self) {
191 let ptr = self.ptr.load(Acquire);
192 if let Some(p) = NonNull::new(ptr) {
193 // SAFETY: It was previously returned by `into_raw` when creating
194 // the RCU primitive.
195 let pointer = unsafe { <P as NonNullPtr>::from_raw(p) };
196 // It is OK not to delay the drop because the RCU primitive is
197 // owned by nobody else.
198 drop(pointer);
199 }
200 }
201}
202
203/// The inner implementation of both [`RcuReadGuard`] and [`RcuOptionReadGuard`].
204struct RcuReadGuardInner<'a, P: NonNullPtr> {
205 obj_ptr: *mut <P as NonNullPtr>::Target,
206 rcu: &'a RcuInner<P>,
207 _inner_guard: DisabledPreemptGuard,
208}
209
210impl<P: NonNullPtr + Send> RcuReadGuardInner<'_, P> {
211 fn get(&self) -> Option<P::Ref<'_>> {
212 // SAFETY: The guard ensures that `P` will not be dropped. Thus, `P`
213 // outlives the lifetime of `&self`. Additionally, during this period,
214 // it is impossible to create a mutable reference to `P`.
215 NonNull::new(self.obj_ptr).map(|ptr| unsafe { P::raw_as_ref(ptr) })
216 }
217
218 fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
219 let new_ptr = if let Some(new_ptr) = new_ptr {
220 <P as NonNullPtr>::into_raw(new_ptr).as_ptr()
221 } else {
222 core::ptr::null_mut()
223 };
224
225 if self
226 .rcu
227 .ptr
228 .compare_exchange(self.obj_ptr, new_ptr, AcqRel, Acquire)
229 .is_err()
230 {
231 let Some(new_ptr) = NonNull::new(new_ptr) else {
232 return Err(None);
233 };
234 // SAFETY:
235 // 1. It was previously returned by `into_raw`.
236 // 2. The `compare_exchange` fails so the pointer will not
237 // be used by other threads via reading the RCU primitive.
238 return Err(Some(unsafe { <P as NonNullPtr>::from_raw(new_ptr) }));
239 }
240
241 if let Some(p) = NonNull::new(self.obj_ptr) {
242 // SAFETY:
243 // 1. The pointer was previously returned by `into_raw`.
244 // 2. The pointer is removed from the RCU slot so that no one will
245 // use it after the end of the current grace period. The removal
246 // is done atomically, so it will only be dropped once.
247 unsafe { delay_drop::<P>(p) };
248 }
249
250 Ok(())
251 }
252}
253
254impl<P: NonNullPtr + Send> Rcu<P> {
255 /// Creates a new RCU primitive with the given pointer.
256 pub fn new(pointer: P) -> Self {
257 Self(RcuInner::new(pointer))
258 }
259
260 /// Replaces the current pointer with a null pointer.
261 ///
262 /// This function updates the pointer to the new pointer regardless of the
263 /// original pointer. The original pointer will be dropped after the grace
264 /// period.
265 ///
266 /// Oftentimes this function is not recommended unless you have serialized
267 /// writes with locks. Otherwise, you can use [`Self::read`] and then
268 /// [`RcuReadGuard::compare_exchange`] to update the pointer.
269 pub fn update(&self, new_ptr: P) {
270 self.0.update(Some(new_ptr));
271 }
272
273 /// Retrieves a read guard for the RCU primitive.
274 ///
275 /// The guard allows read access to the data protected by RCU, as well
276 /// as the ability to do compare-and-exchange.
277 pub fn read(&self) -> RcuReadGuard<'_, P> {
278 RcuReadGuard(self.0.read())
279 }
280
281 /// Reads the RCU-protected value in an atomic mode.
282 ///
283 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
284 /// atomic mode. If we are already in an atomic mode, this function can
285 /// reduce the overhead of disabling preemption again.
286 ///
287 /// Unlike [`Self::read`], this function does not return a read guard, so
288 /// you cannot use [`RcuReadGuard::compare_exchange`] to synchronize the
289 /// writers. You may do it via a [`super::SpinLock`].
290 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(&'a self, guard: &'a G) -> P::Ref<'a> {
291 self.0.read_with(guard.as_atomic_mode_guard()).unwrap()
292 }
293}
294
295impl<P: NonNullPtr + Send> RcuOption<P> {
296 /// Creates a new RCU primitive with the given pointer.
297 pub fn new(pointer: Option<P>) -> Self {
298 if let Some(pointer) = pointer {
299 Self(RcuInner::new(pointer))
300 } else {
301 Self(RcuInner::new_none())
302 }
303 }
304
305 /// Creates a new RCU primitive that contains nothing.
306 ///
307 /// This is a constant equivalence to [`RcuOption::new`] with parameter `None`.
308 pub const fn new_none() -> Self {
309 Self(RcuInner::new_none())
310 }
311
312 /// Replaces the current pointer with a null pointer.
313 ///
314 /// This function updates the pointer to the new pointer regardless of the
315 /// original pointer. If the original pointer is not NULL, it will be
316 /// dropped after the grace period.
317 ///
318 /// Oftentimes this function is not recommended unless you have
319 /// synchronized writes with locks. Otherwise, you can use [`Self::read`]
320 /// and then [`RcuOptionReadGuard::compare_exchange`] to update the pointer.
321 pub fn update(&self, new_ptr: Option<P>) {
322 self.0.update(new_ptr);
323 }
324
325 /// Retrieves a read guard for the RCU primitive.
326 ///
327 /// The guard allows read access to the data protected by RCU, as well
328 /// as the ability to do compare-and-exchange.
329 ///
330 /// The contained pointer can be NULL and you can only get a reference
331 /// (if checked non-NULL) via [`RcuOptionReadGuard::get`].
332 pub fn read(&self) -> RcuOptionReadGuard<'_, P> {
333 RcuOptionReadGuard(self.0.read())
334 }
335
336 /// Reads the RCU-protected value in an atomic mode.
337 ///
338 /// The RCU mechanism protects reads ([`Self::read`]) by entering an
339 /// atomic mode. If we are already in an atomic mode, this function can
340 /// reduce the overhead of disabling preemption again.
341 ///
342 /// Unlike [`Self::read`], this function does not return a read guard, so
343 /// you cannot use [`RcuOptionReadGuard::compare_exchange`] to synchronize the
344 /// writers. You may do it via a [`super::SpinLock`].
345 pub fn read_with<'a, G: AsAtomicModeGuard + ?Sized>(
346 &'a self,
347 guard: &'a G,
348 ) -> Option<P::Ref<'a>> {
349 self.0.read_with(guard.as_atomic_mode_guard())
350 }
351}
352
353impl<P: NonNullPtr + Send> RcuReadGuard<'_, P> {
354 /// Gets the reference of the protected data.
355 pub fn get(&self) -> P::Ref<'_> {
356 self.0.get().unwrap()
357 }
358
359 /// Tries to replace the already read pointer with a new pointer.
360 ///
361 /// If another thread has updated the pointer after the read, this
362 /// function will fail, and returns the given pointer back. Otherwise,
363 /// it will replace the pointer with the new one and drop the old pointer
364 /// after the grace period.
365 ///
366 /// If spinning on [`Rcu::read`] and this function, it is recommended
367 /// to relax the CPU or yield the task on failure. Otherwise contention
368 /// will occur.
369 ///
370 /// This API does not help to avoid
371 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
372 pub fn compare_exchange(self, new_ptr: P) -> Result<(), P> {
373 self.0
374 .compare_exchange(Some(new_ptr))
375 .map_err(|err| err.unwrap())
376 }
377}
378
379impl<P: NonNullPtr + Send> RcuOptionReadGuard<'_, P> {
380 /// Gets the reference of the protected data.
381 ///
382 /// If the RCU primitive protects nothing, this function returns `None`.
383 pub fn get(&self) -> Option<P::Ref<'_>> {
384 self.0.get()
385 }
386
387 /// Returns if the RCU primitive protects nothing when [`Rcu::read`] happens.
388 pub fn is_none(&self) -> bool {
389 self.0.obj_ptr.is_null()
390 }
391
392 /// Tries to replace the already read pointer with a new pointer
393 /// (or none).
394 ///
395 /// If another thread has updated the pointer after the read, this
396 /// function will fail, and returns the given pointer back. Otherwise,
397 /// it will replace the pointer with the new one and drop the old pointer
398 /// after the grace period.
399 ///
400 /// If spinning on [`RcuOption::read`] and this function, it is recommended
401 /// to relax the CPU or yield the task on failure. Otherwise contention
402 /// will occur.
403 ///
404 /// This API does not help to avoid
405 /// [the ABA problem](https://en.wikipedia.org/wiki/ABA_problem).
406 pub fn compare_exchange(self, new_ptr: Option<P>) -> Result<(), Option<P>> {
407 self.0.compare_exchange(new_ptr)
408 }
409}
410
411/// Delays the dropping of a [`NonNullPtr`] after the RCU grace period.
412///
413/// This is internally needed for implementing [`Rcu`] and [`RcuOption`]
414/// because we cannot alias a [`Box`]. Restoring `P` and use [`RcuDrop`] for it
415/// can lead to multiple [`Box`]es simultaneously pointing to the same
416/// content.
417///
418/// # Safety
419///
420/// The pointer must be previously returned by `into_raw`, will not be used
421/// after the end of the current grace period, and will only be dropped once.
422///
423/// [`Box`]: alloc::boxed::Box
424unsafe fn delay_drop<P: NonNullPtr + Send>(pointer: NonNull<<P as NonNullPtr>::Target>) {
425 struct ForceSend<P: NonNullPtr + Send>(NonNull<<P as NonNullPtr>::Target>);
426 // SAFETY: Sending a raw pointer to another task is safe as long as
427 // the pointer access in another task is safe (guaranteed by the trait
428 // bound `P: Send`).
429 unsafe impl<P: NonNullPtr + Send> Send for ForceSend<P> {}
430
431 let pointer: ForceSend<P> = ForceSend(pointer);
432
433 let rcu_monitor = RCU_MONITOR.get().unwrap();
434 rcu_monitor.after_grace_period(move || {
435 // This is necessary to make the Rust compiler to move the entire
436 // `ForceSend` structure into the closure.
437 let pointer = pointer;
438
439 // SAFETY:
440 // 1. The pointer was previously returned by `into_raw`.
441 // 2. The pointer won't be used anymore since the grace period has
442 // finished and this is the only time the pointer gets dropped.
443 let p = unsafe { <P as NonNullPtr>::from_raw(pointer.0) };
444 drop(p);
445 });
446}
447
/// A wrapper that postpones the destructor of `T` until after the RCU grace
/// period.
///
/// When this structure is dropped, a callback is registered with the global
/// RCU monitor and the destructor of `T` runs inside that callback instead of
/// immediately.
///
/// [`RcuDrop<T>`] is guaranteed to have the same layout as `T`. The inner
/// value can be accessed safely by dereferencing the [`RcuDrop<T>`].
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct RcuDrop<T>
where
    T: Send + 'static,
{
    value: ManuallyDrop<T>,
}

impl<T> RcuDrop<T>
where
    T: Send + 'static,
{
    /// Wraps `value` so that dropping it is delayed.
    pub fn new(value: T) -> Self {
        let value = ManuallyDrop::new(value);
        Self { value }
    }
}

impl<T> Deref for RcuDrop<T>
where
    T: Send + 'static,
{
    type Target = T;

    fn deref(&self) -> &T {
        self.value.deref()
    }
}
477
478impl<T: Send + 'static> Drop for RcuDrop<T> {
479 fn drop(&mut self) {
480 // SAFETY: The `ManuallyDrop` will not be used after this point.
481 let taken = unsafe { ManuallyDrop::take(&mut self.value) };
482 let rcu_monitor = RCU_MONITOR.get().unwrap();
483 rcu_monitor.after_grace_period(|| {
484 drop(taken);
485 });
486 }
487}
488
489/// Finishes the current grace period.
490///
491/// This function is called when the current grace period on current CPU is
492/// finished. If this CPU is the last CPU to finish the current grace period,
493/// it takes all the current callbacks and invokes them.
494///
495/// # Safety
496///
497/// The caller must ensure that this CPU is not executing in a RCU read-side
498/// critical section.
499pub unsafe fn finish_grace_period() {
500 let rcu_monitor = RCU_MONITOR.get().unwrap();
501 // SAFETY: The caller ensures safety.
502 unsafe {
503 rcu_monitor.finish_grace_period();
504 }
505}
506
507static RCU_MONITOR: Once<RcuMonitor> = Once::new();
508
509pub fn init() {
510 RCU_MONITOR.call_once(RcuMonitor::new);
511}