From 857229ab864a997ba0edf93b942d007030ea1722 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 22 Feb 2024 12:27:08 +0000 Subject: [PATCH] kvserver: refresh range cache on rangefeed barrier failure The DistSender does not refresh its range cache for unsplittable requests. This could cause a rangefeed transaction pusher barrier request to persistently fail following a range merge if the range cache thought the barrier spanned multiple ranges. This would only resolve once the range cache was refreshed by some other request, which might never happen. This in turn would cause the rangefeed's resolved timestamp to stall. Epic: none Release note (bug fix): fixed a bug where rangefeed resolved timestamps could get stuck, continually emitting the log message "pushing old intents failed: range barrier failed, range split", typically following a range merge. --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/helpers_test.go | 12 +++ pkg/kv/kvserver/replica_rangefeed.go | 25 +++++- pkg/kv/kvserver/replica_rangefeed_test.go | 99 +++++++++++++++++++++++ 4 files changed, 134 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 915640792134..edf68ae7649e 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -417,6 +417,7 @@ go_test( "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/raftutil", + "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/kv/kvserver/replicastats", diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e399994f3ce0..0ce05c34d18a 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -27,10 +27,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -663,3 +665,13 @@ func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[stri } return diffMap } + +func NewRangefeedTxnPusher( + ir *intentresolver.IntentResolver, r *Replica, span roachpb.RSpan, +) rangefeed.TxnPusher { + return &rangefeedTxnPusher{ + ir: ir, + r: r, + span: span, + } +} diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index ccb843278acd..0b74e22fcba3 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -191,13 +191,32 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { // seems very unlikely, but it doesn't really cost us anything. isV24_1 := tp.r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1Start) + // The DistSender may have a stale range descriptor, e.g. following a merge. + // Unsplittable requests don't trigger a refresh, so we have to attempt to + // refresh it by sending a Get request to the start key. We eagerly do this + // instead of waiting for a RangeKeyMismatchError, to avoid spurious errors. + // + // TODO(erikgrinaker): the DistSender should refresh its cache instead. + if _, err := tp.r.store.db.Get(ctx, tp.span.Key); err != nil { + return errors.Wrap(err, "range barrier failed: range descriptor refresh failed") + } + // Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any // range changes (e.g. splits/merges) that we haven't applied yet. lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) - if err != nil { - if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) { - return errors.Wrap(err, "range barrier failed, range split") + if err != nil && errors.HasType(err, &kvpb.RangeKeyMismatchError{}) { + // The DistSender may have a stale range descriptor, e.g. following a merge. + // Failed unsplittable requests don't trigger a refresh, so we have to + // attempt to refresh it by sending a Get request to the start key. + // + // TODO(erikgrinaker): the DistSender should refresh its cache instead. + if _, err := tp.r.store.db.Get(ctx, tp.span.Key); err != nil { + return errors.Wrap(err, "range barrier failed: range descriptor refresh failed") } + // Retry the Barrier. + lai, desc, err = tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) + } + if err != nil { return errors.Wrap(err, "range barrier failed") } if lai == 0 { diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index c2aa55ec8061..e3ea2d9000e8 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/future" @@ -1652,3 +1653,101 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) { rangeFeedCancel() } + +// TestRangefeedTxnPusherBarrierRangeKeyMismatch is a regression test for +// https://github.com/cockroachdb/cockroach/issues/119333 +// +// Specifically, it tests that a Barrier call that encounters a +// RangeKeyMismatchError will eagerly attempt to refresh the DistSender range +// cache. The DistSender does not do this itself for unsplittable requests, so +// it might otherwise continually fail. +func TestRangefeedTxnPusherBarrierRangeKeyMismatch(t *testing.T) { + + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // too slow, times out + skip.UnderDeadlock(t) + + // Use a timeout, to prevent a hung test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Start a cluster with 3 nodes. + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{}, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer cancel() + + n1 := tc.Server(0) + n3 := tc.Server(2) + db1 := n1.ApplicationLayer().DB() + db3 := n3.ApplicationLayer().DB() + + // Split off a range and upreplicate it, with leaseholder on n1. This is the + // range we'll run the barrier across. + prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...) + _, _, err := n1.StorageLayer().SplitRange(prefix) + require.NoError(t, err) + desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...) + t.Logf("split off range %s", desc) + + rspan := desc.RSpan() + span := rspan.AsRawSpanWithNoLocals() + + // Split off three other ranges. + splitKeys := []roachpb.Key{ + append(prefix.Clone(), roachpb.Key("/a")...), + append(prefix.Clone(), roachpb.Key("/b")...), + append(prefix.Clone(), roachpb.Key("/c")...), + } + for _, key := range splitKeys { + _, desc, err = n1.StorageLayer().SplitRange(key) + require.NoError(t, err) + t.Logf("split off range %s", desc) + } + + // Scan the ranges on n3 to update the range caches, then run a barrier + // request which should fail with RangeKeyMismatchError. + _, err = db3.Scan(ctx, span.Key, span.EndKey, 0) + require.NoError(t, err) + + _, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey) + require.Error(t, err) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, err) + t.Logf("n3 barrier returned %s", err) + + // Merge the ranges on n1. n3 will still have stale range descriptors, and + // fail the barrier request. + for range splitKeys { + desc, err = n1.StorageLayer().MergeRanges(span.Key) + require.NoError(t, err) + t.Logf("merged range %s", desc) + } + + // Barriers should now succeed on n1, which have an updated range cache, but + // fail on n3 which doesn't. + lai, _, err := db1.BarrierWithLAI(ctx, span.Key, span.EndKey) + require.NoError(t, err) + t.Logf("n1 barrier returned LAI %d", lai) + + _, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey) + require.Error(t, err) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, err) + t.Logf("n3 barrier returned %s", err) + + // However, rangefeedTxnPusher.Barrier() will refresh the cache and + // successfully apply the barrier. + s3 := tc.GetFirstStoreFromServer(t, 2) + repl3 := s3.LookupReplica(roachpb.RKey(span.Key)) + t.Logf("repl3 desc: %s", repl3.Desc()) + txnPusher := kvserver.NewRangefeedTxnPusher(nil, repl3, rspan) + require.NoError(t, txnPusher.Barrier(ctx)) + t.Logf("n3 txnPusher barrier succeeded") +}