kvcoord: restart stuck RangeFeeds
It has been observed in the wild that rangefeeds (and the changefeeds
that use them) can appear to be stuck and make no progress. For reasons
not yet understood, the RangeFeed RPC stops receiving events from the
server, even though the contract indicates that events should keep
arriving: the stream also carries range checkpoint records, which are
supposed to be emitted periodically.

This PR introduces a defense-in-depth mechanism in the client-side
rangefeed library: ranges that appear to be stuck are restarted
automatically, based on the value of the
`kv.rangefeed.range_stuck_threshold` cluster setting.

Concretely, DistSender waits until it has seen the first event on the
stream (since a server-side semaphore can delay when the catch-up phase
first emits data) and only then activates the protection mechanism.
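
As a rough illustration of this pattern, consider the following sketch.
It is not code from this commit (the actual canceler appears in the diff
below), and watchInactivity and the channel wiring are hypothetical; it
only shows an inactivity watchdog that arms itself on the first event and
cancels a context once events stop.

// Hypothetical illustration; not part of this commit.
package main

import (
	"context"
	"fmt"
	"time"
)

// watchInactivity arms a timer on the first ping and cancels ctx if no
// further ping arrives within threshold, mirroring the idea above that
// protection only activates once the stream has produced its first event.
func watchInactivity(cancel context.CancelFunc, threshold time.Duration, pings <-chan struct{}) {
	var t *time.Timer
	for range pings {
		if t == nil {
			t = time.AfterFunc(threshold, cancel) // arm on the first event
		} else {
			t.Reset(threshold) // each later event pushes the deadline out
		}
	}
	if t != nil {
		t.Stop()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pings := make(chan struct{}, 1)
	go watchInactivity(cancel, 50*time.Millisecond, pings)

	pings <- struct{}{} // first event arms the watchdog
	<-ctx.Done()        // no further events arrive, so the watchdog cancels
	fmt.Println("stream cancelled due to inactivity:", ctx.Err())
}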

The metric `distsender.rangefeed.restart_stuck` tracks how often this
mechanism fires, as does the newly added telemetry
`rangefeed.stuck.{during,after}-catchup-scan`.

Touches cockroachlabs/support#1729.

Release justification: stability improvement.
Release note (enterprise change): The new
`kv.rangefeed.range_stuck_threshold` (default 60s) cluster setting
instructs RangeFeed clients (used internally by changefeeds) to restart
automatically if no checkpoint or other event has been received from
the server for some time. This is a defense-in-depth mechanism which, if
triggered, logs output such as: `restarting stuck rangefeed: waiting for
r100 (n1,s1):1 [threshold 1m]: rangefeed restarting due to inactivity`.
Yevgeniy Miretskiy authored and tbg committed Aug 29, 2022
1 parent 3b16435 commit 6b4f40d
Showing 6 changed files with 349 additions and 21 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"dist_sender.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"dist_sender_rangefeed_canceler.go",
"doc.go",
"local_test_cluster_util.go",
"lock_spans_over_budget_error.go",
@@ -113,6 +114,7 @@ go_test(
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_canceler_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
9 changes: 9 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -164,6 +164,13 @@ This counts the number of ranges with an active rangefeed that are performing ca
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartStuck = metric.Metadata{
Name: "distsender.rangefeed.restart_stuck",
Help: `Number of times a RangeFeed was restarted due to not receiving ` +
`timely updates (kv.rangefeed.range_stuck_threshold cluster setting)`,
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
)

// CanSendToFollower is used by the DistSender to determine if it needs to look
@@ -230,6 +237,7 @@ type DistSenderMetrics struct {
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [roachpb.NumMethods]*metric.Counter
ErrCounts [roachpb.NumErrors]*metric.Counter
}
@@ -250,6 +258,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
method := roachpb.Method(i).String()
60 changes: 54 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -39,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

type singleRangeInfo struct {
@@ -63,6 +65,14 @@ var catchupScanConcurrency = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.rangefeed.range_stuck_threshold",
"restart rangefeeds if they appear to be stuck for the specified threshold; 0 disables",
time.Minute,
settings.NonNegativeDuration,
)

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
@@ -203,6 +213,7 @@ func (ds *DistSender) RangeFeedSpans(
})
}(s)
}

return g.Wait()
}

@@ -283,10 +294,11 @@ func (a *activeRangeFeed) onRangeEvent(
}

func (a *activeRangeFeed) setLastError(err error) {
now := timeutil.Now()
a.Lock()
defer a.Unlock()
a.LastErr = errors.Wrapf(err, "disconnect at %s: checkpoint %s/-%s",
timeutil.Now().Format(time.RFC3339), a.Resolved, timeutil.Since(a.Resolved.GoTime()))
redact.SafeString(now.Format(time.RFC3339)), a.Resolved, now.Sub(a.Resolved.GoTime()))
a.NumErrs++
}

@@ -408,9 +420,20 @@ func (ds *DistSender) partialRangeFeed(
switch {
case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) ||
errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)):
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
case errors.Is(err, errRestartStuckRange):
// Stuck ranges indicate a bug somewhere in the system. We are being
// defensive and attempt to restart this rangefeed. Usually, any
// stuck-ness is cleared out if we just attempt to re-resolve range
// descriptor and retry.
//
// The error contains the replica which we were waiting for.
log.Warningf(ctx, "restarting stuck rangefeed: %s", err)
token.Evict(ctx)
token = rangecache.EvictionToken{}
continue
case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
token.Evict(ctx)
@@ -472,8 +495,8 @@ func (ds *DistSender) singleRangeFeed(
onRangeEvent onRangeEventCb,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

args := roachpb.RangeFeedRequest{
Span: span,
@@ -518,6 +541,11 @@
// cleanup catchup reservation in case of early termination.
defer finishCatchupScan()

stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, func() time.Duration {
return rangefeedRangeStuckThreshold.Get(&ds.st.SV)
})
defer stuckWatcher.stop()

var streamCleanup func()
maybeCleanupStream := func() {
if streamCleanup != nil {
@@ -528,6 +556,12 @@
defer maybeCleanupStream()

for {
stuckWatcher.stop() // if timer is running from previous iteration, stop it now
if catchupRes == nil {
// Already finished catch-up scan (in an earlier iteration of this loop),
// so start timer early, not on first event received.
stuckWatcher.ping()
}
if transport.IsExhausted() {
return args.Timestamp, newSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
@@ -560,8 +594,19 @@
return args.Timestamp, nil
}
if err != nil {
if stuckWatcher.stuck() {
ds.metrics.RangefeedRestartStuck.Inc(1)
if catchupRes == nil {
telemetry.Count("rangefeed.stuck.after-catchup-scan")
} else {
telemetry.Count("rangefeed.stuck.during-catchup-scan")
}
return args.Timestamp, errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, stuckWatcher.threshold())
}
return args.Timestamp, err
}
stuckWatcher.ping() // starts timer on first event only

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
@@ -613,3 +658,6 @@ func legacyRangeFeedEventProducer(
producer, err = client.RangeFeed(ctx, req)
return producer, cleanup, err
}

// sentinel error returned when cancelling rangefeed when it is stuck.
var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")
112 changes: 112 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
@@ -0,0 +1,112 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvcoord

import (
"context"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// stuckRangeFeedCanceler is a defense-in-depth mechanism to restart rangefeeds that have
// not received events from the KV layer in some time. Rangefeeds are supposed to receive
// regular updates, as at the very least they ought to be receiving closed timestamps.
// However, issues[^1] at the KV layer could prevent this.
//
// The canceler is notified via ping() whenever the associated RangeFeed receives an event.
// Should ping() not be called for the configured threshold duration, the provided cancel
// function will be invoked.
//
// This is implemented without incurring nontrivial work on each call to ping().
// Instead, work is done roughly on each threshold interval, which is assumed to
// be large enough (i.e. at least a couple of seconds) to make this negligible.
// Concretely, a timer is set that would invoke the cancellation, and the timer
// is reset on the first call to ping() after the timer is at least half
// expired. That way, we allocate only ~twice per eventCheckInterval, which is
// acceptable.
//
// The canceler detects changes to the configured threshold duration on each call
// to ping(), i.e. in the common case of no stuck rangefeeds, it will ~immediately
// pick up the new value and apply it.
type stuckRangeFeedCanceler struct {
threshold func() time.Duration
cancel context.CancelFunc
t *time.Timer
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
}

// stuck returns true if the stuck detection got triggered.
// If this returns true, the cancel function will be invoked
// shortly, if it hasn't already.
func (w *stuckRangeFeedCanceler) stuck() bool {
return atomic.LoadInt32(&w._stuck) != 0
}

// stop releases the active timer, if any. It should be invoked
// unconditionally before the canceler goes out of scope.
func (w *stuckRangeFeedCanceler) stop() {
if w.t != nil {
w.t.Stop()
w.t = nil
w.activeThreshold = 0
}
}

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
}

mkTimer := func() {
w.activeThreshold = threshold
w.t = time.AfterFunc(threshold, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
// cancel function (which should cancel the rangefeed if invoked)
// and uses the kv.rangefeed.range_stuck_threshold cluster setting
// to (reactively) configure the timeout.
//
// The timer will only activate with the first call to ping.
func newStuckRangeFeedCanceler(
cancel context.CancelFunc, threshold func() time.Duration,
) *stuckRangeFeedCanceler {
w := &stuckRangeFeedCanceler{
threshold: threshold,
cancel: cancel,
}
return w
}
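
For reference, here is a schematic of how the canceler is intended to be
used. This is a sketch rather than code from this commit: it assumes
placement in package kvcoord (the canceler is unexported), and recvEvent
and handleEvent are hypothetical stand-ins for the stream plumbing that
singleRangeFeed actually uses.

package kvcoord

import (
	"context"
	"time"
)

// consumeWithWatchdog is a hypothetical helper illustrating the intended
// usage: ping() on every received event, stop() on the way out, and
// stuck() to translate a watchdog-induced cancellation into the sentinel.
func consumeWithWatchdog(
	ctx context.Context,
	thresholdFn func() time.Duration,
	recvEvent func(context.Context) (interface{}, error),
	handleEvent func(interface{}),
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	w := newStuckRangeFeedCanceler(cancel, thresholdFn)
	defer w.stop() // release the timer unconditionally

	for {
		ev, err := recvEvent(ctx)
		if err != nil {
			if w.stuck() {
				// The watchdog cancelled ctx; surface the sentinel error so
				// the caller restarts the rangefeed (see partialRangeFeed).
				return errRestartStuckRange
			}
			return err
		}
		w.ping() // the first call arms the timer; later calls keep it fed
		handleEvent(ev)
	}
}
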
65 changes: 65 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
@@ -0,0 +1,65 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvcoord

import (
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

_dur := int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&_dur))
})
require.Nil(t, c.t) // not running upon creation
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
require.NotNil(t, c.t) // first call to ping sets off timer
}
atomic.StoreInt64(&_dur, int64(time.Nanosecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */)
require.True(t, c.stuck())

atomic.StoreInt64(&_dur, int64(24*time.Hour))

// Stays marked as stuck even when we ping it again.
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
}
}