kv: Restart stuck RangeFeed.
It has been observed in the wild that rangefeeds (and the changefeeds
that use them) can appear to be stuck and stop making progress.
For reasons that are not yet understood, the RangeFeed RPC stops
receiving events from the server, even though the contract says that
events should keep arriving: among them are range checkpoint records,
which should be emitted periodically.

This PR introduces a defense-in-depth mechanism in the client-side
rangefeed library so that rangefeeds that appear to be stuck are
restarted automatically.

Release justification: stability improvement.
Release note (enterprise change): The rangefeed client library (used
by changefeeds, among other things) is now more resilient to the
rangefeed RPC appearing to be stuck: stuck rangefeeds are restarted
automatically.
Yevgeniy Miretskiy committed Aug 25, 2022
1 parent 07cb344 commit 3b78592
Showing 2 changed files with 185 additions and 20 deletions.
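For orientation before the diff: the heart of the change is a small watchdog attached to each single-range feed, which cancels the feed's context if no event (including the periodic checkpoints) arrives within a threshold. The standalone Go sketch below illustrates the same pattern under simplified assumptions; the type names, the fixed 200ms threshold, and the main function are illustrative only and not the kvcoord API — the actual implementation is the stuckRangeWatcher added in the diff, which reads its threshold from the new kv.rangefeed.range_stuck_threshold cluster setting.

package main

import (
	"context"
	"fmt"
	"time"
)

// eventWatchdog cancels the supplied context if recordEvent is not called
// within the threshold. To keep per-event overhead low, the timer is only
// stopped and re-armed once it is at least half expired, so we allocate
// roughly twice per threshold interval rather than once per event.
type eventWatchdog struct {
	wasStuck   bool
	cancel     context.CancelFunc
	threshold  time.Duration
	timer      *time.Timer
	rearmAfter time.Time
}

func newEventWatchdog(cancel context.CancelFunc, threshold time.Duration) *eventWatchdog {
	w := &eventWatchdog{cancel: cancel, threshold: threshold}
	w.recordEvent() // arm the timer immediately
	return w
}

func (w *eventWatchdog) recordEvent() {
	arm := func() {
		w.timer = time.AfterFunc(w.threshold, func() {
			w.wasStuck = true
			w.cancel()
		})
		w.rearmAfter = time.Now().Add(w.threshold / 2)
	}
	switch {
	case w.timer == nil:
		arm()
	case time.Now().After(w.rearmAfter):
		w.timer.Stop()
		arm()
	}
}

func (w *eventWatchdog) stop() {
	if w.timer != nil {
		w.timer.Stop()
		w.timer = nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	w := newEventWatchdog(cancel, 200*time.Millisecond)
	defer w.stop()

	events := make(chan int) // never delivers: simulates a stuck stream
	select {
	case <-events:
		w.recordEvent() // on a healthy stream, called for every received event
	case <-ctx.Done():
		// The watchdog fired and cancelled the context; the caller would now
		// map this onto a "restart this rangefeed" error and retry.
		fmt.Println("stuck:", w.wasStuck) // prints "stuck: true"
	}
}

The half-expired re-arm is the same trick the commit uses so that a timer is not stopped and recreated on every incoming event.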
94 changes: 89 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -63,6 +63,14 @@ var catchupScanConcurrency = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.rangefeed.range_stuck_threshold",
"restart rangefeeds if they appear to be stuck for the specified threshold; 0 disables",
time.Minute,
settings.NonNegativeDuration,
)

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
@@ -203,6 +211,7 @@ func (ds *DistSender) RangeFeedSpans(
})
}(s)
}

return g.Wait()
}

@@ -408,9 +417,17 @@ func (ds *DistSender) partialRangeFeed(
switch {
case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) ||
errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)):
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
case errors.Is(err, restartStuckErr):
// Stuck ranges indicate a bug somewhere in the system. We are being
// defensive here and attempt to restart the rangefeed. Usually any
// stuckness clears up once we re-resolve the range descriptor and
// retry.
token.Evict(ctx)
token = rangecache.EvictionToken{}
continue
case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
token.Evict(ctx)
@@ -472,8 +489,8 @@ func (ds *DistSender) singleRangeFeed(
onRangeEvent onRangeEventCb,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

args := roachpb.RangeFeedRequest{
Span: span,
@@ -518,6 +535,9 @@
// cleanup catchup reservation in case of early termination.
defer finishCatchupScan()

stuckWatcher := makeStuckRangeWatcher(cancelFeed, &ds.st.SV)
defer stuckWatcher.stop()

var streamCleanup func()
maybeCleanupStream := func() {
if streamCleanup != nil {
@@ -560,8 +580,13 @@
return args.Timestamp, nil
}
if err != nil {
if stuckWatcher.wasStuck {
return args.Timestamp, restartStuckErr
}
return args.Timestamp, err
}
stuckWatcher.recordEvent()

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
@@ -613,3 +638,62 @@ func legacyRangeFeedEventProducer(
producer, err = client.RangeFeed(ctx, req)
return producer, cleanup, err
}

// restartStuckErr is a sentinel error returned when a rangefeed is canceled
// because it appears to be stuck.
var restartStuckErr = errors.New("rangefeed restarting due to liveness")

// We want to cancel the context if we don't receive an event
// for a while. We try to do this without doing too much work
// (allocations, etc.) per received message, to bound the
// overhead when everything is working fine and messages are
// rushing in at high frequency. To do this, we set up a timer
// that cancels the context and, whenever it is ~half expired,
// stop it (once the next message arrives) and re-start it.
// That way, we allocate only ~twice per stuck threshold, which
// is acceptable.
type stuckRangeWatcher struct {
wasStuck bool
cancel context.CancelFunc
t *time.Timer
sv *settings.Values
resetTimerAfter time.Time
}

func (w *stuckRangeWatcher) stop() {
if w.t != nil {
w.t.Stop()
w.t = nil
}
}

func (w *stuckRangeWatcher) recordEvent() {
stuckThreshold := rangefeedRangeStuckThreshold.Get(w.sv)
if stuckThreshold == 0 {
w.stop()
return
}

mkTimer := func() {
w.t = time.AfterFunc(stuckThreshold, func() {
w.wasStuck = true
w.cancel()
})
w.resetTimerAfter = timeutil.Now().Add(stuckThreshold / 2)
}

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) {
w.stop()
mkTimer()
}
}

func makeStuckRangeWatcher(cancel context.CancelFunc, sv *settings.Values) *stuckRangeWatcher {
w := &stuckRangeWatcher{
cancel: cancel,
sv: sv,
}
w.recordEvent()
return w
}
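An aside on the retry plumbing in partialRangeFeed above: the stuck case is reported with a package-level sentinel error and matched with errors.Is, so the check still succeeds if the error gets wrapped on its way up. Below is a minimal, self-contained sketch of that pattern; the error value, messages, and handler are illustrative, not the kvcoord code.

package main

import (
	"errors"
	"fmt"
)

var errRestartStuck = errors.New("rangefeed restarting due to liveness")

// handleRetryError decides what to do before the next retry attempt.
func handleRetryError(err error) string {
	switch {
	case errors.Is(err, errRestartStuck):
		// Treat a stuck range like a stale descriptor: evict the cached
		// range descriptor and re-resolve it before retrying.
		return "evict descriptor and retry"
	default:
		return "retry with the existing descriptor"
	}
}

func main() {
	fmt.Println(handleRetryError(fmt.Errorf("send failed: %w", errRestartStuck)))
	fmt.Println(handleRetryError(errors.New("transient network error")))
}

Using errors.Is rather than == matters because RPC and retry layers commonly wrap errors with %w, which would defeat a plain equality check.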
111 changes: 96 additions & 15 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -39,10 +39,12 @@ import (
"google.golang.org/grpc"
)

type wrapRangeFeedClientFn func(client roachpb.Internal_RangeFeedClient) roachpb.Internal_RangeFeedClient
type testRangefeedClient struct {
rpc.RestrictedInternalClient
muxRangeFeedEnabled bool
count func()
wrapRangeFeedClient wrapRangeFeedClientFn
}

func (c *testRangefeedClient) RangeFeed(
@@ -53,7 +55,15 @@ func (c *testRangefeedClient) RangeFeed(
if c.muxRangeFeedEnabled && ctx.Value(useMuxRangeFeedCtxKey{}) != nil {
panic(errors.AssertionFailedf("unexpected call to RangeFeed"))
}
return c.RestrictedInternalClient.RangeFeed(ctx, args, opts...)

rfClient, err := c.RestrictedInternalClient.RangeFeed(ctx, args, opts...)
if err != nil {
return nil, err
}
if c.wrapRangeFeedClient == nil {
return rfClient, nil
}
return c.wrapRangeFeedClient(rfClient), nil
}

func (c *testRangefeedClient) MuxRangeFeed(
@@ -82,9 +92,10 @@ func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) {
}

type countConnectionsTransport struct {
wrapped kvcoord.Transport
counts *internalClientCounts
rfStreamEnabled bool
wrapped kvcoord.Transport
counts *internalClientCounts
wrapRangeFeedClient wrapRangeFeedClientFn
rfStreamEnabled bool
}

var _ kvcoord.Transport = (*countConnectionsTransport)(nil)
Expand All @@ -110,17 +121,21 @@ func (c *countConnectionsTransport) NextInternalClient(
return nil, err
}

// Use the regular client if we're not running this test's rangefeed.
if ctx.Value(testFeedCtxKey{}) == nil {
return client, nil
}

tc := &testRangefeedClient{
RestrictedInternalClient: client,
muxRangeFeedEnabled: c.rfStreamEnabled,
wrapRangeFeedClient: c.wrapRangeFeedClient,
}
// Count rangefeed calls but only for feeds started by this test.
if ctx.Value(testFeedCtxKey{}) != nil {
tc.count = func() {

tc.count = func() {
if c.counts != nil {
c.counts.Inc(tc)
}
} else {
tc.count = func() {}
}

return tc, nil
@@ -143,7 +158,7 @@ func (c *countConnectionsTransport) Release() {
}

func makeTransportFactory(
rfStreamEnabled bool, counts *internalClientCounts,
rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn,
) kvcoord.TransportFactory {
return func(
options kvcoord.SendOptions,
@@ -155,9 +170,10 @@
return nil, err
}
countingTransport := &countConnectionsTransport{
wrapped: transport,
rfStreamEnabled: rfStreamEnabled,
counts: counts,
wrapped: transport,
rfStreamEnabled: rfStreamEnabled,
counts: counts,
wrapRangeFeedClient: wrapFn,
}
return countingTransport, nil
}
@@ -249,7 +265,7 @@ func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) {
Settings: st,
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
TransportFactory: makeTransportFactory(false, nil),
TransportFactory: makeTransportFactory(false, nil, nil),
},

Server: &server.TestingKnobs{
@@ -314,7 +330,7 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
TransportFactory: makeTransportFactory(true, connCounts),
TransportFactory: makeTransportFactory(true, connCounts, nil),
},
},
},
@@ -361,3 +377,68 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
require.Equal(t, 1, c)
}
}

type blockRecvRangeFeedClient struct {
roachpb.Internal_RangeFeedClient
wasStuck bool
}

func (b *blockRecvRangeFeedClient) Recv() (*roachpb.RangeFeedEvent, error) {
if !b.wasStuck {
ctx := b.Internal_RangeFeedClient.Context()
<-ctx.Done()
b.wasStuck = true
return nil, ctx.Err()
}
return b.Internal_RangeFeedClient.Recv()
}

var _ roachpb.Internal_RangeFeedClient = (*blockRecvRangeFeedClient)(nil)

func TestRestartsStuckRangeFeeds(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

blockingClient := &blockRecvRangeFeedClient{}
var wrapRfClient wrapRangeFeedClientFn = func(client roachpb.Internal_RangeFeedClient) roachpb.Internal_RangeFeedClient {
blockingClient.Internal_RangeFeedClient = client
return blockingClient
}

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
TransportFactory: makeTransportFactory(false, nil, wrapRfClient),
},
},
},
})
defer tc.Stopper().Stop(ctx)

ts := tc.Server(0)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
startTime := ts.Clock().Now()

// Create a table and insert some rows so that the rangefeed has events
// to emit.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.range_stuck_threshold='1s'`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 100)`,
)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

allSeen, onValue := observeNValues(100)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
channelWaitWithTimeout(t, allSeen)
closeFeed()

require.True(t, blockingClient.wasStuck)
}
