std/sys/pal/unix/
futex.rs

1#![cfg(any(
2    target_os = "linux",
3    target_os = "android",
4    all(target_os = "emscripten", target_feature = "atomics"),
5    target_os = "freebsd",
6    target_os = "openbsd",
7    target_os = "dragonfly",
8    target_os = "fuchsia",
9))]
10
11use crate::sync::atomic::Atomic;
12use crate::time::Duration;
13
14/// An atomic for use as a futex that is at least 32-bits but may be larger
15pub type Futex = Atomic<Primitive>;
16/// Must be the underlying type of Futex
17pub type Primitive = u32;
18
19/// An atomic for use as a futex that is at least 8-bits but may be larger.
20pub type SmallFutex = Atomic<SmallPrimitive>;
21/// Must be the underlying type of SmallFutex
22pub type SmallPrimitive = u32;
23
24/// Waits for a `futex_wake` operation to wake us.
25///
26/// Returns directly if the futex doesn't hold the expected value.
27///
28/// Returns false on timeout, and true in all other cases.
29#[cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd"))]
30pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
31    use super::time::Timespec;
32    use crate::ptr::null;
33    use crate::sync::atomic::Ordering::Relaxed;
34
35    // Calculate the timeout as an absolute timespec.
36    //
37    // Overflows are rounded up to an infinite timeout (None).
38    let timespec = timeout
39        .and_then(|d| Timespec::now(libc::CLOCK_MONOTONIC).checked_add_duration(&d))
40        .and_then(|t| t.to_timespec());
41
42    loop {
43        // No need to wait if the value already changed.
44        if futex.load(Relaxed) != expected {
45            return true;
46        }
47
48        let r = unsafe {
49            cfg_select! {
50                target_os = "freebsd" => {
51                    // FreeBSD doesn't have futex(), but it has
52                    // _umtx_op(UMTX_OP_WAIT_UINT_PRIVATE), which is nearly
53                    // identical. It supports absolute timeouts through a flag
54                    // in the _umtx_time struct.
55                    let umtx_timeout = timespec.map(|t| libc::_umtx_time {
56                        _timeout: t,
57                        _flags: libc::UMTX_ABSTIME,
58                        _clockid: libc::CLOCK_MONOTONIC as u32,
59                    });
60                    let umtx_timeout_ptr = umtx_timeout.as_ref().map_or(null(), |t| t as *const _);
61                    let umtx_timeout_size = umtx_timeout.as_ref().map_or(0, |t| size_of_val(t));
62                    libc::_umtx_op(
63                        futex as *const Atomic<u32> as *mut _,
64                        libc::UMTX_OP_WAIT_UINT_PRIVATE,
65                        expected as libc::c_ulong,
66                        crate::ptr::without_provenance_mut(umtx_timeout_size),
67                        umtx_timeout_ptr as *mut _,
68                    )
69                }
70                any(target_os = "linux", target_os = "android") => {
71                    // Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an
72                    // absolute time rather than a relative time.
73                    libc::syscall(
74                        libc::SYS_futex,
75                        futex as *const Atomic<u32>,
76                        libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG,
77                        expected,
78                        timespec.as_ref().map_or(null(), |t| t as *const libc::timespec),
79                        null::<u32>(), // This argument is unused for FUTEX_WAIT_BITSET.
80                        !0u32,         // A full bitmask, to make it behave like a regular FUTEX_WAIT.
81                    )
82                }
83                _ => {
84                    compile_error!("unknown target_os");
85                }
86            }
87        };
88
89        match (r < 0).then(super::os::errno) {
90            Some(libc::ETIMEDOUT) => return false,
91            Some(libc::EINTR) => continue,
92            _ => return true,
93        }
94    }
95}
96
97/// Wakes up one thread that's blocked on `futex_wait` on this futex.
98///
99/// Returns true if this actually woke up such a thread,
100/// or false if no thread was waiting on this futex.
101///
102/// On some platforms, this always returns false.
103#[cfg(any(target_os = "linux", target_os = "android"))]
104pub fn futex_wake(futex: &Atomic<u32>) -> bool {
105    let ptr = futex as *const Atomic<u32>;
106    let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
107    unsafe { libc::syscall(libc::SYS_futex, ptr, op, 1) > 0 }
108}
109
110/// Wakes up all threads that are waiting on `futex_wait` on this futex.
111#[cfg(any(target_os = "linux", target_os = "android"))]
112pub fn futex_wake_all(futex: &Atomic<u32>) {
113    let ptr = futex as *const Atomic<u32>;
114    let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
115    unsafe {
116        libc::syscall(libc::SYS_futex, ptr, op, i32::MAX);
117    }
118}
119
// Asks FreeBSD to wake one waiter on `futex`.
//
// FreeBSD does not report how many threads were woken, so the result is
// always `false`.
#[cfg(target_os = "freebsd")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let addr = futex as *const Atomic<u32>;
    let wake_count: libc::c_ulong = 1;
    // Both trailing pointer arguments are null for a wake operation.
    unsafe {
        libc::_umtx_op(
            addr as *mut _,
            libc::UMTX_OP_WAKE_PRIVATE,
            wake_count,
            crate::ptr::null_mut(),
            crate::ptr::null_mut(),
        )
    };
    false
}
135
// Asks FreeBSD to wake up to `i32::MAX` waiters on `futex`.
#[cfg(target_os = "freebsd")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    let addr = futex as *const Atomic<u32>;
    let wake_count = i32::MAX as libc::c_ulong;
    // Both trailing pointer arguments are null for a wake operation; the
    // return value is not inspected.
    unsafe {
        libc::_umtx_op(
            addr as *mut _,
            libc::UMTX_OP_WAKE_PRIVATE,
            wake_count,
            crate::ptr::null_mut(),
            crate::ptr::null_mut(),
        )
    };
}
149
// Waits on `futex` via OpenBSD's `futex(FUTEX_WAIT)` while it holds `expected`.
//
// Returns `false` only if the call failed with `ETIMEDOUT`; every other
// outcome is reported as `true`.
#[cfg(target_os = "openbsd")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    use super::time::Timespec;
    use crate::ptr::{null, null_mut};

    // Convert the duration into a `timespec` by adding it to a zero `Timespec`.
    // A value that cannot be represented becomes `None`, i.e. no timeout.
    let wait_time = match timeout {
        Some(dur) => Timespec::zero().checked_add_duration(&dur).and_then(|ts| ts.to_timespec()),
        None => None,
    };
    let wait_time_ptr = match wait_time.as_ref() {
        Some(ts) => ts as *const libc::timespec,
        None => null(),
    };

    let ret = unsafe {
        libc::futex(
            futex as *const Atomic<u32> as *mut u32,
            libc::FUTEX_WAIT,
            expected as i32,
            wait_time_ptr,
            null_mut(),
        )
    };

    // errno is only consulted when the call did not return zero.
    let timed_out = ret != 0 && super::os::errno() == libc::ETIMEDOUT;
    !timed_out
}
172
// Wakes one waiter on `futex` via OpenBSD's `futex(FUTEX_WAKE)`.
//
// Returns `true` when the call reports a positive value.
#[cfg(target_os = "openbsd")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let addr = futex as *const Atomic<u32> as *mut u32;
    // Both trailing pointer arguments are passed as null.
    let woken = unsafe {
        libc::futex(addr, libc::FUTEX_WAKE, 1, crate::ptr::null(), crate::ptr::null_mut())
    };
    woken > 0
}
186
// Wakes up to `i32::MAX` waiters on `futex` via OpenBSD's `futex(FUTEX_WAKE)`.
#[cfg(target_os = "openbsd")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    let addr = futex as *const Atomic<u32> as *mut u32;
    // Both trailing pointer arguments are passed as null; the return value
    // is not inspected.
    unsafe {
        libc::futex(addr, libc::FUTEX_WAKE, i32::MAX, crate::ptr::null(), crate::ptr::null_mut());
    }
}
200
// Waits on `futex` via DragonFly's `umtx_sleep` while it holds `expected`.
//
// Returns `false` only if the call failed with `ETIMEDOUT`; every other
// outcome is reported as `true`.
#[cfg(target_os = "dragonfly")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    // The timeout is passed in milliseconds, where 0 means "infinite".
    let timeout_ms = match timeout {
        // Anything shorter than a millisecond is rounded up to 1 ms so it is
        // not confused with "infinite". A value that does not fit in an i32
        // is rounded up to an infinite timeout.
        Some(dur) => i32::try_from(dur.as_millis()).map_or(0, |ms| ms.max(1)),
        None => 0,
    };

    let addr = futex as *const Atomic<u32> as *const i32;
    let ret = unsafe { libc::umtx_sleep(addr, expected as i32, timeout_ms) };

    // errno is only consulted when the call did not return zero.
    let timed_out = ret != 0 && super::os::errno() == libc::ETIMEDOUT;
    !timed_out
}
215
// Asks DragonflyBSD to wake one waiter on `futex`.
//
// DragonflyBSD does not report how many threads were woken, so the result is
// always `false`.
#[cfg(target_os = "dragonfly")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let addr = futex as *const Atomic<u32> as *const i32;
    unsafe { libc::umtx_wakeup(addr, 1) };
    false
}
222
// Asks DragonflyBSD to wake up to `i32::MAX` waiters on `futex`.
#[cfg(target_os = "dragonfly")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    let addr = futex as *const Atomic<u32> as *const i32;
    unsafe { libc::umtx_wakeup(addr, i32::MAX) };
}
227
// Foreign declarations of the Emscripten futex entry points that the
// emscripten `futex_wait` / `futex_wake` / `futex_wake_all` wrappers call.
#[cfg(target_os = "emscripten")]
unsafe extern "C" {
    // Wait on `addr` against `val`, for at most `max_wait_ms` milliseconds.
    fn emscripten_futex_wait(
        addr: *const Atomic<u32>,
        val: libc::c_uint,
        max_wait_ms: libc::c_double,
    ) -> libc::c_int;
    // Wake up to `count` waiters on `addr`.
    fn emscripten_futex_wake(addr: *const Atomic<u32>, count: libc::c_int) -> libc::c_int;
}
237
// Waits on `futex` via `emscripten_futex_wait` against the value `expected`.
//
// Returns `false` only when the call reports `-ETIMEDOUT`; every other
// outcome is reported as `true`.
#[cfg(target_os = "emscripten")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    // The wait limit is passed as floating-point milliseconds; with no
    // timeout it is infinity.
    let max_wait_ms = match timeout {
        Some(dur) => dur.as_secs_f64() * 1000.0,
        None => f64::INFINITY,
    };
    let status = unsafe { emscripten_futex_wait(futex, expected, max_wait_ms) };
    status != -libc::ETIMEDOUT
}
248
// Wakes one waiter on `futex` via `emscripten_futex_wake`.
//
// Returns `true` when the call reports a positive value.
#[cfg(target_os = "emscripten")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let woken = unsafe { emscripten_futex_wake(futex, 1) };
    woken > 0
}
253
// Wakes up to `i32::MAX` waiters on `futex` via `emscripten_futex_wake`.
#[cfg(target_os = "emscripten")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    // The return value is not inspected.
    unsafe {
        emscripten_futex_wake(futex, i32::MAX);
    }
}
258
// Waits on `futex` via Fuchsia's `zx_futex_wait` against the value `expected`.
//
// Returns `false` only when the call reports `ZX_ERR_TIMED_OUT`; every other
// outcome is reported as `true`.
#[cfg(target_os = "fuchsia")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    use super::fuchsia::*;

    // Build an absolute deadline from the monotonic clock. If the timeout in
    // nanoseconds does not fit in an i64, or adding it to the current time
    // overflows, sleep forever.
    let deadline = match timeout.map(|dur| i64::try_from(dur.as_nanos())) {
        Some(Ok(nanos)) => {
            nanos.checked_add(zx_clock_get_monotonic()).unwrap_or(ZX_TIME_INFINITE)
        }
        _ => ZX_TIME_INFINITE,
    };

    let status = unsafe {
        zx_futex_wait(futex, zx_futex_t::new(expected), ZX_HANDLE_INVALID, deadline)
    };
    status != ZX_ERR_TIMED_OUT
}
273
// Asks Fuchsia to wake one waiter on `futex`.
//
// Fuchsia does not report how many threads were woken, so the result is
// always `false`.
#[cfg(target_os = "fuchsia")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    use super::fuchsia::zx_futex_wake;

    unsafe { zx_futex_wake(futex, 1) };
    false
}
280
// Asks Fuchsia to wake up to `u32::MAX` waiters on `futex`.
#[cfg(target_os = "fuchsia")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    use super::fuchsia::zx_futex_wake;

    unsafe { zx_futex_wake(futex, u32::MAX) };
}