Skip to content

Commit

Permalink
Fix an async signal delivery flake on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
jart committed Oct 2, 2024
1 parent e4d6eb3 commit 85c58be
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 46 deletions.
107 changes: 82 additions & 25 deletions libc/calls/sig.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,22 @@ static textwindows wontreturn void __sig_terminate(int sig) {
TerminateThisProcess(sig);
}

textwindows static void __sig_wake(struct PosixThread *pt, int sig) {
textwindows static bool __sig_wake(struct PosixThread *pt, int sig) {
atomic_int *blocker;
blocker = atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
if (!blocker)
return;
return false;
// threads can create semaphores on an as-needed basis
if (blocker == PT_BLOCKER_EVENT) {
STRACE("%G set %d's event object", sig, _pthread_tid(pt));
SetEvent(pt->pt_event);
return;
return !!atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
}
// all other blocking ops that aren't overlap should use futexes
// we force restartable futexes to churn by waking w/o releasing
STRACE("%G waking %d's futex", sig, _pthread_tid(pt));
WakeByAddressSingle(blocker);
return !!atomic_load_explicit(&pt->pt_blocker, memory_order_acquire);
}

textwindows static bool __sig_start(struct PosixThread *pt, int sig,
Expand Down Expand Up @@ -302,17 +303,48 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
return 0;
}

// we can't preempt threads that masked sigs or are blocked. we also
// need to ensure we don't overflow the target thread's stack if many
// signals need to be delivered at once. we also need to make sure two
// threads can't deadlock by killing each other at the same time.
if ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
(1ull << (sig - 1))) ||
atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire)) {
atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
memory_order_relaxed);
__sig_wake(pt, sig);
return 0;
// we can't preempt threads that masked sigs or are blocked on i/o
while ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
(1ull << (sig - 1)))) {
if (atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
memory_order_acq_rel) &
(1ull << (sig - 1)))
// we believe signal was already enqueued
return 0;
if (__sig_wake(pt, sig))
// we believe i/o routine will handle signal
return 0;
if (atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
(1ull << (sig - 1)))
// we believe ALLOW_SIGNALS will handle signal
return 0;
if (!(atomic_fetch_and_explicit(&pt->tib->tib_sigpending,
~(1ull << (sig - 1)),
memory_order_acq_rel) &
(1ull << (sig - 1))))
// we believe another thread sniped our signal
return 0;
break;
}

// avoid race conditions and deadlocks with thread suspend process
if (atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire)) {
// we believe another thread is asynchronously waking the mark
if (atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
memory_order_acq_rel) &
(1ull << (sig - 1)))
// we believe our signal is already being delivered
return 0;
if (atomic_load_explicit(&pt->pt_intoff, memory_order_acquire) ||
atomic_exchange_explicit(&pt->pt_intoff, 1, memory_order_acquire))
// we believe __sig_tramp will deliver our signal
return 0;
if (!(atomic_fetch_and_explicit(&pt->tib->tib_sigpending,
~(1ull << (sig - 1)),
memory_order_acq_rel) &
(1ull << (sig - 1))))
// we believe another thread sniped our signal
return 0;
}

// if there's no handler then killing a thread kills the process
Expand All @@ -321,17 +353,10 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
__sig_terminate(sig);
}

// ignore signals already pending
uintptr_t th = _pthread_syshand(pt);
if (atomic_load_explicit(&pt->tib->tib_sigpending, memory_order_acquire) &
(1ull << (sig - 1))) {
atomic_store_explicit(&pt->pt_intoff, 0, memory_order_release);
return 0;
}

