diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 915640792134..edf68ae7649e 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -417,6 +417,7 @@ go_test(
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftlog",
         "//pkg/kv/kvserver/raftutil",
+        "//pkg/kv/kvserver/rangefeed",
         "//pkg/kv/kvserver/rditer",
         "//pkg/kv/kvserver/readsummary/rspb",
         "//pkg/kv/kvserver/replicastats",
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index e399994f3ce0..0ce05c34d18a 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -27,10 +27,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -663,3 +665,13 @@ func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[stri
 	}
 	return diffMap
 }
+
+func NewRangefeedTxnPusher(
+	ir *intentresolver.IntentResolver, r *Replica, span roachpb.RSpan,
+) rangefeed.TxnPusher {
+	return &rangefeedTxnPusher{
+		ir:   ir,
+		r:    r,
+		span: span,
+	}
+}
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index ccb843278acd..d8047bae184a 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -194,10 +194,21 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error {
 	// Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any
 	// range changes (e.g. splits/merges) that we haven't applied yet.
 	lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
-	if err != nil {
-		if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) {
-			return errors.Wrap(err, "range barrier failed, range split")
+	if err != nil && errors.HasType(err, &kvpb.RangeKeyMismatchError{}) {
+		// The DistSender may have a stale range descriptor, e.g. following a merge.
+		// Failed unsplittable requests don't trigger a refresh, so we have to
+		// attempt to refresh it by sending a Get request to the start key.
+		//
+		// TODO(erikgrinaker): the DistSender should refresh its cache instead.
+		if _, err := tp.r.store.db.Get(ctx, tp.span.Key); err != nil {
+			return errors.Wrap(err, "range barrier failed: range descriptor refresh failed")
 		}
+		// Retry the Barrier.
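+		// If the retry also fails, e.g. because the descriptor is still stale,
+		// the error is wrapped and returned by the generic check below.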
+		lai, desc, err = tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
+	}
+	if err != nil {
 		return errors.Wrap(err, "range barrier failed")
 	}
 	if lai == 0 {
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index c2aa55ec8061..bb9f2395b457 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/future"
@@ -1652,3 +1653,102 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {
 
 	rangeFeedCancel()
 }
+
+// TestRangefeedTxnPusherBarrierRangeKeyMismatch is a regression test for
+// https://github.com/cockroachdb/cockroach/issues/119333
+//
+// Specifically, it tests that a Barrier call that encounters a
+// RangeKeyMismatchError will eagerly attempt to refresh the DistSender range
+// cache. The DistSender does not do this itself for unsplittable requests, so
+// it might otherwise continually fail.
+func TestRangefeedTxnPusherBarrierRangeKeyMismatch(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderRace(t) // too slow, times out
+	skip.UnderDeadlock(t)
+
+	// Use a timeout to prevent a hung test.
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	// Start a cluster with 3 nodes.
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	defer cancel()
+
+	n1 := tc.Server(0)
+	n3 := tc.Server(2)
+	db1 := n1.ApplicationLayer().DB()
+	db3 := n3.ApplicationLayer().DB()
+
+	// Split off a range and upreplicate it, with leaseholder on n1. This is the
+	// range we'll run the barrier across.
+	prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...)
+	_, _, err := n1.StorageLayer().SplitRange(prefix)
+	require.NoError(t, err)
+	desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
+	t.Logf("split off range %s", desc)
+
+	rspan := desc.RSpan()
+	span := rspan.AsRawSpanWithNoLocals()
+
+	// Split off three other ranges.
+	splitKeys := []roachpb.Key{
+		append(prefix.Clone(), roachpb.Key("/a")...),
+		append(prefix.Clone(), roachpb.Key("/b")...),
+		append(prefix.Clone(), roachpb.Key("/c")...),
+	}
+	for _, key := range splitKeys {
+		_, desc, err = n1.StorageLayer().SplitRange(key)
+		require.NoError(t, err)
+		t.Logf("split off range %s", desc)
+	}
+
+	// Scan the ranges on n3 to update the range caches, then run a barrier
+	// request, which should fail with RangeKeyMismatchError.
+	_, err = db3.Scan(ctx, span.Key, span.EndKey, 0)
+	require.NoError(t, err)
+
+	_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
+	require.Error(t, err)
+	require.IsType(t, &kvpb.RangeKeyMismatchError{}, err)
+	t.Logf("n3 barrier returned %s", err)
+
+	// Merge the ranges on n1. n3 will still have stale range descriptors, and
+	// fail the barrier request.
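+	// Each MergeRanges call merges the range containing span.Key with its
+	// right-hand neighbor, so three merges restore the original range bounds.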
+	for range splitKeys {
+		desc, err = n1.StorageLayer().MergeRanges(span.Key)
+		require.NoError(t, err)
+		t.Logf("merged range %s", desc)
+	}
+
+	// Barriers should now succeed on n1, which has an updated range cache, but
+	// fail on n3, which doesn't.
+	lai, _, err := db1.BarrierWithLAI(ctx, span.Key, span.EndKey)
+	require.NoError(t, err)
+	t.Logf("n1 barrier returned LAI %d", lai)
+
+	_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
+	require.Error(t, err)
+	require.IsType(t, &kvpb.RangeKeyMismatchError{}, err)
+	t.Logf("n3 barrier returned %s", err)
+
+	// However, rangefeedTxnPusher.Barrier() will refresh the cache and
+	// successfully apply the barrier.
+	s3 := tc.GetFirstStoreFromServer(t, 2)
+	repl3 := s3.LookupReplica(roachpb.RKey(span.Key))
+	t.Logf("repl3 desc: %s", repl3.Desc())
+	txnPusher := kvserver.NewRangefeedTxnPusher(nil, repl3, rspan)
+	require.NoError(t, txnPusher.Barrier(ctx))
+	t.Logf("n3 txnPusher barrier succeeded")
+}
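
For reference, the control flow introduced in rangefeedTxnPusher.Barrier above reduces to a small retry pattern: attempt the barrier, and on a range-key mismatch refresh the cache with a point read, then retry once. Below is a minimal, self-contained sketch of that pattern. The barrier and refresh closures are hypothetical stand-ins for kv.DB.BarrierWithLAI and the cache-refreshing Get, and errRangeKeyMismatch stands in for kvpb.RangeKeyMismatchError; none of these are CockroachDB APIs.

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    )

    // errRangeKeyMismatch stands in for kvpb.RangeKeyMismatchError: the request
    // spanned a range boundary that the sender's cache didn't know about.
    var errRangeKeyMismatch = errors.New("key range outside of range bounds")

    // barrierWithRetry mirrors the shape of rangefeedTxnPusher.Barrier: on a
    // range-key mismatch it refreshes the cache and retries the barrier once.
    func barrierWithRetry(
    	ctx context.Context,
    	barrier func(context.Context) error,
    	refresh func(context.Context) error,
    ) error {
    	err := barrier(ctx)
    	if err != nil && errors.Is(err, errRangeKeyMismatch) {
    		if rErr := refresh(ctx); rErr != nil {
    			return fmt.Errorf("range barrier failed: range descriptor refresh failed: %w", rErr)
    		}
    		// Retry the barrier; any error falls through to the wrapping below.
    		err = barrier(ctx)
    	}
    	if err != nil {
    		return fmt.Errorf("range barrier failed: %w", err)
    	}
    	return nil
    }

    func main() {
    	attempts := 0
    	barrier := func(context.Context) error {
    		attempts++
    		if attempts == 1 {
    			return errRangeKeyMismatch // stale cache on the first attempt
    		}
    		return nil
    	}
    	refresh := func(context.Context) error { return nil } // e.g. a Get on the start key
    	fmt.Println(barrierWithRetry(context.Background(), barrier, refresh)) // <nil>
    }

As the TODO in the patch notes, a single retry suffices here only because the refresh forces a descriptor update first; the longer-term fix is for the DistSender to refresh its cache on failed unsplittable requests itself.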