Skip to content

Commit

Permalink
Merge #97212
Browse files Browse the repository at this point in the history
97212: kvserver: rangefeeds check span validity before settings r=AlexTalks a=AlexTalks

Previously, rangefeed requests checked the rangefeed enabled cluster setting, or the `RangefeedEnabled` override from the `SpanConfig` intended for ranges on system tables, prior to checking if the request's span was actually in the range. This meant that if a DistSender's range cache was outdated, it could end up sending a rangefeed request to the wrong range and never get the `RangeKeyMismatchError` needed to evict the routing information from the cache, if the outdated range did not have that override enabled.

Fixes #95177.

Release note: None

Co-authored-by: Alex Sarkesian <[email protected]>
  • Loading branch information
craig[bot] and AlexTalks committed Feb 17, 2023
2 parents 5acfd5b + 0dbcfcb commit c00dbe8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
Expand Down Expand Up @@ -1175,6 +1176,10 @@ func (r *Replica) isRangefeedEnabled() (ret bool) {
r.mu.RLock()
defer r.mu.RUnlock()

return r.isRangefeedEnabledRLocked()
}

func (r *Replica) isRangefeedEnabledRLocked() (ret bool) {
if !r.mu.spanConfigExplicitlySet {
return true
}
Expand Down Expand Up @@ -1770,6 +1775,9 @@ func (r *Replica) checkExecutionCanProceedForRangeFeed(
return err
} else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil {
return err
} else if !r.isRangefeedEnabledRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
} else if err := r.checkTSAboveGCThresholdRLocked(ts, status, false /* isAdmin */); err != nil {
return err
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
Expand Down Expand Up @@ -146,10 +145,6 @@ func (r *Replica) rangeFeedWithRangeID(
stream roachpb.RangeFeedEventSink,
pacer *admission.Pacer,
) *roachpb.Error {
if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
}
ctx := r.AnnotateCtx(stream.Context())

rSpan, err := keys.SpanAddr(args.Span)
Expand Down
83 changes: 82 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestReplicaRangefeed(t *testing.T) {
})
}

func TestReplicaRangefeedRetryErrors(t *testing.T) {
func TestReplicaRangefeedErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -930,6 +930,87 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
pErr := <-streamErrC
assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING)
})
t.Run("range key mismatch", func(t *testing.T) {
knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Use a span config override to check that we get a key mismatch error
// despite the span config's setting whenever the key is outside the
// bounds of the range.
SetSpanConfigInterceptor: func(desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig) roachpb.SpanConfig {
if desc.ContainsKey(roachpb.RKey(keys.ScratchRangeMin)) {
conf.RangefeedEnabled = false
return conf
} else if desc.ContainsKey(startRKey) {
conf.RangefeedEnabled = true
return conf
}
return conf
},
},
}
tc, _ := setup(t, knobs)
defer tc.Stopper().Stop(ctx)

ts := tc.Servers[0]
store, err := ts.Stores().GetStore(ts.GetFirstStoreID())
if err != nil {
t.Fatal(err)
}
// Split the range so that the RHS should have a span config with
// rangefeeds enabled (like a range on a system table would), while the
// LHS does not. A rangefeed request on the LHS should still return a
// RangeKeyMismatchError given the span is outside the range, even though
// rangefeeds are not enabled.
tc.SplitRangeOrFatal(t, startKey)

leftReplica := store.LookupReplica(roachpb.RKey(keys.ScratchRangeMin))
leftRangeID := leftReplica.RangeID
rightReplica := store.LookupReplica(startRKey)
rightRangeID := rightReplica.RangeID

// Attempt to establish a rangefeed, sending the request to the LHS.
stream := newTestStream()
streamErrC := make(chan *roachpb.Error, 1)

endKey := keys.ScratchRangeMax
rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey}

go func() {
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: leftRangeID,
},
Span: rangefeedSpan,
}
timer := time.AfterFunc(10*time.Second, stream.Cancel)
defer timer.Stop()
streamErrC <- store.RangeFeed(&req, stream)
}()

// Check the error.
pErr := <-streamErrC
if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok {
t.Fatalf("got incorrect error for RangeFeed: %v; expecting RangeKeyMismatchError", pErr)
}

// Now send the range feed request to the correct replica, which should not
// encounter errors.
stream = newTestStream()
go func() {
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: rightRangeID,
},
Span: rangefeedSpan,
}
timer := time.AfterFunc(10*time.Second, stream.Cancel)
defer timer.Stop()
streamErrC <- store.RangeFeed(&req, stream)
}()

// Wait for the first checkpoint event.
waitForInitialCheckpointAcrossSpan(t, stream, streamErrC, rangefeedSpan)
})
}

// TestReplicaRangefeedMVCCHistoryMutationError tests that rangefeeds are
Expand Down

0 comments on commit c00dbe8

Please sign in to comment.