time: avoid stale receives after Timer/Ticker Stop/Reset return
A proposal discussion in mid-2020 on #37196 decided to change
time.Timer and time.Ticker so that their Stop and Reset methods
guarantee that no old value (corresponding to the previous configuration
of the Timer or Ticker) will be received after the method returns.

The trivial way to do this is to make the Timer/Ticker channels
unbuffered, create a goroutine per Timer/Ticker feeding the channel,
and then coordinate with that goroutine during Stop/Reset.
Since Stop/Reset coordinate with the goroutine and the channel
is unbuffered, there is no possibility of a stale value being sent
after Stop/Reset returns.

Of course, we do not want an extra goroutine per Timer/Ticker,
but that's still a good semantic model: behave like the channels
are unbuffered and fed by a coordinating goroutine.
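
The model can be sketched in ordinary Go. This is only an illustration of the semantics described above, not the runtime's code: modelTimer, req, configure, and demoModel are hypothetical names, and the feeding goroutine is never shut down, which is exactly the cost the real implementation avoids.

```go
package main

import (
	"fmt"
	"time"
)

// req asks the feeding goroutine to reconfigure the timer.
// A negative d means stop. (Hypothetical type for this sketch.)
type req struct {
	d     time.Duration
	reply chan bool // receives whether the timer was still active
}

// modelTimer models a timer whose channel is unbuffered and fed by
// one coordinating goroutine. It is not how the runtime implements timers.
type modelTimer struct {
	C    <-chan time.Time
	c    chan time.Time
	reqs chan req
}

func newModelTimer(d time.Duration) *modelTimer {
	c := make(chan time.Time) // unbuffered: a value exists only while being handed over
	t := &modelTimer{C: c, c: c, reqs: make(chan req)}
	go t.run(d)
	return t
}

func (t *modelTimer) run(d time.Duration) {
	active := true
	wake := time.After(d)
	var out chan<- time.Time // nil (never ready) until the deadline passes
	var now time.Time
	for {
		select {
		case now = <-wake:
			wake, out = nil, t.c // deadline passed: start offering the value
		case out <- now:
			out, active = nil, false // a receiver took the value
		case r := <-t.reqs:
			r.reply <- active
			wake, out = nil, nil // withdraw any undelivered (stale) value
			active = r.d >= 0
			if active {
				wake = time.After(r.d)
			}
		}
	}
}

// configure sends a request and waits for the goroutine to apply it.
func (t *modelTimer) configure(d time.Duration) bool {
	r := req{d: d, reply: make(chan bool)}
	t.reqs <- r
	return <-r.reply
}

func (t *modelTimer) Reset(d time.Duration) bool { return t.configure(d) }
func (t *modelTimer) Stop() bool                 { return t.configure(-1) }

// demoModel lets the deadline pass unobserved, resets the timer, and
// counts how many values arrive afterwards.
func demoModel() (wasActive bool, received int) {
	t := newModelTimer(20 * time.Millisecond)
	time.Sleep(60 * time.Millisecond)
	wasActive = t.Reset(20 * time.Millisecond)
	timeout := time.After(200 * time.Millisecond)
	for {
		select {
		case <-t.C:
			received++
		case <-timeout:
			return wasActive, received
		}
	}
}

func main() {
	fmt.Println(demoModel())
}
```

Because the value is only ever offered, never stored, a Reset or Stop that arrives before a receiver takes it simply withdraws the offer.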

The actual implementation is more effort but behaves like the model.
Specifically, the timer channel has a 1-element buffer like it always has,
but len(t.C) and cap(t.C) are special-cased to return 0 anyway, so user
code cannot see what's in the buffer except with a receive.
Stop/Reset lock out any stale sends and then clear any pending send
from the buffer.
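
The "lock out stale sends, then clear the buffer" step can be modeled at user level with two mutexes and a sequence number. This is a hedged sketch under the assumption that sync.Mutex can stand in for the runtime's locks; guardedChan and its methods are hypothetical names, not runtime APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedChan is a hypothetical user-level model of a timer channel:
// a 1-element buffer whose sends can be invalidated by stop.
type guardedChan struct {
	mu       sync.Mutex // stands in for the timer's own lock
	sendLock sync.Mutex // held across every send
	seq      uint64     // written only with both locks held
	c        chan int
}

// snapshot records the current configuration's sequence number,
// as a sender would before releasing mu and preparing to send.
func (g *guardedChan) snapshot() uint64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.seq
}

// send delivers v unless stop has run since seq was captured.
// It reports whether the send was still valid.
func (g *guardedChan) send(seq uint64, v int) bool {
	g.sendLock.Lock()
	defer g.sendLock.Unlock()
	if g.seq != seq {
		return false // stale: the configuration changed after the snapshot
	}
	select {
	case g.c <- v:
	default: // buffer already full; drop, as a non-blocking send would
	}
	return true
}

// stop invalidates in-flight sends and then drains the buffer.
// It reports whether a pending value was removed.
func (g *guardedChan) stop() bool {
	g.sendLock.Lock()
	defer g.sendLock.Unlock()
	g.mu.Lock()
	g.seq++
	g.mu.Unlock()
	select {
	case <-g.c:
		return true
	default:
		return false
	}
}

func demoGuard() (sent, pending, staleSent bool, left int) {
	g := &guardedChan{c: make(chan int, 1)}
	seq := g.snapshot()
	sent = g.send(seq, 1)      // valid send fills the buffer
	pending = g.stop()         // stop drains it
	staleSent = g.send(seq, 2) // the old snapshot is now rejected
	return sent, pending, staleSent, len(g.c)
}

func main() {
	fmt.Println(demoGuard())
}
```

Since seq is written only while both locks are held, reading it under either lock alone is safe, which is what lets send check it while holding only sendLock.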

Some programs will change behavior. For example:

	package main

	import "time"

	func main() {
		t := time.NewTimer(2 * time.Second)
		time.Sleep(3 * time.Second)
		if t.Reset(2*time.Second) != false {
			panic("expected timer to have fired")
		}
		<-t.C
		<-t.C
	}

This program (from #11513) sleeps 3s after setting a 2s timer,
resets the timer, and expects Reset to return false: the Reset is too
late and the send has already occurred. It then expects to receive
two values: the one from before the Reset, and the one from after
the Reset.

With an unbuffered timer channel, it should be clear that no value
can be sent during the time.Sleep, so t.Reset returns true,
indicating that the Reset stopped the timer from going off.
Then there is only one value to receive from t.C: the one from after the Reset.

In 2015, I used the above example as an argument against this change.

Note that a correct version of the program would be:

	func main() {
		t := time.NewTimer(2 * time.Second)
		time.Sleep(3 * time.Second)
		if !t.Reset(2*time.Second) {
			<-t.C
		}
		<-t.C
	}

This works with either semantics, by heeding t.Reset's result.
The change should not affect correct programs.
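
Another widely used pattern that should also behave the same under either semantics is to stop the timer and drain with a non-blocking receive before resetting. The sketch below is illustrative only; safeReset and countAfterReset are hypothetical names, not part of package time.

```go
package main

import (
	"fmt"
	"time"
)

// safeReset stops t, discards a value left over from the previous
// configuration if there is one, and then re-arms t for d.
// (Hypothetical helper, not part of package time.)
func safeReset(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		// The timer already fired. Use a non-blocking receive so that
		// we do not hang if the value was consumed or never buffered.
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

// countAfterReset lets a timer expire unobserved, resets it with
// safeReset, and counts the values that arrive afterwards.
func countAfterReset() int {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	safeReset(t, 10*time.Millisecond)

	n := 0
	deadline := time.After(200 * time.Millisecond)
	for {
		select {
		case <-t.C:
			n++
		case <-deadline:
			return n
		}
	}
}

func main() {
	fmt.Println(countAfterReset())
}
```

The non-blocking drain matters only under the old semantics; under the new ones Stop has already cleared the buffer, so the default case is taken.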

However, one way that the change would be visible is when programs
use len(t.C) (instead of a non-blocking receive) to poll whether the timer
has triggered already. We might legitimately worry about breaking such
programs.
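
For comparison, polling with a non-blocking receive does not depend on what len(t.C) reports. A minimal sketch, with fired and poll as hypothetical helper names:

```go
package main

import (
	"fmt"
	"time"
)

// fired reports whether t has triggered, consuming the value if so.
// Unlike len(t.C), it does not depend on how the channel is buffered.
// (Hypothetical helper for illustration.)
func fired(t *time.Timer) bool {
	select {
	case <-t.C:
		return true
	default:
		return false
	}
}

// poll lets a timer expire and then polls it twice.
func poll() (first, second bool) {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	return fired(t), fired(t)
}

func main() {
	first, second := poll()
	fmt.Println(first, second)
}
```

Note that this poll consumes the value, so a later receive on t.C would block until the timer is reset.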

In 2020, discussing #37196, Bryan Mills and I surveyed programs using
len on timer channels. These are exceedingly rare to start with; nearly all
the uses are buggy; and all the buggy programs would be fixed by the new
semantics. The details are at [1].

To further reduce the impact of this change, this CL adds a temporary
GODEBUG setting, which we didn't know about yet in 2015 and 2020.
Specifically, asynctimerchan=1 disables the change and is the default
for main programs in modules that use a Go version before 1.23.
We hope to be able to retire this setting after the minimum 2-year window.
Setting asynctimerchan=1 also disables the garbage collection change
from CL 568341, although users shouldn't need to know that since
it is not a semantically visible change (unless we have bugs!).

As an undocumented bonus that we do not officially support,
asynctimerchan=2 disables the channel buffer change but keeps
the garbage collection change. This may help while we are
shaking out bugs in either of them.
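
One way to see which mode a binary is running in is to look at what len and cap report for an expired timer's channel. The program below is a small sketch (timerChanShape is a hypothetical name); running it with the environment variable GODEBUG=asynctimerchan=1 should select the old behavior, and which values it prints also depends on the Go version declared by the main module.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// timerChanShape lets a timer expire unobserved and returns what
// len and cap report for its channel afterwards.
// (Hypothetical helper for illustration.)
func timerChanShape() (length, capacity int) {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(30 * time.Millisecond)
	return len(t.C), cap(t.C)
}

func main() {
	l, c := timerChanShape()
	// With the new semantics both values are special-cased to 0;
	// with asynctimerchan=1 the 1-element buffer is visible again.
	fmt.Printf("GODEBUG=%q len=%d cap=%d\n", os.Getenv("GODEBUG"), l, c)
}
```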

Fixes #37196.

[1] #37196 (comment)

Change-Id: I8925d3fb2b86b2ae87fd2acd055011cbf7bd5916
Reviewed-on: https://go-review.googlesource.com/c/go/+/568341
Reviewed-by: Austin Clements <[email protected]>
Auto-Submit: Russ Cox <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
rsc authored and gopherbot committed Mar 14, 2024
1 parent 0159150 commit 966609a
Showing 8 changed files with 382 additions and 160 deletions.
7 changes: 6 additions & 1 deletion doc/godebug.md
@@ -128,7 +128,12 @@ and the [go command documentation](/cmd/go#hdr-Build_and_test_caching).

### Go 1.23

TODO: `asynctimerchan` setting.
Go 1.23 changed the channels created by package time to be unbuffered
(synchronous), which makes correct use of the [`Timer.Stop`](/pkg/time/#Timer.Stop)
and [`Timer.Reset`](/pkg/time/#Timer.Reset) method results much easier.
The [`asynctimerchan` setting](/pkg/time/#NewTimer) disables this change.
There are no runtime metrics for this change.
This setting may be removed in a future release, Go 1.27 at the earliest.

Go 1.23 changed the mode bits reported by [`os.Lstat`](/pkg/os#Lstat) and [`os.Stat`](/pkg/os#Stat)
for reparse points, which can be controlled with the `winsymlink` setting.
2 changes: 1 addition & 1 deletion src/internal/godebugs/table.go
@@ -25,7 +25,7 @@ type Info struct {
// Note: After adding entries to this table, update the list in doc/godebug.md as well.
// (Otherwise the test in this package will fail.)
var All = []Info{
{Name: "asynctimerchan", Package: "time", Opaque: true},
{Name: "asynctimerchan", Package: "time", Changed: 23, Old: "1", Opaque: true},
{Name: "execerrdot", Package: "os/exec"},
{Name: "gocachehash", Package: "cmd/go"},
{Name: "gocachetest", Package: "cmd/go"},
48 changes: 47 additions & 1 deletion src/runtime/chan.go
@@ -323,6 +323,35 @@ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
goready(gp, skip+1)
}

// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain(c *hchan) bool {
// Note: Cannot use empty(c) because we are called
// while holding c.timer.sendLock, and empty(c) will
// call c.timer.maybeRunChan, which will deadlock.
// We are emptying the channel, so we only care about
// the count, not about potentially filling it up.
if atomic.Loaduint(&c.qcount) == 0 {
return false
}
lock(&c.lock)
any := false
for c.qcount > 0 {
any = true
typedmemclr(c.elemtype, chanbuf(c, c.recvx))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
}
unlock(&c.lock)
return any
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
@@ -748,16 +777,33 @@ func chanlen(c *hchan) int {
if c == nil {
return 0
}
if c.timer != nil {
async := debug.asynctimerchan.Load() != 0
if c.timer != nil && async {
c.timer.maybeRunChan()
}
if c.timer != nil && !async {
// timer channels have a buffered implementation
// but present to users as unbuffered, so that we can
// undo sends without users noticing.
return 0
}
return int(c.qcount)
}

func chancap(c *hchan) int {
if c == nil {
return 0
}
if c.timer != nil {
async := debug.asynctimerchan.Load() != 0
if async {
return int(c.dataqsiz)
}
// timer channels have a buffered implementation
// but present to users as unbuffered, so that we can
// undo sends without users noticing.
return 0
}
return int(c.dataqsiz)
}

63 changes: 33 additions & 30 deletions src/runtime/lockrank.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions src/runtime/mklockrank.go
@@ -55,9 +55,11 @@ NONE <
# Test only
NONE < testR, testW;
NONE < timerSend;
# Scheduler, timers, netpoll
NONE < allocmW, execW, cpuprof, pollCache, pollDesc, wakeableSleep;
scavenge, sweep, testR, wakeableSleep < hchan;
scavenge, sweep, testR, wakeableSleep, timerSend < hchan;
assistQueue,
cpuprof,
forcegc,
@@ -81,7 +83,7 @@ NONE < notifyList;
hchan, notifyList < sudog;
hchan, pollDesc, wakeableSleep < timers;
timers < timer < netpollInit;
timers, timerSend < timer < netpollInit;
# Semaphores
NONE < root;
141 changes: 117 additions & 24 deletions src/runtime/time.go
@@ -36,11 +36,18 @@ type timer struct {
// a well-behaved function and not block.
//
// The arg and seq are client-specified opaque arguments passed back to f.
// When used from package time, arg is a channel (for After, NewTicker)
// or the function to call (for AfterFunc) and seq is unused (0).
// When used from netpoll, arg and seq have meanings defined by netpoll
// and are completely opaque to this code; in that context, seq is a sequence
// number to recognize and squelch stale function invocations.
// When used from package time, arg is a channel (for After, NewTicker)
// or the function to call (for AfterFunc) and seq is unused (0).
//
// Package time does not know about seq, but if this is a channel timer (t.isChan == true),
// this file uses t.seq as a sequence number to recognize and squelch
// sends that correspond to an earlier (stale) timer configuration,
// similar to its use in netpoll. In this usage (that is, when t.isChan == true),
// writes to seq are protected by both t.mu and t.sendLock,
// so reads are allowed when holding either of the two mutexes.
//
// The delay argument is nanotime() - t.when, meaning the delay in ns between
// when the timer should have gone off and now. Normally that amount is
@@ -69,6 +76,10 @@ type timer struct {
// Since writes to whenHeap are protected by two locks (t.mu and t.ts.mu),
// it is permitted to read whenHeap when holding either one.
whenHeap int64

// sendLock protects sends on the timer's channel.
// Not used for async (pre-Go 1.23) behavior when debug.asynctimerchan.Load() != 0.
sendLock mutex
}

// init initializes a newly allocated timer t.
@@ -167,7 +178,7 @@ func (t *timer) trace1(op string) {
return
}
bits := [4]string{"h", "m", "z", "c"}
for i := range bits {
for i := range 3 {
if t.state&(1<<i) == 0 {
bits[i] = "-"
}
@@ -199,6 +210,18 @@ func (t *timer) unlock() {
unlock(&t.mu)
}

// hchan returns the channel in t.arg.
// t must be a timer with a channel.
func (t *timer) hchan() *hchan {
if !t.isChan {
badTimer()
}
// Note: t.arg is a chan time.Time,
// and runtime cannot refer to that type,
// so we cannot use a type assertion.
return (*hchan)(efaceOf(&t.arg).data)
}

// updateHeap updates t.whenHeap as directed by t.state, updating t.state
// and returning a bool indicating whether the state (and t.whenHeap) changed.
// The caller must hold t's lock, or the world can be stopped instead.
@@ -309,6 +332,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg
racerelease(unsafe.Pointer(&t.timer))
}
if c != nil {
lockInit(&t.sendLock, lockRankTimerSend)
t.isChan = true
c.timer = &t.timer
if c.dataqsiz == 0 {
@@ -372,24 +396,45 @@ func (ts *timers) addHeap(t *timer) {
}
}

// stop stops the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as stopped.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was stopped before it was run.
func (t *timer) stop() bool {
t.lock()
t.trace("stop")
// maybeRunAsync checks whether t needs to be triggered and runs it if so.
// The caller is responsible for locking the timer and for checking that we
// are running timers in async mode. If the timer needs to be run,
// maybeRunAsync will unlock and re-lock it.
// The timer is always locked on return.
func (t *timer) maybeRunAsync() {
assertLockHeld(&t.mu)
if t.state&timerHeaped == 0 && t.isChan && t.when > 0 {
// If timer should have triggered already (but nothing looked at it yet),
// trigger now, so that a receive after the stop sees the "old" value
// that should be there.
// (It is possible to have t.blocked > 0 if there is a racing receive
// in blockTimerChan, but timerHeaped not being set means
// it hasn't run t.maybeAdd yet; in that case, running the
// timer ourselves now is fine.)
if now := nanotime(); t.when <= now {
systemstack(func() {
t.unlockAndRun(now) // resets t.when
})
t.lock()
}
}
}

// stop stops the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as stopped.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was stopped before it was run.
func (t *timer) stop() bool {
async := debug.asynctimerchan.Load() != 0
if !async && t.isChan {
lock(&t.sendLock)
}

t.lock()
t.trace("stop")
if async {
t.maybeRunAsync()
}
if t.state&timerHeaped != 0 {
t.state |= timerModified
if t.state&timerZombie == 0 {
@@ -399,7 +444,20 @@ func (t *timer) stop() bool {
}
pending := t.when > 0
t.when = 0

if !async && t.isChan {
// Stop any future sends with stale values.
// See timer.unlockAndRun.
t.seq++
}
t.unlock()
if !async && t.isChan {
unlock(&t.sendLock)
if timerchandrain(t.hchan()) {
pending = true
}
}

return pending
}

@@ -439,8 +497,16 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
if period < 0 {
throw("timer period must be non-negative")
}
async := debug.asynctimerchan.Load() != 0

if !async && t.isChan {
lock(&t.sendLock)
}

t.lock()
if async {
t.maybeRunAsync()
}
t.trace("modify")
t.period = period
if f != nil {
@@ -449,20 +515,6 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
t.seq = seq
}

if t.state&timerHeaped == 0 && t.isChan && t.when > 0 {
// This is a timer for an unblocked channel.
// Perhaps it should have expired already.
if now := nanotime(); t.when <= now {
// The timer should have run already,
// but nothing has checked it yet.
// Run it now.
systemstack(func() {
t.unlockAndRun(now) // resets t.when
})
t.lock()
}
}

wake := false
pending := t.when > 0
t.when = when
@@ -483,7 +535,20 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
}

add := t.needsAdd()

if !async && t.isChan {
// Stop any future sends with stale values.
// See timer.unlockAndRun.
t.seq++
}
t.unlock()
if !async && t.isChan {
if timerchandrain(t.hchan()) {
pending = true
}
unlock(&t.sendLock)
}

if add {
t.maybeAdd()
}
@@ -936,7 +1001,35 @@ func (t *timer) unlockAndRun(now int64) {
if ts != nil {
ts.unlock()
}

async := debug.asynctimerchan.Load() != 0
if !async && t.isChan {
// For a timer channel, we want to make sure that no stale sends
// happen after a t.stop or t.modify, but we cannot hold t.mu
// during the actual send (which f does) due to lock ordering.
// It can happen that we are holding t's lock above, we decide
// it's time to send a time value (by calling f), grab the parameters,
// unlock above, and then a t.stop or t.modify changes the timer
// and returns. At that point, the send must not happen after all.
// The way we arrange for it not to happen is that t.stop and t.modify
// both increment t.seq while holding both t.mu and t.sendLock.
// We copied the seq value above while holding t.mu.
// Now we can acquire t.sendLock (which will be held across the send)
// and double-check that t.seq is still the seq value we saw above.
// If not, the timer has been updated and we should skip the send.
// We skip the send by reassigning f to a no-op function.
lock(&t.sendLock)
if t.seq != seq {
f = func(any, uintptr, int64) {}
}
}

f(arg, seq, delay)

if !async && t.isChan {
unlock(&t.sendLock)
}

if ts != nil {
ts.lock()
}