Skip to content

Commit

Permalink
runtime: prevent send on closed channel in wakeableSleep
Browse files Browse the repository at this point in the history
Currently wakeableSleep has a race where, although stopTimer is called,
the timer could be queued already and fire *after* the wakeup channel is
closed.

Fix this by protecting wakeup with a lock used on the close and wake
paths and assigning the wakeup to nil on close. The wake path then
ignores a nil wakeup channel. This fixes the problem by ensuring that a
failure to stop the timer only results in the timer doing nothing,
rather than trying to send on a closed channel.

The addition of this lock requires some changes to the static lock
ranking system.

Thiere's also a second problem here: the timer could be delayed far
enough into the future that when it fires, it observes a non-nil wakeup
if the wakeableSleep has been re-initialized and reset.

Fix this problem too  by allocating the wakeableSleep on the heap and
creating a new one instead of reinitializing the old one. The GC will
make sure that the reference to the old one stays alive for the timer to
fire, but that timer firing won't cause a spurious wakeup in the new
one.

Change-Id: I2b979304e755c015d4466991f135396f6a271069
Reviewed-on: https://go-review.googlesource.com/c/go/+/542335
Reviewed-by: Michael Pratt <[email protected]>
Commit-Queue: Michael Knyszek <[email protected]>
Auto-Submit: Michael Knyszek <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
Run-TryBot: Michael Knyszek <[email protected]>
  • Loading branch information
mknyszek authored and gopherbot committed Nov 14, 2023
1 parent 67d555e commit 3073f3f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 19 deletions.
3 changes: 3 additions & 0 deletions src/runtime/lockrank.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/runtime/mklockrank.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ assistQueue,
< sched;
sched < allg, allp;
allp < timers;
timers < wakeableSleep;
timers < netpollInit;
# Channels
Expand Down
62 changes: 44 additions & 18 deletions src/runtime/trace2.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var trace struct {
cpuLogRead *profBuf
signalLock atomic.Uint32 // protects use of the following member, only usable in signal handlers
cpuLogWrite atomic.Pointer[profBuf] // copy of cpuLogRead for use in signal handlers, set without signalLock
cpuSleep wakeableSleep
cpuSleep *wakeableSleep
cpuLogDone <-chan struct{}
cpuBuf [2]*traceBuf

Expand Down Expand Up @@ -856,15 +856,15 @@ func traceReaderAvailable() *g {
var traceAdvancer traceAdvancerState

type traceAdvancerState struct {
timer wakeableSleep
timer *wakeableSleep
done chan struct{}
}

// start starts a new traceAdvancer.
func (s *traceAdvancerState) start() {
// Start a goroutine to periodically advance the trace generation.
s.done = make(chan struct{})
s.timer.init()
s.timer = newWakeableSleep()
go func() {
for traceEnabled() {
// Set a timer to wake us up
Expand Down Expand Up @@ -895,50 +895,76 @@ const defaultTraceAdvancePeriod = 1e9 // 1 second.
// close to free up resources. Once close is called, init
// must be called before another use.
type wakeableSleep struct {
timer *timer
timer *timer

// lock protects access to wakeup, but not send/recv on it.
lock mutex
wakeup chan struct{}
}

// init initializes the timer.
func (s *wakeableSleep) init() {
// newWakeableSleep initializes a new wakeableSleep and returns it.
func newWakeableSleep() *wakeableSleep {
s := new(wakeableSleep)
lockInit(&s.lock, lockRankWakeableSleep)
s.wakeup = make(chan struct{}, 1)
s.timer = new(timer)
s.timer.arg = s
s.timer.f = func(s any, _ uintptr) {
s.(*wakeableSleep).wake()
}
return s
}

// sleep sleeps for the provided duration in nanoseconds or until
// another goroutine calls wake.
//
// Must not be called by more than one goroutine at a time.
// Must not be called by more than one goroutine at a time and
// must not be called concurrently with close.
func (s *wakeableSleep) sleep(ns int64) {
resetTimer(s.timer, nanotime()+ns)
<-s.wakeup
lock(&s.lock)
wakeup := s.wakeup
unlock(&s.lock)
<-wakeup
stopTimer(s.timer)
}

// wake awakens any goroutine sleeping on the timer.
//
// Safe for concurrent use.
// Safe for concurrent use with all other methods.
func (s *wakeableSleep) wake() {
// Non-blocking send.
//
// Others may also write to this channel and we don't
// want to block on the receiver waking up. This also
// effectively batches together wakeup notifications.
select {
case s.wakeup <- struct{}{}:
default:
// Grab the wakeup channel, which may be nil if we're
// racing with close.
lock(&s.lock)
if s.wakeup != nil {
// Non-blocking send.
//
// Others may also write to this channel and we don't
// want to block on the receiver waking up. This also
// effectively batches together wakeup notifications.
select {
case s.wakeup <- struct{}{}:
default:
}
}
unlock(&s.lock)
}

// close wakes any goroutine sleeping on the timer and prevents
// further sleeping on it.
//
// Once close is called, the wakeableSleep must no longer be used.
//
// It must only be called once no goroutine is sleeping on the
// timer *and* nothing else will call wake concurrently.
func (s *wakeableSleep) close() {
close(s.wakeup)
// Set wakeup to nil so that a late timer ends up being a no-op.
lock(&s.lock)
wakeup := s.wakeup
s.wakeup = nil

// Close the channel.
close(wakeup)
unlock(&s.lock)
return
}
2 changes: 1 addition & 1 deletion src/runtime/trace2cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func traceStartReadCPU() {
throw("traceStartReadCPU called with trace disabled")
}
// Spin up the logger goroutine.
trace.cpuSleep.init()
trace.cpuSleep = newWakeableSleep()
done := make(chan struct{}, 1)
go func() {
for traceEnabled() {
Expand Down

0 comments on commit 3073f3f

Please sign in to comment.