From 2944a0ddca1677470a3b1ffd4d7c88e8c9c52f67 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 12 Jan 2021 02:27:29 -0500 Subject: [PATCH] kvclient: allow DistSender to consider non-voters when routing requests Before this patch, the DistSender would only route BatchRequests to replicas of types `VOTER_FULL` and `VOTER_INCOMING`. This patch changes that to let the DistSender route requests (for instance, follower read requests) to `NON_VOTER` replicas as well. Makes progress on #51943 Release note: None --- .../kvccl/kvfollowerreadsccl/followerreads.go | 2 + pkg/kv/kvclient/kvcoord/dist_sender.go | 10 +- .../kvclient/kvcoord/dist_sender_rangefeed.go | 3 +- pkg/kv/kvclient/kvcoord/replica_slice.go | 63 +++++-- pkg/kv/kvclient/kvcoord/replica_slice_test.go | 21 ++- pkg/kv/kvserver/closed_timestamp_test.go | 156 ++++++++++-------- pkg/kv/kvserver/replica_command.go | 6 +- pkg/kv/kvserver/replica_follower_read.go | 4 +- pkg/roachpb/api.go | 2 + pkg/roachpb/metadata_replicas.go | 24 ++- pkg/sql/physicalplan/replicaoracle/oracle.go | 15 +- 11 files changed, 197 insertions(+), 109 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 0695edaf01f9..bae99feef6f3 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -103,6 +103,8 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest // canSendToFollower implements the logic for checking whether a batch request // may be sent to a follower. +// TODO(aayush): We should try to bind clusterID to the function here, rather +// than having callers plumb it in every time. func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool { return batchCanBeEvaluatedOnFollower(ba) && txnCanPerformFollowerRead(ba.Txn) && diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 8635f4fda8f8..092c711edccd 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1771,7 +1771,14 @@ func (ds *DistSender) sendToReplicas( desc := routing.Desc() ba.RangeID = desc.RangeID leaseholder := routing.Leaseholder() - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder) + canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba) + var replicas ReplicaSlice + var err error + if canFollowerRead { + replicas, err = NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas) + } else { + replicas, err = NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders) + } if err != nil { return nil, err } @@ -1784,7 +1791,6 @@ func (ds *DistSender) sendToReplicas( // Try the leaseholder first, if the request wants it. 
{ - canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba) sendToLeaseholder := (leaseholder != nil) && !canFollowerRead && ba.RequiresLeaseHolder() if sendToLeaseholder { idx := replicas.Find(leaseholder.ReplicaID) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 768e694cc610..db8acb309da8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -229,7 +229,8 @@ func (ds *DistSender) singleRangeFeed( if ds.rpcContext != nil { latencyFn = ds.rpcContext.RemoteClocks.Latency } - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil /* leaseholder */) + // TODO(aayush): We should enable creating RangeFeeds on non-voting replicas. + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders) if err != nil { return args.Timestamp, err } diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 9617ecb63296..de3d10e89c94 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -39,18 +39,32 @@ func (i ReplicaInfo) addr() string { // A ReplicaSlice is a slice of ReplicaInfo. type ReplicaSlice []ReplicaInfo +// ReplicaSliceFilter controls which kinds of replicas are to be included in +// the slice for routing BatchRequests to. +type ReplicaSliceFilter int + +const ( + // OnlyPotentialLeaseholders prescribes that the ReplicaSlice should include + // only replicas that are allowed to be leaseholders (i.e. replicas of type + // VOTER_FULL). + OnlyPotentialLeaseholders ReplicaSliceFilter = iota + // AllExtantReplicas prescribes that the ReplicaSlice should include all + // replicas that are not LEARNERs, VOTER_OUTGOING, or + // VOTER_DEMOTING_{LEARNER/NON_VOTER}. + AllExtantReplicas +) + // NewReplicaSlice creates a ReplicaSlice from the replicas listed in the range // descriptor and using gossip to lookup node descriptors. Replicas on nodes // that are not gossiped are omitted from the result. // -// Generally, only voting replicas are returned. However, if a non-nil -// leaseholder is passed in, it will be included in the result even if the -// descriptor has it as a learner (we assert that the leaseholder is part of the -// descriptor). The idea is that the descriptor might be stale and list the -// leaseholder as a learner erroneously, and lease info is a strong signal in -// that direction. Note that the returned ReplicaSlice might still not include -// the leaseholder if info for the respective node is missing from the -// NodeDescStore. +// Generally, learners are not returned. However, if a non-nil leaseholder is +// passed in, it will be included in the result even if the descriptor has it as +// a learner (we assert that the leaseholder is part of the descriptor). The +// idea is that the descriptor might be stale and list the leaseholder as a +// learner erroneously, and lease info is a strong signal in that direction. +// Note that the returned ReplicaSlice might still not include the leaseholder +// if info for the respective node is missing from the NodeDescStore. // // If there's no info in gossip for any of the nodes in the descriptor, a // sendError is returned. 
@@ -59,21 +73,36 @@ func NewReplicaSlice( nodeDescs NodeDescStore, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, + filter ReplicaSliceFilter, ) (ReplicaSlice, error) { if leaseholder != nil { if _, ok := desc.GetReplicaDescriptorByID(leaseholder.ReplicaID); !ok { log.Fatalf(ctx, "leaseholder not in descriptor; leaseholder: %s, desc: %s", leaseholder, desc) } } + canReceiveLease := func(rDesc roachpb.ReplicaDescriptor) bool { + if err := roachpb.CheckCanReceiveLease(rDesc, desc); err != nil { + return false + } + return true + } - // Learner replicas won't serve reads/writes, so we'll send only to the - // `VoterDescriptors` replicas. This is just an optimization to save a network hop, - // everything would still work if we had `All` here. - voters := desc.Replicas().VoterDescriptors() + // Learner replicas won't serve reads/writes, so we'll send only to the voters + // and non-voting replicas. This is just an optimization to save a network + // hop, everything would still work if we had `All` here. + var replicas []roachpb.ReplicaDescriptor + switch filter { + case OnlyPotentialLeaseholders: + replicas = desc.Replicas().Filter(canReceiveLease).Descriptors() + case AllExtantReplicas: + replicas = desc.Replicas().VoterFullAndNonVoterDescriptors() + default: + log.Fatalf(ctx, "unknown ReplicaSliceFilter %s", filter) + } // If we know a leaseholder, though, let's make sure we include it. - if leaseholder != nil && len(voters) < len(desc.Replicas().Descriptors()) { + if leaseholder != nil && len(replicas) < len(desc.Replicas().Descriptors()) { found := false - for _, v := range voters { + for _, v := range replicas { if v == *leaseholder { found = true break @@ -81,11 +110,11 @@ func NewReplicaSlice( } if !found { log.Eventf(ctx, "the descriptor has the leaseholder as a learner; including it anyway") - voters = append(voters, *leaseholder) + replicas = append(replicas, *leaseholder) } } - rs := make(ReplicaSlice, 0, len(voters)) - for _, r := range voters { + rs := make(ReplicaSlice, 0, len(replicas)) + for _, r := range replicas { nd, err := nodeDescs.GetNodeDescriptor(r.NodeID) if err != nil { if log.V(1) { diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go index 8c59e9eaea6f..bbebbb89f7f8 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go @@ -68,21 +68,34 @@ func TestNewReplicaSlice(t *testing.T) { }, }, } - rs, err := NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */) + rs, err := NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders) require.NoError(t, err) require.Equal(t, 3, rs.Len()) // Check that learners are not included. typLearner := roachpb.LEARNER rd.InternalReplicas[2].Type = &typLearner - rs, err = NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */) + rs, err = NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders) + require.NoError(t, err) + require.Equal(t, 2, rs.Len()) + rs, err = NewReplicaSlice(ctx, ns, rd, nil, AllExtantReplicas) + require.NoError(t, err) + require.Equal(t, 2, rs.Len()) + + // Check that non-voters are included iff we ask for them to be. 
+ typNonVoter := roachpb.NON_VOTER + rd.InternalReplicas[2].Type = &typNonVoter + rs, err = NewReplicaSlice(ctx, ns, rd, nil, AllExtantReplicas) + require.NoError(t, err) + require.Equal(t, 3, rs.Len()) + rs, err = NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders) require.NoError(t, err) require.Equal(t, 2, rs.Len()) - // Check that, if the leasehoder points to a learner, that learner is + // Check that, if the leaseholder points to a learner, that learner is // included. leaseholder := &roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3} - rs, err = NewReplicaSlice(ctx, ns, rd, leaseholder) + rs, err = NewReplicaSlice(ctx, ns, rd, leaseholder, OnlyPotentialLeaseholders) require.NoError(t, err) require.Equal(t, 3, rs.Len()) } diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 71db58dbbb07..15145b8e1414 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -65,53 +65,69 @@ func TestClosedTimestampCanServe(t *testing.T) { // drives up the test duration. skip.UnderRace(t) - ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") - defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) - - if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { - t.Fatal(err) - } - - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) - testutils.SucceedsSoon(t, func() error { - return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) - }) + testutils.RunTrueAndFalse(t, "withNonVoters", func(t *testing.T, withNonVoters bool) { + ctx := context.Background() + dbName, tableName := "cttest", "kv" + clusterArgs := aggressiveResolvedTimestampClusterArgs + // Disable the replicateQueue so that it doesn't interfere with replica + // membership ranges. + clusterArgs.ReplicationMode = base.ReplicationManual + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, + testingCloseFraction, clusterArgs, dbName, tableName) + defer tc.Stopper().Stop(ctx) - // We just served a follower read. As a sanity check, make sure that we can't write at - // that same timestamp. 
- { - var baWrite roachpb.BatchRequest - r := &roachpb.DeleteRequest{} - r.Key = desc.StartKey.AsRawKey() - txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100) - baWrite.Txn = &txn - baWrite.Add(r) - baWrite.RangeID = repls[0].RangeID - if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil { + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) } - var found bool - for _, repl := range repls { - resp, pErr := repl.Send(ctx, baWrite) - if errors.HasType(pErr.GoError(), (*roachpb.NotLeaseHolderError)(nil)) { - continue - } else if pErr != nil { - t.Fatal(pErr) + if withNonVoters { + desc = tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), + tc.Target(2)) + } else { + desc = tc.AddVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), + tc.Target(2)) + } + + repls := replsForRange(ctx, t, tc, desc, numNodes) + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeReadBatchRequestForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // We just served a follower read. As a sanity check, make sure that we can't write at + // that same timestamp. + { + var baWrite roachpb.BatchRequest + r := &roachpb.DeleteRequest{} + r.Key = desc.StartKey.AsRawKey() + txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100) + baWrite.Txn = &txn + baWrite.Add(r) + baWrite.RangeID = repls[0].RangeID + if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil { + t.Fatal(err) } - found = true - if resp.Txn.WriteTimestamp.LessEq(ts) || resp.Txn.ReadTimestamp == resp.Txn.WriteTimestamp { - t.Fatal("timestamp did not get bumped") + + var found bool + for _, repl := range repls { + resp, pErr := repl.Send(ctx, baWrite) + if errors.HasType(pErr.GoError(), (*roachpb.NotLeaseHolderError)(nil)) { + continue + } else if pErr != nil { + t.Fatal(pErr) + } + found = true + if resp.Txn.WriteTimestamp.LessEq(ts) || resp.Txn.ReadTimestamp == resp.Txn.WriteTimestamp { + t.Fatal("timestamp did not get bumped") + } + break + } + if !found { + t.Fatal("unable to send to any replica") } - break - } - if !found { - t.Fatal("unable to send to any replica") } - } + }) } // TestClosedTimestampCanServerThroughoutLeaseTransfer verifies that lease @@ -892,6 +908,21 @@ func setupClusterForClosedTSTestingWithSplitRanges( return tc, leftDesc, rightDesc } +func getEncodedKeyForTable( + t *testing.T, db *gosql.DB, dbName, tableName string, val tree.Datum, +) roachpb.Key { + tableID, err := getTableID(db, dbName, tableName) + if err != nil { + t.Fatalf("failed to lookup ids: %+v", err) + } + idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) + k, err := rowenc.EncodeTableKey(idxPrefix, val, encoding.Ascending) + if err != nil { + t.Fatalf("failed to encode split key: %+v", err) + } + return k +} + // splitDummyRangeInTestCluster is supposed to be used in conjunction with the // dummy table created in setupTestClusterWithDummyRange. It adds two rows to // the given table and performs splits on the table's range such that the 2 @@ -912,26 +943,14 @@ func splitDummyRangeInTestCluster( t.Fatal(err) } // Manually split the table to have easier access to descriptors. 
- tableID, err := getTableID(db0, dbName, tableName) - if err != nil { - t.Fatalf("failed to lookup ids: %+v", err) - } - - idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) - k, err := rowenc.EncodeTableKey(idxPrefix, tree.NewDInt(1), encoding.Ascending) - if err != nil { - t.Fatalf("failed to encode split key: %+v", err) - } + k := getEncodedKeyForTable(t, db0, dbName, tableName, tree.NewDInt(1)) tcImpl := tc.(*testcluster.TestCluster) - // Split at `k` so that the table has exactly two ranges: [1,2) and [2, Max). - // This split will never be merged by the merge queue so the expiration time - // doesn't matter here. + // Split at `1` and `2` so that the table has exactly two ranges: [1,2) and + // [2, Max). This first split will never be merged by the merge queue so the + // expiration time doesn't matter here. tcImpl.SplitRangeOrFatal(t, k) - idxPrefix = keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) - k, err = rowenc.EncodeTableKey(idxPrefix, tree.NewDInt(2), encoding.Ascending) - if err != nil { - t.Fatalf("failed to encode split key: %+v", err) - } + + k = getEncodedKeyForTable(t, db0, dbName, tableName, tree.NewDInt(2)) leftDesc, rightDesc, err := tcImpl.SplitRangeWithExpiration(k, splitExpirationTime) require.NoError(t, err) @@ -1166,7 +1185,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; return nil } -// setupTestClusterWithDummyRange creates a TestCluster with an empty table and +// setupTestClusterWithDummyRange creates a TestCluster with an empty table. It // returns a handle to the range descriptor corresponding to this table. func setupTestClusterWithDummyRange( t *testing.T, clusterArgs base.TestClusterArgs, dbName, tableName string, numNodes int, @@ -1180,36 +1199,37 @@ func setupTestClusterWithDummyRange( -- forever under high load (testrace under high concurrency). SET statement_timeout='30s'; CREATE DATABASE %[1]s; -CREATE TABLE %[1]s.%[2]s (id INT PRIMARY KEY, value STRING); +CREATE TABLE %[1]s.%[2]s (id INT PRIMARY KEY CHECK (id >= 0), value STRING); -- Reset the timeout set above. RESET statement_timeout; `, dbName, tableName)); err != nil { t.Fatal(err) } - var rangeID roachpb.RangeID - var startKey roachpb.Key var numReplicas int + var err error + var desc roachpb.RangeDescriptor // If replicate queue is not disabled, wait until the table's range is fully // replicated. 
if clusterArgs.ReplicationMode != base.ReplicationManual { testutils.SucceedsSoon(t, func() error { if err := db0.QueryRow( fmt.Sprintf( - `SELECT range_id, start_key, array_length(replicas, 1) FROM crdb_internal.ranges WHERE table_name = '%s' AND database_name = '%s'`, - tableName, dbName), - ).Scan(&rangeID, &startKey, &numReplicas); err != nil { + `SELECT array_length(replicas, 1) FROM crdb_internal.ranges +WHERE table_name = '%s' AND database_name = '%s'`, tableName, dbName), + ).Scan(&numReplicas); err != nil { return err } if numReplicas != numNodes { return errors.New("not fully replicated yet") } + require.Nil(t, err) return nil }) } - - desc, err := tc.LookupRange(startKey) - require.Nil(t, err) + startKey := getEncodedKeyForTable(t, tc.ServerConn(0), dbName, tableName, tree.NewDInt(0)) + _, desc, err = tc.Server(0).SplitRange(startKey) + require.NoError(t, err) return tc, desc } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index a9c3bcdc1758..b81649d716ec 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -628,11 +628,11 @@ func (r *Replica) AdminMerge( // queues should fix things up quickly). lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas() - if len(lReplicas.VoterAndNonVoterDescriptors()) != len(lReplicas.Descriptors()) { + if len(lReplicas.VoterFullAndNonVoterDescriptors()) != len(lReplicas.Descriptors()) { return errors.Errorf("cannot merge ranges when lhs is in a joint state or has learners: %s", lReplicas) } - if len(rReplicas.VoterAndNonVoterDescriptors()) != len(rReplicas.Descriptors()) { + if len(rReplicas.VoterFullAndNonVoterDescriptors()) != len(rReplicas.Descriptors()) { return errors.Errorf("cannot merge ranges when rhs is in a joint state or has learners: %s", rReplicas) } @@ -2472,7 +2472,7 @@ func (s *Store) relocateOne( desc *roachpb.RangeDescriptor, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, ) ([]roachpb.ReplicationChange, *roachpb.ReplicationTarget, error) { - if repls := desc.Replicas(); len(repls.VoterAndNonVoterDescriptors()) != len(repls.Descriptors()) { + if repls := desc.Replicas(); len(repls.VoterFullAndNonVoterDescriptors()) != len(repls.Descriptors()) { // The caller removed all the learners and left the joint config, so there // shouldn't be anything but voters and non_voters. return nil, nil, errors.AssertionFailedf( diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 6444cff93035..1aaca7940a71 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -61,7 +61,9 @@ func (r *Replica) canServeFollowerRead( if err != nil { return roachpb.NewError(err) } - if typ := repDesc.GetType(); typ != roachpb.VOTER_FULL { + + typ := repDesc.GetType() + if typ != roachpb.VOTER_FULL && typ != roachpb.NON_VOTER { log.Eventf(ctx, "%s replicas cannot serve follower reads", typ) return pErr } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 688d2b7f6d2e..ffc735259e59 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -53,6 +53,8 @@ const ( // RequiresReadLease returns whether the ReadConsistencyType requires // that a read-only request be performed on an active valid leaseholder. +// TODO(aayush): Rename the method since we no longer require a replica to be a +// leaseholder to serve a consistent read. 
func (rc ReadConsistencyType) RequiresReadLease() bool { switch rc { case CONSISTENT: diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9dc5c51ae68c..8f3720f62883 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -116,6 +116,10 @@ func predNonVoter(rDesc ReplicaDescriptor) bool { } func predVoterOrNonVoter(rDesc ReplicaDescriptor) bool { + return predVoterFullOrIncoming(rDesc) || predNonVoter(rDesc) +} + +func predVoterFullOrNonVoter(rDesc ReplicaDescriptor) bool { return predVoterFull(rDesc) || predNonVoter(rDesc) } @@ -260,12 +264,13 @@ func (d ReplicaSet) NonVoterDescriptors() []ReplicaDescriptor { return d.FilterToDescriptors(predNonVoter) } -// VoterAndNonVoterDescriptors returns the descriptors of VOTER_FULL/NON_VOTER -// replicas in the set. This set will not contain learners or, during an atomic -// replication change, incoming or outgoing voters. Notably, this set must -// encapsulate all replicas of a range for a range merge to proceed. -func (d ReplicaSet) VoterAndNonVoterDescriptors() []ReplicaDescriptor { - return d.FilterToDescriptors(predVoterOrNonVoter) +// VoterFullAndNonVoterDescriptors returns the descriptors of +// VOTER_FULL/NON_VOTER replicas in the set. This set will not contain learners +// or, during an atomic replication change, incoming or outgoing voters. +// Notably, this set must encapsulate all replicas of a range for a range merge +// to proceed. +func (d ReplicaSet) VoterFullAndNonVoterDescriptors() []ReplicaDescriptor { + return d.FilterToDescriptors(predVoterFullOrNonVoter) } // Filter returns a ReplicaSet corresponding to the replicas for which the @@ -482,6 +487,9 @@ func (c ReplicaChangeType) IsRemoval() bool { } } +var errReplicaNotFound = errors.Errorf(`replica not found in RangeDescriptor`) +var errReplicaCannotHoldLease = errors.Errorf("replica cannot hold lease") + // CheckCanReceiveLease checks whether `wouldbeLeaseholder` can receive a lease. // Returns an error if the respective replica is not eligible. // @@ -497,7 +505,7 @@ func (c ReplicaChangeType) IsRemoval() bool { func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDescriptor) error { repDesc, ok := rngDesc.GetReplicaDescriptorByID(wouldbeLeaseholder.ReplicaID) if !ok { - return errors.Errorf(`replica %s not found in %s`, wouldbeLeaseholder, rngDesc) + return errReplicaNotFound } else if t := repDesc.GetType(); t != VOTER_FULL { // NB: there's no harm in transferring the lease to a VOTER_INCOMING, // but we disallow it anyway. On the other hand, transferring to @@ -517,7 +525,7 @@ func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDe // of minProposedTS needs to be "reversible" (tricky) or we make the // lease evaluation succeed, though with a lease that's "invalid" so that // a new lease can be requested right after. 
- return errors.Errorf(`replica %s of type %s cannot hold lease`, repDesc, t) + return errReplicaCannotHoldLease } return nil } diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index f22513eaebea..8a01bde6ed5b 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -135,7 +135,7 @@ func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { func (o *randomOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc) + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -165,7 +165,9 @@ func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { func (o *closestOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc) + // We know we're serving a follower read request, so consider all non-outgoing + // replicas. + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.AllExtantReplicas) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -223,7 +225,7 @@ func (o *binPackingOracle) ChoosePreferredReplica( return *leaseholder, nil } - replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc) + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -253,9 +255,12 @@ func (o *binPackingOracle) ChoosePreferredReplica( // is available in the provided NodeDescStore. If no nodes are available, a // RangeUnavailableError is returned. func replicaSliceOrErr( - ctx context.Context, nodeDescs kvcoord.NodeDescStore, desc *roachpb.RangeDescriptor, + ctx context.Context, + nodeDescs kvcoord.NodeDescStore, + desc *roachpb.RangeDescriptor, + filter kvcoord.ReplicaSliceFilter, ) (kvcoord.ReplicaSlice, error) { - replicas, err := kvcoord.NewReplicaSlice(ctx, nodeDescs, desc, nil /* leaseholder */) + replicas, err := kvcoord.NewReplicaSlice(ctx, nodeDescs, desc, nil, filter) if err != nil { return kvcoord.ReplicaSlice{}, sqlerrors.NewRangeUnavailableError(desc.RangeID, err) }
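
Illustration (not part of the patch): a minimal sketch of how a caller picks between the two new ReplicaSliceFilter values before building a ReplicaSlice, mirroring the sendToReplicas change above. The standalone package and the helper name chooseReplicaSliceFilter are assumptions made for this sketch; the kvcoord identifiers themselves come from the patch.

  package example

  import "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"

  // chooseReplicaSliceFilter (hypothetical helper) mirrors the DistSender logic:
  // requests that can be served by followers may be routed to any extant
  // replica, while everything else is restricted to replicas that can hold the
  // lease.
  func chooseReplicaSliceFilter(canFollowerRead bool) kvcoord.ReplicaSliceFilter {
      if canFollowerRead {
          // Follower reads may now be served by NON_VOTER replicas as well, so
          // consider both VOTER_FULL and NON_VOTER replicas.
          return kvcoord.AllExtantReplicas
      }
      // Leaseholder-bound requests only go to replicas that are allowed to
      // acquire the lease.
      return kvcoord.OnlyPotentialLeaseholders
  }

The resulting filter is passed directly as the new fifth argument to kvcoord.NewReplicaSlice (or to replicaSliceOrErr in the replicaoracle package), which then resolves the matching replica descriptors against the NodeDescStore.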