From 0dbcfcb005d94f3c3b0b8119daaea9a8b8f717c5 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Wed, 15 Feb 2023 15:59:09 -0500 Subject: [PATCH] kvserver: rangefeeds check span validity before settings Previously, rangefeed requests checked the rangefeed enabled cluster setting, or the `RangefeedEnabled` override from the `SpanConfig` intended for ranges on system tables, prior to checking if the request's span was actually in the range. This meant that if a DistSender's range cache was outdated, it could end up sending a rangefeed request to the wrong range and never get the `RangeKeyMismatchError` needed to evict the routing information from the cache, if the outdated range did not have that override enabled. Fixes #95177. Release note (bug fix): Since 22.1 when rangefeed enablement overrides in span configs were introduced, rangefeed requests that reached spans outside the range would not cause range cache invalidation due to the setting being checked first, thus requests could repeatedly hit the same incorrect range, causing errors until cache invalidation or node restart. This fix correctly checks that the span is within the range prior to checking the enablement settings, thus invalidating the cache when a request reaches an incorrect range and causing subsequent requests to successfully reach the correct range. --- pkg/kv/kvserver/replica.go | 8 +++ pkg/kv/kvserver/replica_rangefeed.go | 5 -- pkg/kv/kvserver/replica_rangefeed_test.go | 83 ++++++++++++++++++++++- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 2940c828308f..8b56287695ca 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -18,6 +18,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" @@ -1175,6 +1176,10 @@ func (r *Replica) isRangefeedEnabled() (ret bool) { r.mu.RLock() defer r.mu.RUnlock() + return r.isRangefeedEnabledRLocked() +} + +func (r *Replica) isRangefeedEnabledRLocked() (ret bool) { if !r.mu.spanConfigExplicitlySet { return true } @@ -1770,6 +1775,9 @@ func (r *Replica) checkExecutionCanProceedForRangeFeed( return err } else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil { return err + } else if !r.isRangefeedEnabledRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", + docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } else if err := r.checkTSAboveGCThresholdRLocked(ts, status, false /* isAdmin */); err != nil { return err } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index fcaacce991b9..8852c0a90d1b 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -16,7 +16,6 @@ import ( "sync" "time" - "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" @@ -146,10 +145,6 @@ func (r *Replica) rangeFeedWithRangeID( stream roachpb.RangeFeedEventSink, pacer *admission.Pacer, ) *roachpb.Error { - if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { - return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", - docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) - } ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index e0d474c4da0c..469b3cf032fa 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -493,7 +493,7 @@ func TestReplicaRangefeed(t *testing.T) { }) } -func TestReplicaRangefeedRetryErrors(t *testing.T) { +func TestReplicaRangefeedErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -930,6 +930,87 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { pErr := <-streamErrC assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING) }) + t.Run("range key mismatch", func(t *testing.T) { + knobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Use a span config override to check that we get a key mismatch error + // despite the span config's setting whenever the key is outside the + // bounds of the range. + SetSpanConfigInterceptor: func(desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig) roachpb.SpanConfig { + if desc.ContainsKey(roachpb.RKey(keys.ScratchRangeMin)) { + conf.RangefeedEnabled = false + return conf + } else if desc.ContainsKey(startRKey) { + conf.RangefeedEnabled = true + return conf + } + return conf + }, + }, + } + tc, _ := setup(t, knobs) + defer tc.Stopper().Stop(ctx) + + ts := tc.Servers[0] + store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + // Split the range so that the RHS should have a span config with + // rangefeeds enabled (like a range on a system table would), while the + // LHS does not. A rangefeed request on the LHS should still return a + // RangeKeyMismatchError given the span is outside the range, even though + // rangefeeds are not enabled. + tc.SplitRangeOrFatal(t, startKey) + + leftReplica := store.LookupReplica(roachpb.RKey(keys.ScratchRangeMin)) + leftRangeID := leftReplica.RangeID + rightReplica := store.LookupReplica(startRKey) + rightRangeID := rightReplica.RangeID + + // Attempt to establish a rangefeed, sending the request to the LHS. + stream := newTestStream() + streamErrC := make(chan *roachpb.Error, 1) + + endKey := keys.ScratchRangeMax + rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey} + + go func() { + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + RangeID: leftRangeID, + }, + Span: rangefeedSpan, + } + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) + }() + + // Check the error. + pErr := <-streamErrC + if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok { + t.Fatalf("got incorrect error for RangeFeed: %v; expecting RangeKeyMismatchError", pErr) + } + + // Now send the range feed request to the correct replica, which should not + // encounter errors. + stream = newTestStream() + go func() { + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + RangeID: rightRangeID, + }, + Span: rangefeedSpan, + } + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) + }() + + // Wait for the first checkpoint event. + waitForInitialCheckpointAcrossSpan(t, stream, streamErrC, rangefeedSpan) + }) } // TestReplicaRangefeedMVCCHistoryMutationError tests that rangefeeds are