Skip to content

Commit 7820135

Browse files
committed
Auto merge of #76919 - fusion-engineering-forks:thread-parker, r=dtolnay
Use futex-based thread::park/unpark on Linux. This moves the parking/unparking logic out of `thread/mod.rs` into a module named `thread_parker` in `sys_common`. The current implementation is moved to `sys_common/thread_parker/generic.rs` and the new implementation using futexes is added in `sys_common/thread_parker/futex.rs`.
2 parents 9cba260 + 0b73fd7 commit 7820135

File tree

7 files changed

+276
-112
lines changed

7 files changed

+276
-112
lines changed

library/std/src/sys/unix/futex.rs

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#![cfg(any(target_os = "linux", target_os = "android"))]
2+
3+
use crate::convert::TryInto;
4+
use crate::ptr::null;
5+
use crate::sync::atomic::AtomicI32;
6+
use crate::time::Duration;
7+
8+
pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
9+
let timespec = timeout.and_then(|d| {
10+
Some(libc::timespec {
11+
// Sleep forever if the timeout is longer than fits in a timespec.
12+
tv_sec: d.as_secs().try_into().ok()?,
13+
// This conversion never truncates, as subsec_nanos is always <1e9.
14+
tv_nsec: d.subsec_nanos() as _,
15+
})
16+
});
17+
unsafe {
18+
libc::syscall(
19+
libc::SYS_futex,
20+
futex as *const AtomicI32,
21+
libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
22+
expected,
23+
timespec.as_ref().map_or(null(), |d| d as *const libc::timespec),
24+
);
25+
}
26+
}
27+
28+
pub fn futex_wake(futex: &AtomicI32) {
29+
unsafe {
30+
libc::syscall(
31+
libc::SYS_futex,
32+
futex as *const AtomicI32,
33+
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
34+
1,
35+
);
36+
}
37+
}

library/std/src/sys/unix/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub mod env;
4949
pub mod ext;
5050
pub mod fd;
5151
pub mod fs;
52+
pub mod futex;
5253
pub mod io;
5354
#[cfg(target_os = "l4re")]
5455
mod l4re;

