kvcoord: reliably handle stuck watcher error
Front-ports parts of cockroachdb#87253.

When a rangefeed gets stuck and the server is local, the server might
notice the cancellation before the client does, and may send the
cancellation error back in a rangefeed event.

We now handle this the same way as the other case (where the stream
client errors out due to the cancellation).
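
To illustrate the shape of the fix, here is a minimal, self-contained sketch
(all names below are invented for this example; it is not the actual
DistSender code): the cancellation of a stuck rangefeed can now surface on
either of two paths, and both are funneled into the same stuck-range
handling — the stream client's Recv erroring out, or the server noticing the
cancellation first and returning it in-band as a rangefeed error event.

```go
package main

import (
	"errors"
	"fmt"
)

// errRestartStuck mirrors the idea of a sentinel error for stuck rangefeeds;
// the type and helper names below exist only for this sketch.
var errRestartStuck = errors.New("rangefeed restarting due to inactivity")

// feedEvent models a rangefeed event that may carry an in-band error.
type feedEvent struct {
	err error
}

// feedStream models a single-range feed: a sequence of events followed by a
// terminal Recv error, plus a flag standing in for stuckWatcher.stuck().
type feedStream struct {
	events  []feedEvent
	recvErr error
	stuck   bool
}

// consume drains the stream and converts both error paths into the stuck
// sentinel when the watcher has fired.
func consume(s *feedStream) error {
	for _, ev := range s.events {
		if ev.err == nil {
			continue
		}
		if s.stuck {
			// Path 2 (the fix): the server noticed the client-side
			// cancellation first and returned it as a rangefeed error event.
			return fmt.Errorf("handling stuck range: %w", errRestartStuck)
		}
		return ev.err
	}
	if s.recvErr != nil && s.stuck {
		// Path 1 (pre-existing): the stream client itself errors out.
		return fmt.Errorf("handling stuck range: %w", errRestartStuck)
	}
	return s.recvErr
}

func main() {
	inBand := &feedStream{events: []feedEvent{{err: errors.New("context canceled")}}, stuck: true}
	fmt.Println(errors.Is(consume(inBand), errRestartStuck)) // true

	viaRecv := &feedStream{recvErr: errors.New("context canceled"), stuck: true}
	fmt.Println(errors.Is(consume(viaRecv), errRestartStuck)) // true
}
```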

This also checks in the test from
cockroachdb#87253 (which is on
release-22.1).

Fixes cockroachdb#87370.

No release note since this will be backported to release-22.2
Release note: None
tbg committed Sep 12, 2022
1 parent fd588f6 commit 826f4d7
Showing 3 changed files with 153 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -215,6 +215,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
28 changes: 21 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -610,13 +610,8 @@ func (ds *DistSender) singleRangeFeed(
}
if err != nil {
if stuckWatcher.stuck() {
ds.metrics.RangefeedRestartStuck.Inc(1)
if catchupRes == nil {
telemetry.Count("rangefeed.stuck.after-catchup-scan")
} else {
telemetry.Count("rangefeed.stuck.during-catchup-scan")
}
return args.Timestamp, errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, stuckWatcher.threshold())
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, err
}
@@ -638,6 +633,13 @@ func (ds *DistSender) singleRangeFeed(
if catchupRes != nil {
ds.metrics.RangefeedErrorCatchup.Inc(1)
}
if stuckWatcher.stuck() {
// When the stuck watcher fired, and the rangefeed call is local,
// the remote might notice the cancellation first and return from
// Recv with an error that we need to special-case here.
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, t.Error.GoError()
}
onRangeEvent(args.Replica.NodeID, desc.RangeID, event)
@@ -674,5 +676,17 @@ func legacyRangeFeedEventProducer(
return producer, cleanup, err
}

func (ds *DistSender) handleStuckEvent(
args *roachpb.RangeFeedRequest, afterCatchupScan bool, threshold time.Duration,
) error {
ds.metrics.RangefeedRestartStuck.Inc(1)
if afterCatchupScan {
telemetry.Count("rangefeed.stuck.after-catchup-scan")
} else {
telemetry.Count("rangefeed.stuck.during-catchup-scan")
}
return errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, threshold)
}

// sentinel error returned when a rangefeed is cancelled because it is stuck.
var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")
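
Since handleStuckEvent wraps the sentinel rather than returning it directly,
callers that want to recognize a stuck restart should match with errors.Is
instead of comparing error values; cockroachdb/errors preserves the wrap
chain the same way fmt.Errorf's %w does. A stdlib-only sketch of that
pattern (the caller below is hypothetical, not the DistSender retry loop):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the errRestartStuckRange sentinel; the wrapping mirrors what
// handleStuckEvent does with errors.Wrapf.
var errRestartStuck = errors.New("rangefeed restarting due to inactivity")

func handleStuck(rangeID int, threshold string) error {
	return fmt.Errorf("waiting for r%d [threshold %s]: %w", rangeID, threshold, errRestartStuck)
}

func main() {
	err := handleStuck(42, "1s")
	// A retry loop would branch on the sentinel, not on the error text.
	if errors.Is(err, errRestartStuck) {
		fmt.Println("restart the single-range feed:", err)
	}
}
```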
131 changes: 131 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -12,13 +12,15 @@ package kvcoord_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -35,8 +37,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type wrapRangeFeedClientFn func(client roachpb.Internal_RangeFeedClient) roachpb.Internal_RangeFeedClient
@@ -453,3 +457,130 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {
require.True(t, blockingClient.ctxCanceled)
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().RangefeedRestartStuck.Count())
}

func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

type testKey struct{}

ctx := context.Background()

var canceled int32 // atomic

var doneErr = errors.New("gracefully terminating test")

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRangefeedFilter: func(args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink) *roachpb.Error {
md, ok := metadata.FromIncomingContext(stream.Context())
if (!ok || len(md[t.Name()]) == 0) && stream.Context().Value(testKey{}) == nil {
return nil
}
if atomic.LoadInt32(&canceled) != 0 {
return roachpb.NewError(doneErr)
}

t.Logf("intercepting %s", args)
// Send a first response to "arm" the stuck detector in DistSender.
if assert.NoError(t, stream.Send(&roachpb.RangeFeedEvent{Checkpoint: &roachpb.RangeFeedCheckpoint{
Span: args.Span,
ResolvedTS: hlc.Timestamp{Logical: 1},
}})) {
t.Log("sent first event, now blocking")
}
select {
case <-time.After(testutils.DefaultSucceedsSoonDuration):
return roachpb.NewErrorf("timed out waiting for stuck rangefeed's ctx cancellation")
case <-stream.Context().Done():
t.Log("server side rangefeed canceled (as expected)")
atomic.StoreInt32(&canceled, 1)
}
return nil
},
},
},
},
})
defer tc.Stopper().Stop(ctx)

ts := tc.Server(0)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
startTime := ts.Clock().Now()

for _, stmt := range []string{
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.range_stuck_threshold='1s'`,
} {
sqlDB.Exec(t, stmt)
}

span := func() roachpb.Span {
desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t))
t.Logf("r%d", desc.RangeID)
return desc.RSpan().AsRawSpanWithNoLocals()
}()

ds := ts.DistSenderI().(*kvcoord.DistSender)

// Use both gRPC metadata and a local ctx key to tag the context for the
// outgoing rangefeed. At time of writing, we're bypassing gRPC due to
// the local optimization, but it's not worth special casing on that.
ctx = metadata.AppendToOutgoingContext(ctx, t.Name(), "please block me")

rangeFeed := func(
t *testing.T,
ctx context.Context,
ds *kvcoord.DistSender,
sp roachpb.Span,
startFrom hlc.Timestamp,
) (_cancel func(), _wait func() error) {
events := make(chan kvcoord.RangeFeedMessage)
ctx, cancel := context.WithCancel(ctx)
{
origCancel := cancel
cancel = func() {
t.Helper()
t.Log("cancel invoked")
origCancel()
}
}

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(events)
err := ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events)
t.Logf("from RangeFeed: %v", err)
return err
})
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil // expected
case ev := <-events:
t.Logf("from consumer: %+v", ev)
case <-time.After(testutils.DefaultSucceedsSoonDuration):
return errors.New("timed out waiting to consume events")
}
}
})

return cancel, g.Wait
}

cancel, wait := rangeFeed(t, context.WithValue(ctx, testKey{}, testKey{}), ds, span, startTime)
defer time.AfterFunc(testutils.DefaultSucceedsSoonDuration, cancel).Stop()
{
err := wait()
require.True(t, errors.Is(err, doneErr), "%+v", err)
}

require.EqualValues(t, 1, atomic.LoadInt32(&canceled))
// NB: We really expect exactly 1 but with a 1s timeout, it's not inconceivable that
// on a particularly slow CI machine some unrelated rangefeed could also catch the occasional
// retry.
require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count())
}
