kvcoord: Correctly handle stuck rangefeeds
Fixes #92570

A watcher responsible for restarting stuck range feeds
may incorrectly cancel the rangefeed if the downstream
event consumer is slow.

Release note (bug fix): Fix incorrect cancellation logic
when attempting to detect stuck range feeds.
Yevgeniy Miretskiy committed Nov 28, 2022
1 parent a6cb346 commit f98f0d3
Showing 3 changed files with 106 additions and 40 deletions.
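Before the per-file diffs, here is a minimal, self-contained Go sketch of the pattern the fix moves to. It is not the patch's actual API; watchBlockingCall and the thresholds are hypothetical stand-ins. It illustrates the core idea: the watchdog timer is armed only for the duration of the blocking call, so time spent handing an event to a slow downstream consumer no longer counts toward the stuck-detection threshold.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// watchBlockingCall runs cb and cancels the surrounding work only if cb itself
// exceeds the threshold; time spent outside the callback (for example, in a
// slow downstream consumer) is never counted.
func watchBlockingCall(cancel context.CancelFunc, threshold time.Duration, cb func() error) error {
	var fired int32
	t := time.AfterFunc(threshold, func() {
		atomic.StoreInt32(&fired, 1)
		cancel()
	})
	defer t.Stop()

	err := cb()
	if atomic.LoadInt32(&fired) != 0 {
		return fmt.Errorf("blocking call exceeded %s", threshold)
	}
	return err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Fast call: the watchdog never fires and the context stays live.
	_ = watchBlockingCall(cancel, 100*time.Millisecond, func() error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	fmt.Println("after fast call, ctx err:", ctx.Err())

	// Slow call: the watchdog fires, cancels the context, and unblocks it.
	err := watchBlockingCall(cancel, 100*time.Millisecond, func() error {
		<-ctx.Done()
		return ctx.Err()
	})
	fmt.Println("after slow call:", err, "ctx err:", ctx.Err())
}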
19 changes: 8 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed(

for {
stuckWatcher.stop() // if timer is running from previous iteration, stop it now
if catchupRes == nil {
// Already finished catch-up scan (in an earlier iteration of this loop),
// so start timer early, not on first event received.
stuckWatcher.ping()
}
if transport.IsExhausted() {
return args.Timestamp, newSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
@@ -639,19 +634,21 @@ func (ds *DistSender) singleRangeFeed(
}
}

var event *roachpb.RangeFeedEvent
for {
event, err := stream.Recv()
if err == io.EOF {
return args.Timestamp, nil
}
if err != nil {
if err := stuckWatcher.do(func() (err error) {
event, err = stream.Recv()
return err
}); err != nil {
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, err
}
stuckWatcher.ping() // starts timer on first event only

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
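For orientation, a compile-only sketch of the loop shape this file moves to, with hypothetical stand-ins (recv, watch, eventCh) rather than the real rangefeed plumbing: only the blocking receive runs under the watchdog, while the hand-off to the possibly slow consumer happens outside it.

package sketch

import "context"

// consume illustrates the fixed loop: watch times only the blocking receive,
// while the potentially slow hand-off to the downstream consumer is unguarded.
func consume(
	ctx context.Context,
	recv func() (string, error),
	watch func(func() error) error,
	eventCh chan<- string,
) error {
	for {
		var ev string
		// Guard only the receive; if it stalls past the threshold, watch is
		// expected to cancel ctx and return an error.
		if err := watch(func() error {
			var err error
			ev, err = recv()
			return err
		}); err != nil {
			return err
		}
		// The send to a possibly slow consumer is deliberately unguarded, so
		// it can no longer be mistaken for a stuck stream.
		select {
		case eventCh <- ev:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}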
54 changes: 43 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
@@ -49,14 +49,35 @@ type stuckRangeFeedCanceler struct {
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
// _state manages canceler state transitions.
// do():
// inactive <-----
// | |
// ----active---- |
// | | |
// timeout ok |
// | |-----
// stuck
// If timeout occurs outside do(), it is ignored.
_state int32 // atomic

// A testing knob to notify when timer triggers.
afterTimerTrigger func()
}

type state int32

const (
inactive state = iota
active
stuck
)

// stuck returns true if the stuck detection got triggered.
// If this returns true, the cancel function will be invoked
// shortly, if it hasn't already.
func (w *stuckRangeFeedCanceler) stuck() bool {
return atomic.LoadInt32(&w._stuck) != 0
return atomic.LoadInt32(&w._state) == int32(stuck)
}

// stop releases the active timer, if any. It should be invoked
@@ -69,13 +90,14 @@ func (w *stuckRangeFeedCanceler) stop() {
}
}

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
// do invokes callback cb, arranging for cancellation to happen if the callback
// takes too long to complete. Returns errRestartStuckRange if cb takes an
// excessive amount of time.
func (w *stuckRangeFeedCanceler) do(cb func() error) error {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
return cb()
}

mkTimer := func() {
@@ -86,21 +108,31 @@ func (w *stuckRangeFeedCanceler) ping() {
// ping() event arrives at 29.999s, the timer should only fire
// at 90s, not 60s.
w.t = time.AfterFunc(3*threshold/2, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
if w.afterTimerTrigger != nil {
defer w.afterTimerTrigger()
}

// NB: trigger cancellation only if currently active.
if atomic.CompareAndSwapInt32(&w._state, int32(active), int32(stuck)) {
w.cancel()
}
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

if !atomic.CompareAndSwapInt32(&w._state, int32(inactive), int32(active)) {
return errRestartStuckRange
}
defer atomic.CompareAndSwapInt32(&w._state, int32(active), int32(inactive))

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}

return cb()
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
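A small, runnable sketch of the state-machine idea this file introduces, using assumed names and values rather than the package's actual definitions: the timer callback cancels only when it can swap the state from active to stuck, so a timer that fires while the watcher is inactive (that is, outside do()) is a no-op.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// These constants mirror the shape of the canceler's states; the names and
// values are illustrative only.
const (
	inactive int32 = iota
	active
	stuck
)

func main() {
	var state = inactive
	var canceled int32

	// The timer cancels only if it catches the watcher while a guarded
	// callback is still running (state == active).
	timer := time.AfterFunc(50*time.Millisecond, func() {
		if atomic.CompareAndSwapInt32(&state, active, stuck) {
			atomic.StoreInt32(&canceled, 1)
		}
	})
	defer timer.Stop()

	// Simulate a do() call that finished quickly: active, then back to
	// inactive well before the timer fires.
	atomic.StoreInt32(&state, active)
	atomic.CompareAndSwapInt32(&state, active, inactive)

	time.Sleep(100 * time.Millisecond) // let the timer fire while inactive
	fmt.Println("canceled:", atomic.LoadInt32(&canceled) == 1) // false: timeout outside do() is ignored
}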
73 changes: 55 additions & 18 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
@@ -12,46 +12,46 @@
package kvcoord

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

_dur := int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }
blockUntilCanceled := func() error {
<-ctx.Done()
return ctx.Err()
}

c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&_dur))
})
require.Nil(t, c.t) // not running upon creation
require.Nil(t, c.t) // not running upon creation.

for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
require.NoError(t, c.do(doNothing))
require.NotNil(t, c.t) // first call to do() sets off timer
}
atomic.StoreInt64(&_dur, int64(time.Nanosecond))
atomic.StoreInt64(&_dur, int64(10*time.Millisecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */)
require.True(t, errors.Is(c.do(blockUntilCanceled), context.Canceled))
require.True(t, c.stuck())

atomic.StoreInt64(&_dur, int64(24*time.Hour))
@@ -60,6 +60,43 @@ func TestStuckRangeFeedCanceler(t *testing.T) {
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
require.True(t, errors.Is(c.do(blockUntilCanceled), errRestartStuckRange))
}
}

// Ensure that canceller monitors only the duration of the do()
// function, and not anything happening outside.
func TestStuckRangeFeedCancelerScope(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }

triggered := struct {
sync.Once
ch chan struct{}
}{ch: make(chan struct{})}

const duration = time.Second
c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return duration
})
c.afterTimerTrigger = func() {
triggered.Do(func() {
close(triggered.ch)
})
}

require.Nil(t, c.t) // not running upon creation.
require.False(t, c.stuck())
require.Nil(t, c.do(doNothing))

// Now, start waiting until timer triggers.
// Even though timer triggered, the watcher is not cancelled since
// time expired outside do().
<-triggered.ch
require.Nil(t, ctx.Err())
require.False(t, c.stuck())
require.Nil(t, c.do(doNothing))
}