library/std/src/sys_common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub mod thread;
6666
pub mod thread_info;
6767
pub mod thread_local_dtor;
6868
pub mod thread_local_key;
69+
pub mod thread_parker;
6970
pub mod util;
7071
pub mod wtf8;
7172

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use crate::sync::atomic::AtomicI32;
2+
use crate::sync::atomic::Ordering::{Acquire, Release};
3+
use crate::sys::futex::{futex_wait, futex_wake};
4+
use crate::time::Duration;
5+
6+
const PARKED: i32 = -1;
7+
const EMPTY: i32 = 0;
8+
const NOTIFIED: i32 = 1;
9+
10+
pub struct Parker {
11+
state: AtomicI32,
12+
}
13+
14+
// Notes about memory ordering:
15+
//
16+
// Memory ordering is only relevant for the relative ordering of operations
17+
// between different variables. Even Ordering::Relaxed guarantees a
18+
// monotonic/consistent order when looking at just a single atomic variable.
19+
//
20+
// So, since this parker is just a single atomic variable, we only need to look
21+
// at the ordering guarantees we need to provide to the 'outside world'.
22+
//
23+
// The only memory ordering guarantee that parking and unparking provide, is
24+
// that things which happened before unpark() are visible on the thread
25+
// returning from park() afterwards. Otherwise, it was effectively unparked
26+
// before unpark() was called while still consuming the 'token'.
27+
//
28+
// In other words, unpark() needs to synchronize with the part of park() that
29+
// consumes the token and returns.
30+
//
31+
// This is done with a release-acquire synchronization, by using
32+
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
33+
// Ordering::Acquire when checking for this state in park().
34+
impl Parker {
35+
#[inline]
36+
pub const fn new() -> Self {
37+
Parker { state: AtomicI32::new(EMPTY) }
38+
}
39+
40+
// Assumes this is only called by the thread that owns the Parker,
41+
// which means that `self.state != PARKED`.
42+
pub unsafe fn park(&self) {
43+
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
44+
// first case.
45+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
46+
return;
47+
}
48+
loop {
49+
// Wait for something to happen, assuming it's still set to PARKED.
50+
futex_wait(&self.state, PARKED, None);
51+
// Change NOTIFIED=>EMPTY and return in that case.
52+
if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
53+
return;
54+
} else {
55+
// Spurious wake up. We loop to try again.
56+
}
57+
}
58+
}
59+
60+
// Assumes this is only called by the thread that owns the Parker,
61+
// which means that `self.state != PARKED`.
62+
pub unsafe fn park_timeout(&self, timeout: Duration) {
63+
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
64+
// first case.
65+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
66+
return;
67+
}
68+
// Wait for something to happen, assuming it's still set to PARKED.
69+
futex_wait(&self.state, PARKED, Some(timeout));
70+
// This is not just a store, because we need to establish a
71+
// release-acquire ordering with unpark().
72+
if self.state.swap(EMPTY, Acquire) == NOTIFIED {
73+
// Woke up because of unpark().
74+
} else {
75+
// Timeout or spurious wake up.
76+
// We return either way, because we can't easily tell if it was the
77+
// timeout or not.
78+
}
79+
}
80+
81+
#[inline]
82+
pub fn unpark(&self) {
83+
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
84+
// wake the thread in the first case.
85+
//
86+
// Note that even NOTIFIED=>NOTIFIED results in a write. This is on
87+
// purpose, to make sure every unpark() has a release-acquire ordering
88+
// with park().
89+
if self.state.swap(NOTIFIED, Release) == PARKED {
90+
futex_wake(&self.state);
91+
}
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Parker implementaiton based on a Mutex and Condvar.
2+
3+
use crate::sync::atomic::AtomicUsize;
4+
use crate::sync::atomic::Ordering::SeqCst;
5+
use crate::sync::{Condvar, Mutex};
6+
use crate::time::Duration;
7+
8+
const EMPTY: usize = 0;
9+
const PARKED: usize = 1;
10+
const NOTIFIED: usize = 2;
11+
12+
pub struct Parker {
13+
state: AtomicUsize,
14+
lock: Mutex<()>,
15+
cvar: Condvar,
16+
}
17+
18+
impl Parker {
19+
pub fn new() -> Self {
20+
Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
21+
}
22+
23+
// This implementaiton doesn't require `unsafe`, but other implementations
24+
// may assume this is only called by the thread that owns the Parker.
25+
pub unsafe fn park(&self) {
26+
// If we were previously notified then we consume this notification and
27+
// return quickly.
28+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
29+
return;
30+
}
31+
32+
// Otherwise we need to coordinate going to sleep
33+
let mut m = self.lock.lock().unwrap();
34+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
35+
Ok(_) => {}
36+
Err(NOTIFIED) => {
37+
// We must read here, even though we know it will be `NOTIFIED`.
38+
// This is because `unpark` may have been called again since we read
39+
// `NOTIFIED` in the `compare_exchange` above. We must perform an
40+
// acquire operation that synchronizes with that `unpark` to observe
41+
// any writes it made before the call to unpark. To do that we must
42+
// read from the write it made to `state`.
43+
let old = self.state.swap(EMPTY, SeqCst);
44+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
45+
return;
46+
} // should consume this notification, so prohibit spurious wakeups in next park.
47+
Err(_) => panic!("inconsistent park state"),
48+
}
49+
loop {
50+
m = self.cvar.wait(m).unwrap();
51+
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
52+
Ok(_) => return, // got a notification
53+
Err(_) => {} // spurious wakeup, go back to sleep
54+
}
55+
}
56+
}
57+
58+
// This implementaiton doesn't require `unsafe`, but other implementations
59+
// may assume this is only called by the thread that owns the Parker.
60+
pub unsafe fn park_timeout(&self, dur: Duration) {
61+
// Like `park` above we have a fast path for an already-notified thread, and
62+
// afterwards we start coordinating for a sleep.
63+
// return quickly.
64+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
65+
return;
66+
}
67+
let m = self.lock.lock().unwrap();
68+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
69+
Ok(_) => {}
70+
Err(NOTIFIED) => {
71+
// We must read again here, see `park`.
72+
let old = self.state.swap(EMPTY, SeqCst);
73+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
74+
return;
75+
} // should consume this notification, so prohibit spurious wakeups in next park.
76+
Err(_) => panic!("inconsistent park_timeout state"),
77+
}
78+
79+
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
80+
// from a notification we just want to unconditionally set the state back to
81+
// empty, either consuming a notification or un-flagging ourselves as
82+
// parked.
83+
let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
84+
match self.state.swap(EMPTY, SeqCst) {
85+
NOTIFIED => {} // got a notification, hurray!
86+
PARKED => {} // no notification, alas
87+
n => panic!("inconsistent park_timeout state: {}", n),
88+
}
89+
}
90+
91+
pub fn unpark(&self) {
92+
// To ensure the unparked thread will observe any writes we made
93+
// before this call, we must perform a release operation that `park`
94+
// can synchronize with. To do that we must write `NOTIFIED` even if
95+
// `state` is already `NOTIFIED`. That is why this must be a swap
96+
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
97+
// on failure.
98+
match self.state.swap(NOTIFIED, SeqCst) {
99+
EMPTY => return, // no one was waiting
100+
NOTIFIED => return, // already unparked
101+
PARKED => {} // gotta go wake someone up
102+
_ => panic!("inconsistent state in unpark"),
103+
}
104+
105+
// There is a period between when the parked thread sets `state` to
106+
// `PARKED` (or last checked `state` in the case of a spurious wake
107+
// up) and when it actually waits on `cvar`. If we were to notify
108+
// during this period it would be ignored and then when the parked
109+
// thread went to sleep it would never wake up. Fortunately, it has
110+
// `lock` locked at this stage so we can acquire `lock` to wait until
111+
// it is ready to receive the notification.
112+
//
113+
// Releasing `lock` before the call to `notify_one` means that when the
114+
// parked thread wakes it doesn't get woken only to have to wait for us
115+
// to release `lock`.
116+
drop(self.lock.lock().unwrap());
117+
self.cvar.notify_one()
118+
}
119+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
cfg_if::cfg_if! {
2+
if #[cfg(any(target_os = "linux", target_os = "android"))] {
3+
mod futex;
4+
pub use futex::Parker;
5+
} else {
6+
mod generic;
7+
pub use generic::Parker;
8+
}
9+
}

0 commit comments

Comments
 (0)