std/sys/sync/once/
futex.rs

1use crate::cell::Cell;
2use crate::sync as public;
3use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use crate::sync::poison::once::ExclusiveState;
5use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};
6
7// On some platforms, the OS is very nice and handles the waiter queue for us.
8// This means we only need one atomic value with 4 states:
9
10/// No initialization has run yet, and no thread is currently using the Once.
11const INCOMPLETE: Primitive = 3;
12/// Some thread has previously attempted to initialize the Once, but it panicked,
13/// so the Once is now poisoned. There are no other threads currently accessing
14/// this Once.
15const POISONED: Primitive = 2;
16/// Some thread is currently attempting to run initialization. It may succeed,
17/// so all future threads need to wait for it to finish.
18const RUNNING: Primitive = 1;
19/// Initialization has completed and all future calls should finish immediately.
20/// By choosing this state as the all-zero state the `is_completed` check can be
21/// a bit faster on some platforms.
22const COMPLETE: Primitive = 0;
23
24// An additional bit indicates whether there are waiting threads:
25
26/// May only be set if the state is not COMPLETE.
27const QUEUED: Primitive = 4;
28
29// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
30// variable. When the running thread finishes, it will wake all waiting threads using
31// `futex_wake_all`.
32
33const STATE_MASK: Primitive = 0b11;
34
35pub struct OnceState {
36    poisoned: bool,
37    set_state_to: Cell<Primitive>,
38}
39
40impl OnceState {
41    #[inline]
42    pub fn is_poisoned(&self) -> bool {
43        self.poisoned
44    }
45
46    #[inline]
47    pub fn poison(&self) {
48        self.set_state_to.set(POISONED);
49    }
50}
51
52struct CompletionGuard<'a> {
53    state_and_queued: &'a Futex,
54    set_state_on_drop_to: Primitive,
55}
56
57impl<'a> Drop for CompletionGuard<'a> {
58    fn drop(&mut self) {
59        // Use release ordering to propagate changes to all threads checking
60        // up on the Once. `futex_wake_all` does its own synchronization, hence
61        // we do not need `AcqRel`.
62        if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
63            futex_wake_all(self.state_and_queued);
64        }
65    }
66}
67
68pub struct Once {
69    state_and_queued: Futex,
70}
71
72impl Once {
73    #[inline]
74    pub const fn new() -> Once {
75        Once { state_and_queued: Futex::new(INCOMPLETE) }
76    }
77
78    #[inline]
79    pub fn is_completed(&self) -> bool {
80        // Use acquire ordering to make all initialization changes visible to the
81        // current thread.
82        self.state_and_queued.load(Acquire) == COMPLETE
83    }
84
85    #[inline]
86    pub(crate) fn state(&mut self) -> ExclusiveState {
87        match *self.state_and_queued.get_mut() {
88            INCOMPLETE => ExclusiveState::Incomplete,
89            POISONED => ExclusiveState::Poisoned,
90            COMPLETE => ExclusiveState::Complete,
91            _ => unreachable!("invalid Once state"),
92        }
93    }
94
95    #[inline]
96    pub(crate) fn set_state(&mut self, new_state: ExclusiveState) {
97        *self.state_and_queued.get_mut() = match new_state {
98            ExclusiveState::Incomplete => INCOMPLETE,
99            ExclusiveState::Poisoned => POISONED,
100            ExclusiveState::Complete => COMPLETE,
101        };
102    }
103
104    #[cold]
105    #[track_caller]
106    pub fn wait(&self, ignore_poisoning: bool) {
107        let mut state_and_queued = self.state_and_queued.load(Acquire);
108        loop {
109            let state = state_and_queued & STATE_MASK;
110            let queued = state_and_queued & QUEUED != 0;
111            match state {
112                COMPLETE => return,
113                POISONED if !ignore_poisoning => {
114                    // Panic to propagate the poison.
115                    panic!("Once instance has previously been poisoned");
116                }
117                _ => {
118                    // Set the QUEUED bit if it has not already been set.
119                    if !queued {
120                        state_and_queued += QUEUED;
121                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
122                            state,
123                            state_and_queued,
124                            Relaxed,
125                            Acquire,
126                        ) {
127                            state_and_queued = new;
128                            continue;
129                        }
130                    }
131
132                    futex_wait(&self.state_and_queued, state_and_queued, None);
133                    state_and_queued = self.state_and_queued.load(Acquire);
134                }
135            }
136        }
137    }
138
139    #[cold]
140    #[track_caller]
141    pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
142        let mut state_and_queued = self.state_and_queued.load(Acquire);
143        loop {
144            let state = state_and_queued & STATE_MASK;
145            let queued = state_and_queued & QUEUED != 0;
146            match state {
147                COMPLETE => return,
148                POISONED if !ignore_poisoning => {
149                    // Panic to propagate the poison.
150                    panic!("Once instance has previously been poisoned");
151                }
152                INCOMPLETE | POISONED => {
153                    // Try to register the current thread as the one running.
154                    let next = RUNNING + if queued { QUEUED } else { 0 };
155                    if let Err(new) = self.state_and_queued.compare_exchange_weak(
156                        state_and_queued,
157                        next,
158                        Acquire,
159                        Acquire,
160                    ) {
161                        state_and_queued = new;
162                        continue;
163                    }
164
165                    // `waiter_queue` will manage other waiting threads, and
166                    // wake them up on drop.
167                    let mut waiter_queue = CompletionGuard {
168                        state_and_queued: &self.state_and_queued,
169                        set_state_on_drop_to: POISONED,
170                    };
171                    // Run the function, letting it know if we're poisoned or not.
172                    let f_state = public::OnceState {
173                        inner: OnceState {
174                            poisoned: state == POISONED,
175                            set_state_to: Cell::new(COMPLETE),
176                        },
177                    };
178                    f(&f_state);
179                    waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
180                    return;
181                }
182                _ => {
183                    // All other values must be RUNNING.
184                    assert!(state == RUNNING);
185
186                    // Set the QUEUED bit if it is not already set.
187                    if !queued {
188                        state_and_queued += QUEUED;
189                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
190                            state,
191                            state_and_queued,
192                            Relaxed,
193                            Acquire,
194                        ) {
195                            state_and_queued = new;
196                            continue;
197                        }
198                    }
199
200                    futex_wait(&self.state_and_queued, state_and_queued, None);
201                    state_and_queued = self.state_and_queued.load(Acquire);
202                }
203            }
204        }
205    }
206}