// take control of thread
// suspending the thread happens asynchronously
// however getting the context blocks until it's frozen
uintptr_t th = _pthread_syshand(pt);
if (SuspendThread(th) == -1u) {
STRACE("SuspendThread failed w/ %d", GetLastError());
atomic_store_explicit(&pt->pt_intoff, 0, memory_order_release);
Expand All @@ -349,9 +374,7 @@ static textwindows int __sig_killer(struct PosixThread *pt, int sig, int sic) {
// we can't preempt threads that masked sig or are blocked
// we can't preempt threads that are running in win32 code
// so we shall unblock the thread and let it signal itself
if ((atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire) &
(1ull << (sig - 1))) ||
!((uintptr_t)__executable_start <= nc.Rip &&
if (!((uintptr_t)__executable_start <= nc.Rip &&
nc.Rip < (uintptr_t)__privileged_start)) {
atomic_fetch_or_explicit(&pt->tib->tib_sigpending, 1ull << (sig - 1),
memory_order_relaxed);
Expand Down Expand Up @@ -634,6 +657,7 @@ textwindows dontinstrument static uint32_t __sig_worker(void *arg) {
__maps_track((char *)(((uintptr_t)sp + __pagesize - 1) & -__pagesize) - STKSZ,
STKSZ);
for (;;) {

// dequeue all pending signals and fire them off. if there's no
// thread that can handle them then __sig_generate will requeue
// those signals back to __sig.process; hence the need for xchg
Expand All @@ -644,6 +668,39 @@ textwindows dontinstrument static uint32_t __sig_worker(void *arg) {
sigs &= ~(1ull << (sig - 1));
__sig_generate(sig, SI_KERNEL);
}

// unblock stalled asynchronous signals in threads
_pthread_lock();
for (struct Dll *e = dll_first(_pthread_list); e;
e = dll_next(_pthread_list, e)) {
struct PosixThread *pt = POSIXTHREAD_CONTAINER(e);
if (atomic_load_explicit(&pt->pt_status, memory_order_acquire) >=
kPosixThreadTerminated) {
break;
}
sigset_t pending =
atomic_load_explicit(&pt->tib->tib_sigpending, memory_order_acquire);
sigset_t mask =
atomic_load_explicit(&pt->tib->tib_sigmask, memory_order_acquire);
if (pending & ~mask) {
_pthread_ref(pt);
_pthread_unlock();
while (!atomic_compare_exchange_weak_explicit(
&pt->tib->tib_sigpending, &pending, pending & ~mask,
memory_order_acq_rel, memory_order_relaxed)) {
}
while ((pending = pending & ~mask)) {
int sig = bsfl(pending) + 1;
pending &= ~(1ull << (sig - 1));
__sig_killer(pt, sig, SI_KERNEL);
}
_pthread_lock();
_pthread_unref(pt);
}
}
_pthread_unlock();

// wait until next scheduler quantum
Sleep(POLL_INTERVAL_MS);
}
return 0;
Expand Down
46 changes: 25 additions & 21 deletions test/posix/signal_latency_async_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,15 @@ pthread_t sender_thread;
pthread_t receiver_thread;
struct timespec send_time;
atomic_int sender_got_signal;
atomic_int receiver_got_signal;
double latencies[ITERATIONS];

void sender_signal_handler(int signo) {
sender_got_signal = 1;
}

void receiver_signal_handler(int signo) {
struct timespec receive_time;
clock_gettime(CLOCK_MONOTONIC, &receive_time);

long sec_diff = receive_time.tv_sec - send_time.tv_sec;
long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
double latency_ns = sec_diff * 1e9 + nsec_diff;

static int iteration = 0;
if (iteration < ITERATIONS)
latencies[iteration++] = latency_ns;

// Pong sender
if (pthread_kill(sender_thread, SIGUSR2))
exit(2);

// Exit if done
if (iteration >= ITERATIONS)
pthread_exit(0);
receiver_got_signal = 1;
}

void *sender_func(void *arg) {
Expand Down Expand Up @@ -84,9 +68,29 @@ void *sender_func(void *arg) {
void *receiver_func(void *arg) {

// Wait for asynchronous signals
volatile unsigned v = 0;
for (;;)
++v;
for (;;) {
if (atomic_exchange_explicit(&receiver_got_signal, 0,
memory_order_acq_rel)) {
struct timespec receive_time;
clock_gettime(CLOCK_MONOTONIC, &receive_time);

long sec_diff = receive_time.tv_sec - send_time.tv_sec;
long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
double latency_ns = sec_diff * 1e9 + nsec_diff;

static int iteration = 0;
if (iteration < ITERATIONS)
latencies[iteration++] = latency_ns;

// Pong sender
if (pthread_kill(sender_thread, SIGUSR2))
exit(2);

// Exit if done
if (iteration >= ITERATIONS)
pthread_exit(0);
}
}

return 0;
}
Expand Down

0 comments on commit 85c58be

Please sign in to comment.