diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 2940c828308f..8b56287695ca 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -18,6 +18,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" @@ -1175,6 +1176,10 @@ func (r *Replica) isRangefeedEnabled() (ret bool) { r.mu.RLock() defer r.mu.RUnlock() + return r.isRangefeedEnabledRLocked() +} + +func (r *Replica) isRangefeedEnabledRLocked() (ret bool) { if !r.mu.spanConfigExplicitlySet { return true } @@ -1770,6 +1775,9 @@ func (r *Replica) checkExecutionCanProceedForRangeFeed( return err } else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil { return err + } else if !r.isRangefeedEnabledRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", + docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } else if err := r.checkTSAboveGCThresholdRLocked(ts, status, false /* isAdmin */); err != nil { return err } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index fcaacce991b9..8852c0a90d1b 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -16,7 +16,6 @@ import ( "sync" "time" - "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" @@ -146,10 +145,6 @@ func (r *Replica) rangeFeedWithRangeID( stream roachpb.RangeFeedEventSink, pacer *admission.Pacer, ) *roachpb.Error { - if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { - return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", - docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) - } ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index e0d474c4da0c..469b3cf032fa 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -493,7 +493,7 @@ func TestReplicaRangefeed(t *testing.T) { }) } -func TestReplicaRangefeedRetryErrors(t *testing.T) { +func TestReplicaRangefeedErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -930,6 +930,87 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { pErr := <-streamErrC assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING) }) + t.Run("range key mismatch", func(t *testing.T) { + knobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Use a span config override to check that we get a key mismatch error + // despite the span config's setting whenever the key is outside the + // bounds of the range. + SetSpanConfigInterceptor: func(desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig) roachpb.SpanConfig { + if desc.ContainsKey(roachpb.RKey(keys.ScratchRangeMin)) { + conf.RangefeedEnabled = false + return conf + } else if desc.ContainsKey(startRKey) { + conf.RangefeedEnabled = true + return conf + } + return conf + }, + }, + } + tc, _ := setup(t, knobs) + defer tc.Stopper().Stop(ctx) + + ts := tc.Servers[0] + store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + // Split the range so that the RHS should have a span config with + // rangefeeds enabled (like a range on a system table would), while the + // LHS does not. A rangefeed request on the LHS should still return a + // RangeKeyMismatchError given the span is outside the range, even though + // rangefeeds are not enabled. + tc.SplitRangeOrFatal(t, startKey) + + leftReplica := store.LookupReplica(roachpb.RKey(keys.ScratchRangeMin)) + leftRangeID := leftReplica.RangeID + rightReplica := store.LookupReplica(startRKey) + rightRangeID := rightReplica.RangeID + + // Attempt to establish a rangefeed, sending the request to the LHS. + stream := newTestStream() + streamErrC := make(chan *roachpb.Error, 1) + + endKey := keys.ScratchRangeMax + rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey} + + go func() { + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + RangeID: leftRangeID, + }, + Span: rangefeedSpan, + } + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) + }() + + // Check the error. + pErr := <-streamErrC + if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok { + t.Fatalf("got incorrect error for RangeFeed: %v; expecting RangeKeyMismatchError", pErr) + } + + // Now send the range feed request to the correct replica, which should not + // encounter errors. + stream = newTestStream() + go func() { + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + RangeID: rightRangeID, + }, + Span: rangefeedSpan, + } + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) + }() + + // Wait for the first checkpoint event. + waitForInitialCheckpointAcrossSpan(t, stream, streamErrC, rangefeedSpan) + }) } // TestReplicaRangefeedMVCCHistoryMutationError tests that rangefeeds are