Skip to content

Commit

Permalink
kvserver, timeutil: fix some Timer user-after-Stops
Browse files Browse the repository at this point in the history
Two guys were continuing to use a Timer after Stop()ing it, which is
illegal.
Also, this patch adds a flavor of Timer.Stop() which always puts the
receiver back into the timer pool. The existing Stop() can't always do
that because it has to be way conservative.

Release note: None
Release justification: Bug fix.
  • Loading branch information
andreimatei committed Mar 2, 2021
1 parent 9bf97ac commit 033c154
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 31 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/closedts/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,17 @@ func (p *Provider) runCloser(ctx context.Context) {
// Track whether we've ever been live to avoid logging warnings about not
// being live during node startup.
var everBeenLive bool
var t timeutil.Timer
defer t.Stop()
t := timeutil.NewTimer()
defer t.StopExclusive()
for {
closeFraction := closedts.CloseFraction.Get(&p.cfg.Settings.SV)
targetDuration := float64(closedts.TargetDuration.Get(&p.cfg.Settings.SV))
if targetDuration > 0 {
t.Reset(time.Duration(closeFraction * targetDuration))
} else {
t.Stop() // disable closing when the target duration is non-positive
// Disable closing when the target duration is non-positive.
t.Stop()
t = timeutil.NewTimer()
}
select {
case <-p.cfg.Stopper.ShouldQuiesce():
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,16 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) {
s.buf.Close()
}()

var timer timeutil.Timer
defer timer.Stop()
timer := timeutil.NewTimer()
defer timer.StopExclusive()
for {
interval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
if interval > 0 {
timer.Reset(closedts.SideTransportCloseInterval.Get(&s.st.SV))
timer.Reset(interval)
} else {
// Disable the side-transport.
timer.Stop()
timer.StopExclusive()
timer = timeutil.NewTimer()
}
select {
case <-timer.C:
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/timeutil/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,38 @@ func (t *Timer) Reset(d time.Duration) {
// close the channel, to prevent a read from succeeding incorrectly.
// Note that a Timer must never be used again after calls to Stop as the timer
// object will be put into an object pool for reuse.
//
// See also StopExclusive.
func (t *Timer) Stop() bool {
return t.stopInternal(false /* safeToConsume */)
}

// StopExclusive is like Stop, but it's more efficient because it
// unconditionally puts the Timer back in the pool. When you can, prefer
// StopExclusive to Stop.
//
// Unlike Stop, StopExclusive cannot be called concurrently with receiving from
// t.C.
func (t *Timer) StopExclusive() {
t.stopInternal(true /* safeToConsume */)
}

// safeToConsume, if set, guarantees that there's no concurrent receivers on
// t.C. This allows us to consume t.C in case we detect that the timer has
// already fired, which in turn allows us to always return the Timer to the
// pool.
func (t *Timer) stopInternal(safeToConsume bool) bool {
var res bool
if t.timer != nil {
res = t.timer.Stop()
// Consume t.C if the timer had already fired and safeToConsume is set. Note
// that we don't check t.Read if safeToConsume is set to avoid data-racing
// with t.C receivers (which write to t.Read). If safeToConsume is set, the
// caller promised that there's no concurrent receivers.
if !res && safeToConsume && !t.Read {
<-t.C
res = true
}
if res {
// Only place the timer back in the pool if we successfully stopped
// it. Otherwise, we'd have to read from the channel if !t.Read.
Expand Down
57 changes: 33 additions & 24 deletions pkg/util/timeutil/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,42 @@ func TestTimerStop(t *testing.T) {
for sleepMult := time.Duration(0); sleepMult < 3; sleepMult++ {
sleepDur := sleepMult * timeStep
t.Run(fmt.Sprintf("sleepDur=%d*timeStep", sleepMult), func(t *testing.T) {
var timer Timer
timer.Reset(timeStep)
time.Sleep(sleepDur)

// Get a handle to the timer channel before calling Stop, because Stop
// clears the struct.
c := timer.C

// Even though we sleep for a certain duration which we know to be more
// or less than the timer's duration, we can't assert whether the timer
// fires before calling timer.Stop because we have no control over the
// scheduler. Instead, we handle both cases to avoid flakiness and assert
// that Stop returns the correct status.
stopped := timer.Stop()
select {
case <-c:
if stopped {
t.Errorf("timer unexpectedly fired after stopping")
}
case <-time.After(5 * timeStep):
if !stopped {
t.Errorf("timer did not fire after failing to stop")
}
for _, exclusive := range []bool{false, true} {
t.Run(fmt.Sprintf("exclusive=%t", exclusive), func(t *testing.T) {
var timer Timer
timer.Reset(timeStep)
time.Sleep(sleepDur)

// Get a handle to the timer channel before calling Stop, because Stop
// clears the struct.
c := timer.C

// Even though we sleep for a certain duration which we know to be more
// or less than the timer's duration, we can't assert whether the timer
// fires before calling timer.Stop because we have no control over the
// scheduler. Instead, we handle both cases to avoid flakiness and assert
// that Stop returns the correct status.
var stopped bool
if exclusive {
timer.StopExclusive()
stopped = true
} else {
stopped = timer.Stop()
}
select {
case <-c:
if stopped {
t.Errorf("timer unexpectedly fired after stopping")
}
case <-time.After(5 * timeStep):
if !stopped {
t.Errorf("timer did not fire after failing to stop")
}
}
})
}
})
}

}

func TestTimerUninitializedStopNoop(t *testing.T) {
Expand Down

0 comments on commit 033c154

Please sign in to comment.