some refactors [to be squashed]
tbg committed Aug 26, 2022
1 parent 0d2d5d2 commit 6c3d72d
Showing 3 changed files with 178 additions and 59 deletions.
64 changes: 5 additions & 59 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -535,7 +535,9 @@ func (ds *DistSender) singleRangeFeed(
// clean up catchup reservation in case of early termination.
defer finishCatchupScan()

- stuckWatcher := makeStuckRangeWatcher(cancelFeed, &ds.st.SV)
+ stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, func() time.Duration {
+ 	return rangefeedRangeStuckThreshold.Get(&ds.st.SV)
+ })
defer stuckWatcher.stop()

var streamCleanup func()
@@ -580,12 +582,12 @@ func (ds *DistSender) singleRangeFeed(
return args.Timestamp, nil
}
if err != nil {
- if stuckWatcher.wasStuck {
+ if stuckWatcher.stuck() {
return args.Timestamp, errRestartStuckRange
}
return args.Timestamp, err
}
- stuckWatcher.recordEvent()
+ stuckWatcher.ping()

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
@@ -641,59 +643,3 @@ func legacyRangeFeedEventProducer(

// errRestartStuckRange is the sentinel error returned when a rangefeed is cancelled because it is stuck.
var errRestartStuckRange = errors.New("rangefeed restarting due to liveness")

// We want to cancel the context if we don't receive an event
// in a while. We try to do this without doing too much work
// (allocations, etc.) for each message received to bound the
// overhead in case everything is working fine and messages
// are potentially rushing in at high frequency. To do this,
// we set up a timer that would cancel the context, and
// whenever it is ~half expired, stop it (after the next
// message is there) and re-start it. That way, we allocate
// only ~twice per eventCheckInterval, which is acceptable.
type stuckRangeWatcher struct {
wasStuck bool
cancel context.CancelFunc
t *time.Timer
sv *settings.Values
resetTimerAfter time.Time
}

func (w *stuckRangeWatcher) stop() {
if w.t != nil {
w.t.Stop()
w.t = nil
}
}

func (w *stuckRangeWatcher) recordEvent() {
stuckThreshold := rangefeedRangeStuckThreshold.Get(w.sv)
if stuckThreshold == 0 {
w.stop()
return
}

mkTimer := func() {
w.t = time.AfterFunc(stuckThreshold, func() {
w.wasStuck = true
w.cancel()
})
w.resetTimerAfter = timeutil.Now().Add(stuckThreshold / 2)
}

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) {
w.stop()
mkTimer()
}
}

func makeStuckRangeWatcher(cancel context.CancelFunc, sv *settings.Values) *stuckRangeWatcher {
w := &stuckRangeWatcher{
cancel: cancel,
sv: sv,
}
w.recordEvent()
return w
}
111 changes: 111 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
@@ -0,0 +1,111 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvcoord

import (
"context"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// stuckRangeFeedCanceler is a defense-in-depth mechanism to restart rangefeeds that have
// not received events from the KV layer in some time. Rangefeeds are supposed to receive
// regular updates, since at the very least they ought to be receiving closed timestamps.
// However, issues[^1] at the KV layer could prevent this.
//
// The canceler is notified via ping() whenever the associated RangeFeed receives an event.
// Should ping() not be called for the configured threshold duration, the provided cancel
// function will be invoked.
//
// This is implemented without incurring nontrivial work on each call to ping().
// Instead, work is done roughly once per threshold interval, which is assumed to
// be large enough (i.e. at least a couple of seconds) to make this negligible.
// Concretely, a timer is set that would invoke the cancellation, and the timer
// is reset on the first call to ping() after the timer is at least half
// expired. That way, we allocate only ~twice per threshold interval, which is
// acceptable.
//
// The canceler detects changes to the configured threshold duration on each call
// to ping(), i.e. in the common case of no stuck rangefeeds, it will ~immediately
// pick up the new value and apply it.
type stuckRangeFeedCanceler struct {
threshold func() time.Duration
cancel context.CancelFunc
t *time.Timer
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
}

// stuck returns true if the stuck detection got triggered.
// If this returns true, the cancel function will be invoked
// shortly, if it hasn't already.
func (w *stuckRangeFeedCanceler) stuck() bool {
return atomic.LoadInt32(&w._stuck) != 0
}

// stop releases the active timer, if any. It should be invoked
// unconditionally before the canceler goes out of scope.
func (w *stuckRangeFeedCanceler) stop() {
if w.t != nil {
w.t.Stop()
w.t = nil
w.activeThreshold = 0
}
}

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
}

mkTimer := func() {
w.activeThreshold = threshold
w.t = time.AfterFunc(threshold, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
// cancel function (which should cancel the rangefeed if invoked)
// and threshold function, which in practice reads the
// kv.rangefeed.range_stuck_threshold cluster setting so that the
// timeout is (reactively) reconfigurable.
func newStuckRangeFeedCanceler(
cancel context.CancelFunc, threshold func() time.Duration,
) *stuckRangeFeedCanceler {
w := &stuckRangeFeedCanceler{
threshold: threshold,
cancel: cancel,
}
w.ping()
return w
}
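
For orientation, here is a minimal usage sketch, not part of this commit, of how the canceler is meant to be wired into an event loop, mirroring the singleRangeFeed changes above. The consumeEvents helper, its events channel, and its parameters are illustrative assumptions rather than actual DistSender code:

package kvcoord

import (
	"context"
	"time"
)

// consumeEvents is an illustrative sketch (not part of this commit) showing the
// intended wiring: create the canceler with the feed's cancel function, ping it
// on every received event, and translate a stuck cancellation into
// errRestartStuckRange so the caller restarts the rangefeed.
func consumeEvents(
	ctx context.Context,
	cancelFeed context.CancelFunc,
	events <-chan struct{}, // stand-in for the real event stream
	threshold func() time.Duration, // e.g. reads rangefeedRangeStuckThreshold
) error {
	w := newStuckRangeFeedCanceler(cancelFeed, threshold)
	defer w.stop() // always release the timer

	for {
		select {
		case <-ctx.Done():
			if w.stuck() {
				// The canceler fired: surface the sentinel error so the
				// rangefeed is restarted rather than torn down.
				return errRestartStuckRange
			}
			return ctx.Err()
		case <-events:
			w.ping() // cheap; only occasionally stops and re-arms the timer
		}
	}
}

The key points, visible in the real diff as well: stop() is deferred unconditionally, ping() is called for every event, and a context cancellation is mapped to errRestartStuckRange only when stuck() reports true.
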
62 changes: 62 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
@@ -0,0 +1,62 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvcoord

import (
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

var dur int64 = int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&dur))
})
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
}
atomic.StoreInt64(&dur, int64(time.Nanosecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, c.stuck, time.Second /* max */, 5*time.Nanosecond /* tick */)

atomic.StoreInt64(&dur, int64(24*time.Hour))

// Stays marked as stuck even when we ping it again.
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
}
}
