From e51323be78cad0ec4fc4aed954f7516963728114 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 20 Jan 2022 19:46:17 -0800
Subject: [PATCH 1/2] sql,kvstreamer: use Streamer with multiple column families

This commit takes advantage of the recently introduced `WholeRowsOfSize`
argument of the `BatchRequest`s (which prevents SQL rows from being split
across multiple responses when multiple column families are present) to
support the multi-column family case in the `Streamer`.

Release note: None
---
 pkg/kv/kvclient/kvstreamer/BUILD.bazel | 1 +
 pkg/kv/kvclient/kvstreamer/streamer.go | 21 ++---
 pkg/kv/kvclient/kvstreamer/streamer_test.go | 96 +++++++++++++++++++--
 pkg/sql/colfetcher/cfetcher.go | 3 +-
 pkg/sql/colfetcher/index_join.go | 23 ++---
 pkg/sql/row/kv_batch_streamer.go | 2 +-
 pkg/sql/rowexec/joinreader.go | 35 ++++----
 7 files changed, 128 insertions(+), 53 deletions(-)

diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
index 508f70955d90..ae5ad00d565a 100644
--- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel
+++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -52,6 +52,7 @@ go_test(
 "//pkg/util/log",
 "//pkg/util/mon",
 "//pkg/util/randutil",
+ "@com_github_cockroachdb_errors//:errors",
 "@com_github_dustin_go_humanize//:go-humanize",
 "@com_github_stretchr_testify//require",
 ],
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 2d411e1a72b3..63a13e6dd8df 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -142,12 +142,7 @@ func (r Result) Release(ctx context.Context) {
 }
 }

-// Streamer provides a streaming oriented API for reading from the KV layer. At
-// the moment the Streamer only works when SQL rows are comprised of a single KV
-// (i.e. a single column family).
-// TODO(yuzefovich): lift the restriction on a single column family once KV is
-// updated so that rows are never split across different BatchResponses when
-// TargetBytes limitBytes is exceeded.
+// Streamer provides a streaming oriented API for reading from the KV layer.
 //
 // The example usage is roughly as follows:
 //
@@ -207,9 +202,10 @@ type Streamer struct {
 distSender *kvcoord.DistSender
 stopper *stop.Stopper

- mode OperationMode
- hints Hints
- budget *budget
+ mode OperationMode
+ hints Hints
+ maxKeysPerRow int32
+ budget *budget

 coordinator workerCoordinator
 coordinatorStarted bool
@@ -353,7 +349,10 @@ func NewStreamer(
 // Hints can be used to hint the aggressiveness of the caching policy. In
 // particular, it can be used to disable caching when the client knows that all
 // looked-up keys are unique (e.g. in the case of an index-join).
-func (s *Streamer) Init(mode OperationMode, hints Hints) {
+//
+// maxKeysPerRow indicates the maximum number of KV pairs that comprise a single
+// SQL row (i.e. the number of column families in the index being scanned).
+func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) {
 if mode != OutOfOrder {
 panic(errors.AssertionFailedf("only OutOfOrder mode is supported"))
 }
@@ -362,6 +361,7 @@ func (s *Streamer) Init(mode OperationMode, hints Hints) {
 panic(errors.AssertionFailedf("only unique requests are currently supported"))
 }
 s.hints = hints
+ s.maxKeysPerRow = int32(maxKeysPerRow)
 s.waitForResults = make(chan struct{}, 1)
 }
@@ -1008,6 +1008,7 @@ func (w *workerCoordinator) performRequestAsync(
 ba.Header.WaitPolicy = w.lockWaitPolicy
 ba.Header.TargetBytes = targetBytes
 ba.Header.AllowEmpty = !headOfLine
+ ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
 // TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
 // applicable (#67885).
 ba.AdmissionHeader = w.requestAdmissionHeader
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index 7b5ac7d45caf..bc65005148d0 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -14,6 +14,7 @@ import (
 "context"
 "fmt"
 "math"
+ "sync"
 "testing"

 "github.com/cockroachdb/cockroach/pkg/base"
@@ -29,6 +30,7 @@ import (
 "github.com/cockroachdb/cockroach/pkg/util/log"
 "github.com/cockroachdb/cockroach/pkg/util/mon"
 "github.com/cockroachdb/cockroach/pkg/util/randutil"
+ "github.com/cockroachdb/errors"
 "github.com/dustin/go-humanize"
 "github.com/stretchr/testify/require"
 )
@@ -64,21 +66,21 @@ func TestStreamerLimitations(t *testing.T) {
 t.Run("InOrder mode unsupported", func(t *testing.T) {
 require.Panics(t, func() {
 streamer := getStreamer()
- streamer.Init(InOrder, Hints{UniqueRequests: true})
+ streamer.Init(InOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 })
 })

 t.Run("non-unique requests unsupported", func(t *testing.T) {
 require.Panics(t, func() {
 streamer := getStreamer()
- streamer.Init(OutOfOrder, Hints{UniqueRequests: false})
+ streamer.Init(OutOfOrder, Hints{UniqueRequests: false}, 1 /* maxKeysPerRow */)
 })
 })

 t.Run("invalid enqueueKeys", func(t *testing.T) {
 streamer := getStreamer()
 defer streamer.Close()
- streamer.Init(OutOfOrder, Hints{UniqueRequests: true})
+ streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 // Use a single request but two keys which is invalid.
 reqs := []roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{}}}
 enqueueKeys := []int{0, 1}
@@ -88,7 +90,7 @@ func TestStreamerLimitations(t *testing.T) {
 t.Run("pipelining unsupported", func(t *testing.T) {
 streamer := getStreamer()
 defer streamer.Close()
- streamer.Init(OutOfOrder, Hints{UniqueRequests: true})
+ streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
 reqs := []roachpb.RequestUnion{{
 Value: &roachpb.RequestUnion_Get{
@@ -232,7 +234,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 getStreamer := func(limitBytes int64) *Streamer {
 acc.Clear(ctx)
 s := getStreamer(ctx, s, limitBytes, &acc)
- s.Init(OutOfOrder, Hints{UniqueRequests: true})
+ s.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 return s
 }
@@ -336,3 +338,87 @@ func TestStreamerCorrectlyDiscardsResponses(t *testing.T) {
 })
 }
 }
+
+// TestStreamerWideRows verifies that the Streamer works correctly with
+// large rows and multiple column families. The goal is to make sure that KVs
+// from different rows are not intertwined.
+func TestStreamerWideRows(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Start a cluster with large --max-sql-memory parameter so that the + // Streamer isn't hitting the root budget exceeded error. + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + SQLMemoryPoolSize: 1 << 30, /* 1GiB */ + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + const blobSize = 10 * initialAvgResponseSize + const numRows = 2 + + _, err := db.Exec("CREATE TABLE t (pk INT PRIMARY KEY, k INT, blob1 STRING, blob2 STRING, INDEX (k), FAMILY (pk, k, blob1), FAMILY (blob2))") + require.NoError(t, err) + for i := 0; i < numRows; i++ { + if i > 0 { + // Split each row into a separate range. + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES(%d)", i)) + require.NoError(t, err) + } + _, err = db.Exec(fmt.Sprintf("INSERT INTO t SELECT %d, 1, repeat('a', %d), repeat('b', %d)", i, blobSize, blobSize)) + require.NoError(t, err) + } + + // Populate the range cache. + _, err = db.Exec("SELECT count(*) from t") + require.NoError(t, err) + + // Perform an index join to read large blobs. + query := "SELECT count(*), sum(length(blob1)), sum(length(blob2)) FROM t@t_k_idx WHERE k = 1" + const concurrency = 3 + // Different values for the distsql_workmem setting allow us to exercise the + // behavior in some degenerate cases (e.g. a small value results in a single + // KV exceeding the limit). + for _, workmem := range []int{ + 3 * blobSize / 2, + 3 * blobSize, + 4 * blobSize, + } { + t.Run(fmt.Sprintf("workmem=%s", humanize.Bytes(uint64(workmem))), func(t *testing.T) { + _, err = db.Exec(fmt.Sprintf("SET distsql_workmem = '%dB'", workmem)) + require.NoError(t, err) + var wg sync.WaitGroup + wg.Add(concurrency) + errCh := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + row := db.QueryRow(query) + var count, sum1, sum2 int + if err := row.Scan(&count, &sum1, &sum2); err != nil { + errCh <- err + return + } + if count != numRows { + errCh <- errors.Newf("expected %d rows, read %d", numRows, count) + return + } + if sum1 != numRows*blobSize { + errCh <- errors.Newf("expected total length %d of blob1, read %d", numRows*blobSize, sum1) + return + } + if sum2 != numRows*blobSize { + errCh <- errors.Newf("expected total length %d of blob2, read %d", numRows*blobSize, sum2) + return + } + }() + } + wg.Wait() + close(errCh) + err, ok := <-errCh + if ok { + t.Fatal(err) + } + }) + } +} diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index a6b02c60cf93..f6d92810c3ab 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -242,7 +242,8 @@ type cFetcher struct { // maxKeysPerRow memoizes the maximum number of keys per row in the index // we're fetching from. This is used to calculate the kvBatchFetcher's - // firstBatchLimit. + // firstBatchLimit as well as by the ColIndexJoin when it is using the + // Streamer API. maxKeysPerRow int // True if the index key must be decoded. 
This is only false if there are no diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 830cc3d3cf74..fab7a415d915 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -144,6 +144,7 @@ func (s *ColIndexJoin) Init(ctx context.Context) { s.streamerInfo.Streamer.Init( kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true}, + s.cf.maxKeysPerRow, ) } } @@ -455,23 +456,13 @@ func NewColIndexJoin( useStreamer := row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering if useStreamer { - // TODO(yuzefovich): remove this conditional once multiple column - // families are supported. - if maxKeysPerRow, err := tableArgs.desc.KeysPerRow(tableArgs.index.GetID()); err != nil { - return nil, err - } else if maxKeysPerRow > 1 { - // Currently, the streamer only supports cases with a single column - // family. - useStreamer = false - } else { - if streamerBudgetAcc == nil { - return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") - } - // Keep the quarter of the memory limit for the output batch of the - // cFetcher, and we'll give the remaining three quarters to the - // streamer budget below. - memoryLimit = int64(math.Ceil(float64(memoryLimit) / 4.0)) + if streamerBudgetAcc == nil { + return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") } + // Keep the quarter of the memory limit for the output batch of the + // cFetcher, and we'll give the remaining three quarters to the streamer + // budget below. + memoryLimit = int64(math.Ceil(float64(memoryLimit) / 4.0)) } fetcher := cFetcherPool.Get().(*cFetcher) diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 550db6a32803..2390d50a66e1 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -26,7 +26,7 @@ import ( // CanUseStreamer returns whether the kvstreamer.Streamer API should be used. func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool { // TODO(yuzefovich): remove the version gate in 22.2 cycle. - return settings.Version.IsActive(ctx, clusterversion.TargetBytesAvoidExcess) && + return settings.Version.IsActive(ctx, clusterversion.ScanWholeRows) && useStreamerEnabled.Get(&settings.SV) } diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 80c79b410aa1..90a07e29431a 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -130,6 +130,7 @@ type joinReader struct { unlimitedMemMonitor *mon.BytesMonitor budgetAcc mon.BoundAccount budgetLimit int64 + maxKeysPerRow int } input execinfra.RowSource @@ -462,29 +463,22 @@ func newJoinReader( jr.batchSizeBytes = jr.strategy.getLookupRowsBatchSizeHint(flowCtx.EvalCtx.SessionData()) if jr.usesStreamer { - maxKeysPerRow, err := jr.desc.KeysPerRow(jr.index.GetID()) + // jr.batchSizeBytes will be used up by the input batch, and we'll give + // everything else to the streamer budget. Note that budgetLimit will + // always be positive given that memoryLimit is at least 8MiB and + // batchSizeBytes is at most 4MiB. + jr.streamerInfo.budgetLimit = memoryLimit - jr.batchSizeBytes + // We need to use an unlimited monitor for the streamer's budget since + // the streamer itself is responsible for staying under the limit. 
+ jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit(
+ "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon,
+ )
+ jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
+ jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
+ jr.streamerInfo.maxKeysPerRow, err = jr.desc.KeysPerRow(jr.index.GetID())
 if err != nil {
 return nil, err
 }
- if maxKeysPerRow > 1 {
- // Currently, the streamer only supports cases with a single column
- // family.
- jr.usesStreamer = false
- } else {
- // jr.batchSizeBytes will be used up by the input batch, and we'll
- // give everything else to the streamer budget. Note that
- // budgetLimit will always be positive given that memoryLimit is at
- // least 8MiB and batchSizeBytes is at most 4MiB.
- jr.streamerInfo.budgetLimit = memoryLimit - jr.batchSizeBytes
- // We need to use an unlimited monitor for the streamer's budget
- // since the streamer itself is responsible for staying under the
- // limit.
- jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit(
- "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon,
- )
- jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
- jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
- }
 }

 // TODO(radu): verify the input types match the index key types
@@ -1039,6 +1033,7 @@ func (jr *joinReader) Start(ctx context.Context) {
 jr.streamerInfo.Streamer.Init(
 kvstreamer.OutOfOrder,
 kvstreamer.Hints{UniqueRequests: true},
+ jr.streamerInfo.maxKeysPerRow,
 )
 }
 jr.runningState = jrReadingInput

From 92f0260764a8d31d28f4ecae288c19c8468c2e0e Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Mon, 17 Jan 2022 04:34:36 -0500
Subject: [PATCH 2/2] kvclient: ignore stale lease information from lagging replicas

This commit makes it such that the `DistSender`'s range descriptor cache
doesn't trigger a cache eviction based on incompatible lease information in a
`NotLeaseHolderError` when the error is coming from a replica that has a stale
view of the range's descriptor (characterized by an older
`DescriptorGeneration` on the replica).

Not doing so before was hazardous because, if we received an NLHE that pointed
to a replica that did not belong in the cached descriptor, we'd trigger a
cache eviction. This assumed that the replica returning the error had a
fresher view of the range than what we had in the cache, which is not always
true. This meant that we'd keep doing range lookups and subsequent evictions
until this lagging replica caught up to the current state of the range.

Release note (bug fix): A bug that caused high SQL tail latencies during
background rebalancing in the cluster has been fixed.
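
For illustration only, the gist of the new staleness check can be sketched as
a small standalone Go program. The names `shouldTrustNLHELease` and the local
`RangeGeneration` type are stand-ins invented for this sketch; the actual
change lives in `CacheEntry.updateLease` in the diff below.

    package main

    import "fmt"

    // RangeGeneration stands in for roachpb.RangeGeneration in this sketch.
    type RangeGeneration int64

    // shouldTrustNLHELease captures the rule introduced by this commit: lease
    // information pointing outside the cached descriptor only triggers a cache
    // eviction if the replica that returned the NotLeaseHolderError has a
    // descriptor generation at least as high as the cached one. A zero
    // generation means the field wasn't populated and is trusted, as before.
    func shouldTrustNLHELease(errDescGen, cachedDescGen RangeGeneration) bool {
    	if errDescGen != 0 && errDescGen < cachedDescGen {
    		// The replica that returned the error lags behind the cached
    		// descriptor, so its view of the lease may be stale; don't evict.
    		return false
    	}
    	return true
    }

    func main() {
    	fmt.Println(shouldTrustNLHELease(1, 2)) // false: error came from a lagging replica
    	fmt.Println(shouldTrustNLHELease(2, 2)) // true
    	fmt.Println(shouldTrustNLHELease(0, 2)) // true: generation unknown, trusted as before
    }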
---
 pkg/kv/kvclient/kvcoord/dist_sender.go | 4 +-
 pkg/kv/kvclient/kvcoord/dist_sender_test.go | 106 ++++++++++++++++++
 pkg/kv/kvclient/rangecache/range_cache.go | 42 +++++--
 .../kvclient/rangecache/range_cache_test.go | 50 +++++----
 pkg/kv/kvserver/replica_range_lease.go | 5 +-
 pkg/roachpb/errors.proto | 6 +
 6 files changed, 176 insertions(+), 37 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index c51e77fbe2c0..22205f7e205d 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2102,10 +2102,10 @@ func (ds *DistSender) sendToReplicas(

 var updatedLeaseholder bool
 if tErr.Lease != nil {
- updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease)
+ updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration)
 } else if tErr.LeaseHolder != nil {
 // tErr.LeaseHolder might be set when tErr.Lease isn't.
- routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
+ routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration)
 updatedLeaseholder = true
 }
 // Move the new leaseholder to the head of the queue for the next
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 6d4f4e0170ad..58b7fa1edcb0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -971,6 +971,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
 require.LessOrEqual(t, callsToNode2, 11)
 }

+// TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration tests that a
+// NotLeaseHolderError received from a replica that has a stale range descriptor
+// version is ignored, and the next replica is attempted.
+func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ tracer := tracing.NewTracer()
+ ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan(
+ context.Background(), tracer, "test",
+ )
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+
+ clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+ rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
+ g := makeGossip(t, stopper, rpcContext)
+ for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
+ require.NoError(t, g.AddInfoProto(
+ gossip.MakeNodeIDKey(n.NodeID),
+ newNodeDesc(n.NodeID),
+ gossip.NodeDescriptorTTL,
+ ))
+ }
+
+ oldGeneration := roachpb.RangeGeneration(1)
+ newGeneration := roachpb.RangeGeneration(2)
+ desc := roachpb.RangeDescriptor{
+ RangeID: 1,
+ Generation: newGeneration,
+ StartKey: roachpb.RKeyMin,
+ EndKey: roachpb.RKeyMax,
+ InternalReplicas: []roachpb.ReplicaDescriptor{
+ {NodeID: 1, StoreID: 1, ReplicaID: 1},
+ {NodeID: 2, StoreID: 2, ReplicaID: 2},
+ {NodeID: 3, StoreID: 3, ReplicaID: 3},
+ },
+ }
+ // Ambiguous lease refers to a replica that is incompatible with the cached
+ // range descriptor.
+ ambiguousLease := roachpb.Lease{
+ Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
+ }
+ cachedLease := roachpb.Lease{
+ Replica: desc.InternalReplicas[1],
+ }
+
+ // The cache starts with a lease on node 2, so the first request will be
+ // routed there. That replica will reply with an NLHE with an old descriptor
+ // generation value, which should make the DistSender try the next replica.
+ var calls []roachpb.NodeID + sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + calls = append(calls, ba.Replica.NodeID) + if ba.Replica.NodeID == 2 { + reply := &roachpb.BatchResponse{} + err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration} + reply.Error = roachpb.NewError(err) + return reply, nil + } + require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1)) + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracer}, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), + }, + RangeDescriptorDB: threeReplicaMockRangeDescriptorDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */) + _, pErr := kv.SendWrapped(ctx, ds, get) + require.Nil(t, pErr) + + require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count()) + // We expect to backoff and retry the same replica 11 times when we get an + // NLHE with stale info. See `sameReplicaRetryLimit`. + require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count()) + require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count()) + + // Ensure that we called Node 2 11 times and then finally called Node 1. + var expectedCalls []roachpb.NodeID + for i := 0; i < 11; i++ { + expectedCalls = append(expectedCalls, roachpb.NodeID(2)) + } + expectedCalls = append(expectedCalls, roachpb.NodeID(1)) + require.Equal(t, expectedCalls, calls) + + require.Regexp( + t, + "backing off due to .* stale info", + finishAndGetRecording().String(), + ) +} + func TestDistSenderRetryOnTransportErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 29e068bcda1f..086c585f51f2 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked( // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { +func (et *EvictionToken) UpdateLease( + ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) bool { rdc := et.rdc rdc.rangeCache.Lock() defer rdc.rangeCache.Unlock() @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool if !stillValid { return false } - ok, newEntry := cachedEntry.updateLease(l) + ok, newEntry := cachedEntry.updateLease(l, descGeneration) if !ok { return false } @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". 
-func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { +func (et *EvictionToken) UpdateLeaseholder( + ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration, +) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. l := &roachpb.Lease{Replica: lh} - et.UpdateLease(ctx, l) + et.UpdateLease(ctx, l, descGeneration) } // EvictLease evicts information about the current lease from the cache, if the @@ -1303,11 +1307,15 @@ func compareEntryLeases(a, b *CacheEntry) int { // This means that the passed-in lease is older than the lease already in the // entry. // -// If the new leaseholder is not a replica in the descriptor, we assume the -// lease information to be more recent than the entry's descriptor, and we -// return true, nil. The caller should evict the receiver from the cache, but -// it'll have to do extra work to figure out what to insert instead. -func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) { +// If the new leaseholder is not a replica in the descriptor, and the error is +// coming from a replica with range descriptor generation at least as high as +// the cache's, we deduce the lease information to be more recent than the +// entry's descriptor, and we return true, nil. The caller should evict the +// receiver from the cache, but it'll have to do extra work to figure out what +// to insert instead. +func (e *CacheEntry) updateLease( + l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) (updated bool, newEntry *CacheEntry) { // If l is older than what the entry has (or the same), return early. // // This method handles speculative leases: a new lease with a sequence of 0 is @@ -1326,11 +1334,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach return false, e } - // Check whether the lease we were given is compatible with the replicas in - // the descriptor. If it's not, the descriptor must be really stale, and the - // RangeCacheEntry needs to be evicted. + // If the lease is incompatible with the cached descriptor and the error is + // coming from a replica that has a non-stale descriptor, the cached + // descriptor must be stale and the RangeCacheEntry needs to be evicted. _, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID) if !ok { + // If the error is coming from a replica that has a stale range descriptor, + // we cannot trigger a cache eviction since this means we've rebalanced the + // old leaseholder away. If we were to evict here, we'd keep evicting until + // this replica applied the new lease. Not updating the cache here means + // that we'll end up trying all voting replicas until we either hit the new + // leaseholder or hit a replica that has accurate knowledge of the + // leaseholder. 
+ if descGeneration != 0 && descGeneration < e.desc.Generation { + return false, nil + } return true, nil } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index a88c7c27404a..0fe5fc1bd9cb 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1570,13 +1570,16 @@ func TestRangeCacheUpdateLease(t *testing.T) { StoreID: 4, ReplicaID: 4, } + + staleRangeGeneration := roachpb.RangeGeneration(2) + nonStaleRangeGeneration := roachpb.RangeGeneration(3) desc1 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKeyMax, InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 0, + Generation: nonStaleRangeGeneration, } desc2 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1584,7 +1587,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep2, rep3, }, - Generation: 1, + Generation: nonStaleRangeGeneration + 1, } desc3 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1592,7 +1595,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 2, + Generation: nonStaleRangeGeneration + 2, } startKey := desc1.StartKey @@ -1621,7 +1624,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) require.Equal(t, &l.Replica, tok.Leaseholder()) @@ -1643,13 +1646,21 @@ func TestRangeCacheUpdateLease(t *testing.T) { require.True(t, ri.lease.Empty()) require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy()) - // Check that trying to update the lease to a non-member replica results - // in the entry's eviction and the token's invalidation. + // Check that trying to update the lease to a non-member replica results in + // the entry's eviction and the token's invalidation if the descriptor + // generation in the error is not older than the cached descriptor generation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - ok = tok.UpdateLease(ctx, l) + // Check that there's no eviction if the range desc generation in the error is + // stale. + ok = tok.UpdateLease(ctx, l, staleRangeGeneration) + require.False(t, ok) + require.True(t, tok.Valid()) + + // However, expect an eviction when the error's desc generation is non-stale. + ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1672,10 +1683,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - ok = tok.UpdateLease(ctx, - // Specify a lease compatible with desc2. - &roachpb.Lease{Replica: rep2, Sequence: 3}, - ) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, tok) require.Equal(t, &desc2, tok.Desc()) @@ -1690,7 +1698,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. 
- ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1734,7 +1742,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 1, } - ok, e := e.updateLease(l) + ok, e := e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1743,7 +1751,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1755,7 +1763,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1765,7 +1773,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1775,7 +1783,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Equal(*e.Lease())) @@ -1785,7 +1793,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 1, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.False(t, l.Equal(*e.Lease())) @@ -1794,7 +1802,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1804,7 +1812,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Sequence: 2, } require.True(t, l.Equal(e.Lease())) - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1814,7 +1822,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: repNonMember, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.Nil(t, e) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 7eadd4a8084c..ca9172ea364e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -913,8 +913,9 @@ func newNotLeaseHolderError( l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string, ) *roachpb.NotLeaseHolderError { err := &roachpb.NotLeaseHolderError{ - RangeID: rangeDesc.RangeID, - CustomMsg: msg, + RangeID: rangeDesc.RangeID, + DescriptorGeneration: rangeDesc.Generation, + CustomMsg: msg, } if proposerStoreID != 0 { err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index e77b56e58290..1aff2829da0d 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -56,6 +56,12 @@ message NotLeaseHolderError { optional roachpb.Lease lease = 4; 
optional int64 range_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // The range descriptor generation of the replica the error originated from. + // Used by the DistSender's RangeCache to determine whether the error was + // returned because the replica had a stale understanding of who the + // leaseholder is. + optional int64 descriptor_generation = 6 [(gogoproto.nullable) = false, + (gogoproto.customname) = "DescriptorGeneration", (gogoproto.casttype) = "RangeGeneration"]; // If set, the Error() method will return this instead of composing its // regular spiel. Useful because we reuse this error when rejecting a command // because the lease under which its application was attempted is different