kvcoord: Correctly handle stuck rangefeeds
Fixes cockroachdb#92570

A watcher responsible for restarting stuck range feeds
may incorrectly cancel the rangefeed if the downstream
event consumer is slow.

Release note (bug fix): Fix incorrect cancellation logic
when attempting to detect stuck range feeds.
Yevgeniy Miretskiy committed Nov 28, 2022
1 parent 8e4d3df commit c071604
Showing 3 changed files with 58 additions and 42 deletions.
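
The core of the change: stuck detection now brackets only the blocking stream.Recv() call instead of being re-armed with ping() on every event, so time spent by a slow downstream consumer no longer counts toward the stuck threshold. A minimal before/after sketch of the pattern (simplified; consume() is a hypothetical placeholder for downstream event handling, and most error handling is elided):

	// Before: ping() re-arms the timer per event, but the timer keeps
	// running while the consumer processes the event, so a slow consumer
	// can look "stuck" even though Recv itself returns promptly.
	for {
		event, err := stream.Recv()
		if err != nil {
			return err
		}
		stuckWatcher.ping()
		consume(event) // this time is (incorrectly) counted
	}

	// After: only the time spent blocked inside Recv is measured.
	for {
		var event *roachpb.RangeFeedEvent
		if err := stuckWatcher.do(func() (err error) {
			event, err = stream.Recv()
			return err
		}); err != nil {
			return err
		}
		consume(event) // not counted toward the stuck threshold
	}
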
22 changes: 8 additions & 14 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed(

for {
stuckWatcher.stop() // if timer is running from previous iteration, stop it now
if catchupRes == nil {
// Already finished catch-up scan (in an earlier iteration of this loop),
// so start timer early, not on first event received.
stuckWatcher.ping()
}
if transport.IsExhausted() {
return args.Timestamp, newSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
@@ -639,19 +634,21 @@
}
}

var event *roachpb.RangeFeedEvent
for {
event, err := stream.Recv()
if err == io.EOF {
return args.Timestamp, nil
}
if err != nil {
if err := stuckWatcher.do(func() (err error) {
event, err = stream.Recv()
return err
}); err != nil {
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, err
}
stuckWatcher.ping() // starts timer on first event only

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
@@ -728,6 +725,3 @@ func (ds *DistSender) handleStuckEvent(
}
return errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, threshold)
}

// sentinel error returned when cancelling rangefeed when it is stuck.
var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")
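
Since errRestartStuckRange now lives next to the canceler and is wrapped by handleStuckEvent, callers higher up the stack can tell a defensive restart apart from a genuine stream failure with errors.Is. The retry loop itself is outside this diff, so the snippet below is only an assumed usage sketch (restartRangeFeed is a hypothetical placeholder):

	if errors.Is(err, errRestartStuckRange) {
		// The watcher cancelled a rangefeed it considered stuck; restart
		// it rather than surfacing the error to the caller.
		restartRangeFeed()
	} else if err != nil {
		return err
	}
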
45 changes: 34 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// stuckRangeFeedCanceler are a defense-in-depth mechanism to restart rangefeeds that have
@@ -49,14 +50,22 @@ type stuckRangeFeedCanceler struct {
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
_state int32 // atomic
}

type state int32

const (
inactive state = iota
active
stuck
)

// stuck returns true if the stuck detection got triggered.
// If this returns true, the cancel function will be invoked
// shortly, if it hasn't already.
func (w *stuckRangeFeedCanceler) stuck() bool {
return atomic.LoadInt32(&w._stuck) != 0
return atomic.LoadInt32(&w._state) == int32(stuck)
}

// stop releases the active timer, if any. It should be invoked
@@ -69,13 +78,17 @@ func (w *stuckRangeFeedCanceler) stop() {
}
}

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
// sentinel error returned when cancelling rangefeed when it is stuck.
var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")

// do invokes callback cb, arranging for cancellation to happen if the callback
// takes too long to complete. Returns errRestartStuckRange if cb took an
// excessive amount of time.
func (w *stuckRangeFeedCanceler) do(cb func() error) error {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
return cb()
}

mkTimer := func() {
@@ -86,21 +99,31 @@ func (w *stuckRangeFeedCanceler) ping() {
// ping() event arrives at 29.999s, the timer should only fire
// at 90s, not 60s.
w.t = time.AfterFunc(3*threshold/2, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
// NB: trigger cancellation only if currently active.
if atomic.CompareAndSwapInt32(&w._state, int32(active), int32(stuck)) {
w.cancel()
}
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

if !atomic.CompareAndSwapInt32(&w._state, int32(inactive), int32(active)) {
return errRestartStuckRange
}
defer atomic.CompareAndSwapInt32(&w._state, int32(active), int32(inactive))

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}

err := cb()
if atomic.LoadInt32(&w._state) != int32(active) {
return errors.CombineErrors(errRestartStuckRange, err)
}
return err
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
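
Taken together, the canceler now behaves like a small state machine (inactive → active → stuck) that is only armed for the duration of a do() call. A minimal usage sketch, assuming code inside the kvcoord package (the type is unexported) and a hypothetical blockingRecv placeholder for the wrapped call:

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	w := newStuckRangeFeedCanceler(cancel, func() time.Duration {
		return time.Minute // stuck threshold; returning 0 disables detection
	})
	defer w.stop()

	for {
		if err := w.do(func() error {
			return blockingRecv(ctx) // only time spent in here is measured
		}); err != nil {
			// If the callback blocked long enough for the watcher's timer to
			// fire, do() has already invoked cancel and the returned error
			// wraps errRestartStuckRange; w.stuck() now reports true.
			return err
		}
		// Work done between do() calls does not count toward the threshold.
	}
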
33 changes: 16 additions & 17 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
@@ -12,46 +12,45 @@
package kvcoord

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

_dur := int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }
blockUntilCanceled := func() error {
<-ctx.Done()
return ctx.Err()
}

c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&_dur))
})
require.Nil(t, c.t) // not running upon creation

for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
require.NoError(t, c.do(doNothing))
require.NotNil(t, c.t) // first call to ping sets off timer
}
atomic.StoreInt64(&_dur, int64(time.Nanosecond))
atomic.StoreInt64(&_dur, int64(10*time.Millisecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */)
require.True(t, errors.Is(c.do(blockUntilCanceled), errRestartStuckRange))
require.True(t, c.stuck())

atomic.StoreInt64(&_dur, int64(24*time.Hour))
@@ -60,6 +59,6 @@ func TestStuckRangeFeedCanceler(t *testing.T) {
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
require.True(t, errors.Is(c.do(blockUntilCanceled), errRestartStuckRange))
}
}
