From aaabb1299c61bd0308b3df89ca6c1aba420ce1c3 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Thu, 16 May 2024 11:28:55 -0400
Subject: [PATCH] kvserver: rebalance ranges with one voter using joint configurations

When rebalancing ranges with one replica, the allocator would only add a
voter, instead of both adding the new voter and removing the existing one.
Removing the leaseholder replica was not possible prior to #74077, so an
add-only change was necessary. That restriction no longer applies.

Allow rebalancing a one-voter range between stores using joint
configurations, where the lease is transferred from the outgoing demoting
voter to the incoming voter store.

Scattering ranges with one voter now leaves the range with exactly one
voter, where previously both the leaseholder voter evaluating the scatter
and the new voter would be left.

Before this patch, scattering 1000 ranges with RF=1 on a 5-store cluster:

```
  store_id | replica_count | replica_distribution | lease_count | lease_distribution
-----------+---------------+----------------------+-------------+---------------------
         1 |          1001 | ##########           |         500 | ##########
         5 |           291 | ###                  |         147 | ###
         4 |           275 | ###                  |         137 | ###
         3 |           229 | ###                  |         118 | ###
         2 |           206 | ###                  |          99 | ##
```

After:

```
  store_id | replica_count | replica_distribution | lease_count | lease_distribution
-----------+---------------+----------------------+-------------+---------------------
         2 |           242 | ##########           |         241 | ##########
         4 |           227 | ##########           |         227 | ##########
         5 |           217 | #########            |         216 | #########
         3 |           209 | #########            |         208 | #########
         1 |           106 | #####                |         109 | #####
```

Fixes: #108420
Fixes: #124171

Release note (bug fix): Scattering a range with replication factor=1 no
longer erroneously up-replicates the range to two replicas. Leases also no
longer thrash between nodes when ranges with replication factor=1 are
perturbed.
---
 pkg/kv/kvserver/replica_command.go      |   2 +-
 pkg/kv/kvserver/replicate_queue.go      |  51 +-------
 pkg/kv/kvserver/replicate_queue_test.go | 165 ++++++++++++++++++++++++
 pkg/sql/scatter_test.go                 |  99 ++++++++++++++
 4 files changed, 268 insertions(+), 49 deletions(-)

diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 8acddc1111aa..fe4988232304 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -3817,7 +3817,7 @@ func RelocateOne(
 
 	var ops []kvpb.ReplicationChange
 	if shouldAdd && shouldRemove {
-		ops, _, err = replicationChangesForRebalance(
+		ops, _, err = ReplicationChangesForRebalance(
 			ctx, desc, len(existingVoters), additionTarget, removalTarget, args.targetType,
 		)
 		if err != nil {
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 3c4cba0dc09e..4ba2bf01d968 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -1827,7 +1827,7 @@ func (rq *replicateQueue) considerRebalance(
 		// If we have a valid rebalance action (ok == true) and we haven't
 		// transferred our lease away, find the rebalance changes and return them
 		// in an operation.
-		chgs, performingSwap, err := replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget,
+		chgs, performingSwap, err := ReplicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget,
 			removeTarget, rebalanceTargetType)
 		if err != nil {
 			return nil, err
 		}
@@ -1861,13 +1861,13 @@ func (rq *replicateQueue) considerRebalance(
 	return op, nil
 }
 
-// replicationChangesForRebalance returns a list of ReplicationChanges to
+// ReplicationChangesForRebalance returns a list of ReplicationChanges to
 // execute for a rebalancing decision made by the allocator.
 //
 // This function assumes that `addTarget` and `removeTarget` are produced by the
 // allocator (i.e. they satisfy replica `constraints` and potentially
 // `voter_constraints` if we're operating over voter targets).
-func replicationChangesForRebalance(
+func ReplicationChangesForRebalance(
 	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
 	numExistingVoters int,
@@ -1875,51 +1875,6 @@ func replicationChangesForRebalance(
 	rebalanceTargetType allocatorimpl.TargetReplicaType,
 ) (chgs []kvpb.ReplicationChange, performingSwap bool, err error) {
 	rdesc, found := desc.GetReplicaDescriptor(addTarget.StoreID)
-	if rebalanceTargetType == allocatorimpl.VoterTarget && numExistingVoters == 1 {
-		// If there's only one replica, the removal target is the
-		// leaseholder and this is unsupported and will fail. However,
-		// this is also the only way to rebalance in a single-replica
-		// range. If we try the atomic swap here, we'll fail doing
-		// nothing, and so we stay locked into the current distribution
-		// of replicas. (Note that maybeTransferLeaseAway above will not
-		// have found a target, and so will have returned (false, nil).
-		//
-		// Do the best thing we can, which is carry out the addition
-		// only, which should succeed, and the next time we touch this
-		// range, we will have one more replica and hopefully it will
-		// take the lease and remove the current leaseholder.
-		//
-		// It's possible that "rebalancing deadlock" can occur in other
-		// scenarios, it's really impossible to tell from the code given
-		// the constraints we support. However, the lease transfer often
-		// does not happen spuriously, and we can't enter dangerous
-		// configurations sporadically, so this code path is only hit
-		// when we know it's necessary, picking the smaller of two evils.
-		//
-		// See https://github.com/cockroachdb/cockroach/issues/40333.
-		log.KvDistribution.Infof(ctx, "can't swap replica due to lease; falling back to add")
-
-		// Even when there is only 1 existing voter, there may be other replica
-		// types in the range. Check if the add target already has a replica, if so
-		// it must be a non-voter or the rebalance is invalid.
-		if found && rdesc.Type == roachpb.NON_VOTER {
-			// The receiving store already has a non-voting replica. Instead of just
-			// adding a voter to the receiving store, we *must* promote the non-voting
-			// replica to a voter.
-			chgs = kvpb.ReplicationChangesForPromotion(addTarget)
-		} else if !found {
-			chgs = []kvpb.ReplicationChange{
-				{ChangeType: roachpb.ADD_VOTER, Target: addTarget},
-			}
-		} else {
-			return nil, false, errors.AssertionFailedf(
-				"invalid rebalancing decision: trying to"+
-					" move voter to a store that already has a replica %s for the range", rdesc,
-			)
-		}
-		return chgs, false, err
-	}
-
 	switch rebalanceTargetType {
 	case allocatorimpl.VoterTarget:
 		// Check if the target being added already has a non-voting replica.
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 31ec41ca8438..6998a10a21d6 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -2612,3 +2612,168 @@ func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
 		return checkLeaseCount(nextPreferredNode, numRanges)
 	})
 }
+
+// TestReplicationChangesForRebalance asserts that the replication changes for
+// rebalancing are correct, given a range descriptor and rebalance target.
+func TestReplicationChangesForRebalance(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	testCases := []struct {
+		name                    string
+		desc                    *roachpb.RangeDescriptor
+		addTarget, removeTarget roachpb.ReplicationTarget
+		rebalanceTargetType     allocatorimpl.TargetReplicaType
+		expectedChanges         []kvpb.ReplicationChange
+		expectedPerformingSwap  bool
+		expectedErrStr          string
+	}{
+		{
+			name: "rf=1 rebalance voter 1->2",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
+				},
+			},
+			addTarget:           roachpb.ReplicationTarget{NodeID: 2, StoreID: 2},
+			removeTarget:        roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType: allocatorimpl.VoterTarget,
+			expectedChanges: []kvpb.ReplicationChange{
+				{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+				{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+			},
+			expectedPerformingSwap: false,
+			expectedErrStr:         "",
+		},
+		{
+			name: "rf=3 rebalance voter 1->4",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
+				},
+			},
+			addTarget:           roachpb.ReplicationTarget{NodeID: 4, StoreID: 4},
+			removeTarget:        roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType: allocatorimpl.VoterTarget,
+			expectedChanges: []kvpb.ReplicationChange{
+				{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
+				{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+			},
+			expectedPerformingSwap: false,
+			expectedErrStr:         "",
+		},
+		{
+			name: "rf=3 rebalance voter 1->3 error: already has a voter",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
+				},
+			},
+			addTarget:              roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
+			removeTarget:           roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType:    allocatorimpl.VoterTarget,
+			expectedChanges:        nil,
+			expectedPerformingSwap: false,
+			expectedErrStr:         "programming error: store being rebalanced to(3) already has a voting replica",
+		},
+		{
+			name: "rf=3 rebalance non-voter: 1->4",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
+				},
+			},
+			addTarget:           roachpb.ReplicationTarget{NodeID: 4, StoreID: 4},
+			removeTarget:        roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType: allocatorimpl.NonVoterTarget,
+			expectedChanges: []kvpb.ReplicationChange{
+				{ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
+				{ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+			},
+			expectedPerformingSwap: false,
+			expectedErrStr:         "",
+		},
+		{
+			name: "rf=3 rebalance non-voter 1->3 error: already has a voter",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
+				},
+			},
+			addTarget:              roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
+			removeTarget:           roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType:    allocatorimpl.NonVoterTarget,
+			expectedChanges:        nil,
+			expectedPerformingSwap: false,
+			expectedErrStr:         "invalid rebalancing decision: trying to move non-voter to a store that already has a replica",
+		},
+		{
+			name: "rf=3 rebalance non-voter 1->3 error: already has a non-voter",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER},
+				},
+			},
+			addTarget:              roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
+			removeTarget:           roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType:    allocatorimpl.NonVoterTarget,
+			expectedChanges:        nil,
+			expectedPerformingSwap: false,
+			expectedErrStr:         "invalid rebalancing decision: trying to move non-voter to a store that already has a replica",
+		},
+		{
+			name: "rf=3 rebalance voter 1->3 swap",
+			desc: &roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
+					{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
+					{NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER},
+				},
+			},
+			addTarget:           roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
+			removeTarget:        roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
+			rebalanceTargetType: allocatorimpl.VoterTarget,
+			expectedChanges: []kvpb.ReplicationChange{
+				{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+				{ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+				{ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+				{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+			},
+			expectedPerformingSwap: true,
+			expectedErrStr:         "",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			chgs, performingSwap, err := kvserver.ReplicationChangesForRebalance(
+				ctx,
+				tc.desc,
+				len(tc.desc.Replicas().VoterDescriptors()),
+				tc.addTarget,
+				tc.removeTarget,
+				tc.rebalanceTargetType,
+			)
+			require.Equal(t, tc.expectedChanges, chgs)
+			require.Equal(t, tc.expectedPerformingSwap, performingSwap)
+			if tc.expectedErrStr != "" {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.expectedErrStr)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/sql/scatter_test.go b/pkg/sql/scatter_test.go
index edcdf16687f2..f08380bae5c0 100644
--- a/pkg/sql/scatter_test.go
+++ b/pkg/sql/scatter_test.go
@@ -17,8 +17,10 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -28,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) func TestScatterRandomizeLeases(t *testing.T) { @@ -175,3 +179,98 @@ func TestScatterResponse(t *testing.T) { t.Fatalf("expected %d rows, but got %d", e, a) } } + +// TestScatterWithOneVoter tests that the scatter command works when the +// replication factor is set to 1. We expect that the total number of replicas +// remains unchanged and that the scattering store loses some replicas. Note we +// don't assert on the final distribution being even across all stores, scatter +// promises randomness, not necessarily uniformity. +func TestScatterWithOneVoter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // Too slow under stressrace. + skip.UnderDeadlock(t) + skip.UnderShort(t) + + zcfg := zonepb.DefaultZoneConfig() + zcfg.NumReplicas = proto.Int32(1) + + tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DefaultZoneConfigOverride: &zcfg, + DefaultSystemZoneConfigOverride: &zcfg, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + sqlutils.CreateTable( + t, tc.ServerConn(0), "t", + "k INT PRIMARY KEY, v INT", + 500, /* numRows */ + sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(10)), + ) + + r := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Create 49 splits, for 50 ranges in the test table. + r.Exec(t, "ALTER TABLE test.t SPLIT AT (SELECT i*10 FROM generate_series(1, 49) AS g(i))") + + getReplicaCounts := func() (map[int]int, int, error) { + rows := r.Query(t, ` + WITH ranges_info AS ( + SHOW RANGES FROM TABLE test.t + ) + SELECT + store_id, + count(*) AS replica_count + FROM + ( + SELECT + unnest(replicas) AS store_id + FROM + ranges_info + ) AS store_replicas + GROUP BY + store_id;`) + replicaCounts := make(map[int]int) + totalReplicas := 0 + for rows.Next() { + var storeID, replicaCount int + if err := rows.Scan(&storeID, &replicaCount); err != nil { + return nil, 0, err + } + replicaCounts[storeID] = replicaCount + totalReplicas += replicaCount + } + if err := rows.Err(); err != nil { + return nil, 0, err + } + return replicaCounts, totalReplicas, nil + } + + oldReplicaCounts, oldTotalReplicas, err := getReplicaCounts() + if err != nil { + t.Fatal(err) + } + + // Expect that the number of replicas on store 1 to have changed. We can't + // assert that the distribution will be even across all three stores, but s1 + // (the initial leaseholder and replica) should have a different number of + // replicas than before. Rebalancing is otherwise disabled in this test, so + // the only replica movements are from the scatter. 
+	r.Exec(t, "ALTER TABLE test.t SCATTER")
+	newReplicaCounts, newTotalReplicas, err := getReplicaCounts()
+	require.NoError(t, err)
+
+	require.Equal(t, oldTotalReplicas, newTotalReplicas,
+		"expected total replica count to remain the same post-scatter, "+
+			"old replica counts(%d): %v, new replica counts(%d): %v",
+		oldTotalReplicas, oldReplicaCounts, newTotalReplicas, newReplicaCounts)
+	require.NotEqual(t, oldReplicaCounts[1], newReplicaCounts[1])
+}
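
For reference, a minimal sketch of the behavior this patch enables, mirroring the "rf=1 rebalance voter 1->2" test case above: with a single existing voter, ReplicationChangesForRebalance now returns a paired add and remove, which the replicate queue executes as a joint configuration that demotes the outgoing voter and transfers the lease to the incoming one. The `package main` wrapper and import paths here are illustrative assumptions, not part of the patch.

```
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// A range with a single voter on (n1, s1), as in the RF=1 scatter
	// scenario described in the commit message.
	desc := &roachpb.RangeDescriptor{
		InternalReplicas: []roachpb.ReplicaDescriptor{
			{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
		},
	}
	add := roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}
	remove := roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}

	// With this patch, the single-voter case produces both an ADD_VOTER and a
	// REMOVE_VOTER change instead of falling back to an add-only change.
	chgs, performingSwap, err := kvserver.ReplicationChangesForRebalance(
		context.Background(), desc, 1 /* numExistingVoters */, add, remove,
		allocatorimpl.VoterTarget,
	)
	// Expected (per the rf=1 test case above): an ADD_VOTER for (n2, s2) and a
	// REMOVE_VOTER for (n1, s1), with performingSwap=false and err=nil.
	fmt.Println(chgs, performingSwap, err)
}
```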