diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 8acddc1111aa..fe4988232304 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3817,7 +3817,7 @@ func RelocateOne( var ops []kvpb.ReplicationChange if shouldAdd && shouldRemove { - ops, _, err = replicationChangesForRebalance( + ops, _, err = ReplicationChangesForRebalance( ctx, desc, len(existingVoters), additionTarget, removalTarget, args.targetType, ) if err != nil { diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 3c4cba0dc09e..4ba2bf01d968 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -1827,7 +1827,7 @@ func (rq *replicateQueue) considerRebalance( // If we have a valid rebalance action (ok == true) and we haven't // transferred our lease away, find the rebalance changes and return them // in an operation. - chgs, performingSwap, err := replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget, + chgs, performingSwap, err := ReplicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget, removeTarget, rebalanceTargetType) if err != nil { return nil, err @@ -1861,13 +1861,13 @@ func (rq *replicateQueue) considerRebalance( return op, nil } -// replicationChangesForRebalance returns a list of ReplicationChanges to +// ReplicationChangesForRebalance returns a list of ReplicationChanges to // execute for a rebalancing decision made by the allocator. // // This function assumes that `addTarget` and `removeTarget` are produced by the // allocator (i.e. they satisfy replica `constraints` and potentially // `voter_constraints` if we're operating over voter targets). -func replicationChangesForRebalance( +func ReplicationChangesForRebalance( ctx context.Context, desc *roachpb.RangeDescriptor, numExistingVoters int, @@ -1875,51 +1875,6 @@ func replicationChangesForRebalance( rebalanceTargetType allocatorimpl.TargetReplicaType, ) (chgs []kvpb.ReplicationChange, performingSwap bool, err error) { rdesc, found := desc.GetReplicaDescriptor(addTarget.StoreID) - if rebalanceTargetType == allocatorimpl.VoterTarget && numExistingVoters == 1 { - // If there's only one replica, the removal target is the - // leaseholder and this is unsupported and will fail. However, - // this is also the only way to rebalance in a single-replica - // range. If we try the atomic swap here, we'll fail doing - // nothing, and so we stay locked into the current distribution - // of replicas. (Note that maybeTransferLeaseAway above will not - // have found a target, and so will have returned (false, nil). - // - // Do the best thing we can, which is carry out the addition - // only, which should succeed, and the next time we touch this - // range, we will have one more replica and hopefully it will - // take the lease and remove the current leaseholder. - // - // It's possible that "rebalancing deadlock" can occur in other - // scenarios, it's really impossible to tell from the code given - // the constraints we support. However, the lease transfer often - // does not happen spuriously, and we can't enter dangerous - // configurations sporadically, so this code path is only hit - // when we know it's necessary, picking the smaller of two evils. - // - // See https://github.com/cockroachdb/cockroach/issues/40333. - log.KvDistribution.Infof(ctx, "can't swap replica due to lease; falling back to add") - - // Even when there is only 1 existing voter, there may be other replica - // types in the range. Check if the add target already has a replica, if so - // it must be a non-voter or the rebalance is invalid. - if found && rdesc.Type == roachpb.NON_VOTER { - // The receiving store already has a non-voting replica. Instead of just - // adding a voter to the receiving store, we *must* promote the non-voting - // replica to a voter. - chgs = kvpb.ReplicationChangesForPromotion(addTarget) - } else if !found { - chgs = []kvpb.ReplicationChange{ - {ChangeType: roachpb.ADD_VOTER, Target: addTarget}, - } - } else { - return nil, false, errors.AssertionFailedf( - "invalid rebalancing decision: trying to"+ - " move voter to a store that already has a replica %s for the range", rdesc, - ) - } - return chgs, false, err - } - switch rebalanceTargetType { case allocatorimpl.VoterTarget: // Check if the target being added already has a non-voting replica. diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 31ec41ca8438..6998a10a21d6 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -2612,3 +2612,168 @@ func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) { return checkLeaseCount(nextPreferredNode, numRanges) }) } + +// TestReplicationChangesForRebalance asserts that the replication changes for +// rebalancing are correct, given a a range descriptor and rebalance target. +func TestReplicationChangesForRebalance(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + testCases := []struct { + name string + desc *roachpb.RangeDescriptor + addTarget, removeTarget roachpb.ReplicationTarget + rebalanceTargetType allocatorimpl.TargetReplicaType + expectedChanges []kvpb.ReplicationChange + expectedPerformingSwap bool + expectedErrStr string + }{ + { + name: "rf=1 rebalance voter 1->2", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.VoterTarget, + expectedChanges: []kvpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}}, + {ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }, + expectedPerformingSwap: false, + expectedErrStr: "", + }, + { + name: "rf=3 rebalance voter 1->4", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.VoterTarget, + expectedChanges: []kvpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}}, + {ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }, + expectedPerformingSwap: false, + expectedErrStr: "", + }, + { + name: "rf=3 rebalance voter 1->3 error: already has a voter", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.VoterTarget, + expectedChanges: nil, + expectedPerformingSwap: false, + expectedErrStr: "programming error: store being rebalanced to(3) already has a voting replica", + }, + { + name: "rf=3 rebalance non-voter: 1->4", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.NonVoterTarget, + expectedChanges: []kvpb.ReplicationChange{ + {ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}}, + {ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }, + expectedPerformingSwap: false, + expectedErrStr: "", + }, + { + name: "rf=3 rebalance non-voter 1->3 error: already has a voter", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.NonVoterTarget, + expectedChanges: nil, + expectedPerformingSwap: false, + expectedErrStr: "invalid rebalancing decision: trying to move non-voter to a store that already has a replica", + }, + { + name: "rf=3 rebalance non-voter 1->3 error: already has a non-voter", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.NonVoterTarget, + expectedChanges: nil, + expectedPerformingSwap: false, + expectedErrStr: "invalid rebalancing decision: trying to move non-voter to a store that already has a replica", + }, + { + name: "rf=3 rebalance voter 1->3 swap", + desc: &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL}, + {NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER}, + }, + }, + addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}, + removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}, + rebalanceTargetType: allocatorimpl.VoterTarget, + expectedChanges: []kvpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}}, + {ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}}, + {ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }, + expectedPerformingSwap: true, + expectedErrStr: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + chgs, performingSwap, err := kvserver.ReplicationChangesForRebalance( + ctx, + tc.desc, + len(tc.desc.Replicas().VoterDescriptors()), + tc.addTarget, + tc.removeTarget, + tc.rebalanceTargetType, + ) + require.Equal(t, tc.expectedChanges, chgs) + require.Equal(t, tc.expectedPerformingSwap, performingSwap) + if tc.expectedErrStr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErrStr) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/sql/scatter_test.go b/pkg/sql/scatter_test.go index edcdf16687f2..f08380bae5c0 100644 --- a/pkg/sql/scatter_test.go +++ b/pkg/sql/scatter_test.go @@ -17,8 +17,10 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -28,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) func TestScatterRandomizeLeases(t *testing.T) { @@ -175,3 +179,98 @@ func TestScatterResponse(t *testing.T) { t.Fatalf("expected %d rows, but got %d", e, a) } } + +// TestScatterWithOneVoter tests that the scatter command works when the +// replication factor is set to 1. We expect that the total number of replicas +// remains unchanged and that the scattering store loses some replicas. Note we +// don't assert on the final distribution being even across all stores, scatter +// promises randomness, not necessarily uniformity. +func TestScatterWithOneVoter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // Too slow under stressrace. + skip.UnderDeadlock(t) + skip.UnderShort(t) + + zcfg := zonepb.DefaultZoneConfig() + zcfg.NumReplicas = proto.Int32(1) + + tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DefaultZoneConfigOverride: &zcfg, + DefaultSystemZoneConfigOverride: &zcfg, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + sqlutils.CreateTable( + t, tc.ServerConn(0), "t", + "k INT PRIMARY KEY, v INT", + 500, /* numRows */ + sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(10)), + ) + + r := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Create 49 splits, for 50 ranges in the test table. + r.Exec(t, "ALTER TABLE test.t SPLIT AT (SELECT i*10 FROM generate_series(1, 49) AS g(i))") + + getReplicaCounts := func() (map[int]int, int, error) { + rows := r.Query(t, ` + WITH ranges_info AS ( + SHOW RANGES FROM TABLE test.t + ) + SELECT + store_id, + count(*) AS replica_count + FROM + ( + SELECT + unnest(replicas) AS store_id + FROM + ranges_info + ) AS store_replicas + GROUP BY + store_id;`) + replicaCounts := make(map[int]int) + totalReplicas := 0 + for rows.Next() { + var storeID, replicaCount int + if err := rows.Scan(&storeID, &replicaCount); err != nil { + return nil, 0, err + } + replicaCounts[storeID] = replicaCount + totalReplicas += replicaCount + } + if err := rows.Err(); err != nil { + return nil, 0, err + } + return replicaCounts, totalReplicas, nil + } + + oldReplicaCounts, oldTotalReplicas, err := getReplicaCounts() + if err != nil { + t.Fatal(err) + } + + // Expect that the number of replicas on store 1 to have changed. We can't + // assert that the distribution will be even across all three stores, but s1 + // (the initial leaseholder and replica) should have a different number of + // replicas than before. Rebalancing is otherwise disabled in this test, so + // the only replica movements are from the scatter. + r.Exec(t, "ALTER TABLE test.t SCATTER") + newReplicaCounts, newTotalReplicas, err := getReplicaCounts() + require.NoError(t, err) + + require.Equal(t, oldTotalReplicas, newTotalReplicas, + "expected total replica count to remain the same post-scatter, "+ + "old replica counts(%d): %v, new replica counts(%d): %v", + oldTotalReplicas, oldReplicaCounts, newTotalReplicas, newReplicaCounts) + require.NotEqual(t, oldReplicaCounts[1], newReplicaCounts[1]) +}