Skip to content

Commit

Permalink
kvserver: rebalance ranges with one voter using joint configurations
Browse files Browse the repository at this point in the history
The allocator would add a voter, instead of both adding and removing the
existing voter when rebalancing ranges with one replica. Removing the
leaseholder replica was not possible prior to cockroachdb#74077, so the addition
only was necessary.

This restriction is no longer necessary. Allow rebalancing a one voter
range between stores using joint configurations, where the lease will be
transferred to the incoming voter store, from the outgoing demoting
voter.

Scattering ranges with one voter will now leave the range with exactly
one voter, where previously both the leaseholder voter evaluating the
scatter, and the new voter would be left.

Before this patch, scattering 1000 ranges with RF=1 on a 5 store
cluster:

```
  store_id | replica_count | replica_distribution | lease_count | lease_distribution
-----------+---------------+----------------------+-------------+---------------------
         1 |          1001 | ##########           |         500 | ##########
         5 |           291 | ###                  |         147 | ###
         4 |           275 | ###                  |         137 | ###
         3 |           229 | ###                  |         118 | ###
         2 |           206 | ###                  |          99 | ##
```

After:

```
  store_id | replica_count | replica_distribution | lease_count | lease_distribution
-----------+---------------+----------------------+-------------+---------------------
         2 |           242 | ##########           |         241 | ##########
         4 |           227 | ##########           |         227 | ##########
         5 |           217 | #########            |         216 | #########
         3 |           209 | #########            |         208 | #########
         1 |           106 | #####                |         109 | #####
```

Fixes: cockroachdb#108420
Fixes: cockroachdb#124171

Release note (bug fix): Scattering a range with replication factor=1, no
longer erroneously up-replicates the range to two replicas. Leases will
also no longer thrash between nodes when perturbed with replication
factor=1.
  • Loading branch information
kvoli committed May 28, 2024
1 parent 164525e commit aaabb12
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3817,7 +3817,7 @@ func RelocateOne(

var ops []kvpb.ReplicationChange
if shouldAdd && shouldRemove {
ops, _, err = replicationChangesForRebalance(
ops, _, err = ReplicationChangesForRebalance(
ctx, desc, len(existingVoters), additionTarget, removalTarget, args.targetType,
)
if err != nil {
Expand Down
51 changes: 3 additions & 48 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,7 @@ func (rq *replicateQueue) considerRebalance(
// If we have a valid rebalance action (ok == true) and we haven't
// transferred our lease away, find the rebalance changes and return them
// in an operation.
chgs, performingSwap, err := replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget,
chgs, performingSwap, err := ReplicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget,
removeTarget, rebalanceTargetType)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1861,65 +1861,20 @@ func (rq *replicateQueue) considerRebalance(
return op, nil
}

// replicationChangesForRebalance returns a list of ReplicationChanges to
// ReplicationChangesForRebalance returns a list of ReplicationChanges to
// execute for a rebalancing decision made by the allocator.
//
// This function assumes that `addTarget` and `removeTarget` are produced by the
// allocator (i.e. they satisfy replica `constraints` and potentially
// `voter_constraints` if we're operating over voter targets).
func replicationChangesForRebalance(
func ReplicationChangesForRebalance(
ctx context.Context,
desc *roachpb.RangeDescriptor,
numExistingVoters int,
addTarget, removeTarget roachpb.ReplicationTarget,
rebalanceTargetType allocatorimpl.TargetReplicaType,
) (chgs []kvpb.ReplicationChange, performingSwap bool, err error) {
rdesc, found := desc.GetReplicaDescriptor(addTarget.StoreID)
if rebalanceTargetType == allocatorimpl.VoterTarget && numExistingVoters == 1 {
// If there's only one replica, the removal target is the
// leaseholder and this is unsupported and will fail. However,
// this is also the only way to rebalance in a single-replica
// range. If we try the atomic swap here, we'll fail doing
// nothing, and so we stay locked into the current distribution
// of replicas. (Note that maybeTransferLeaseAway above will not
// have found a target, and so will have returned (false, nil).
//
// Do the best thing we can, which is carry out the addition
// only, which should succeed, and the next time we touch this
// range, we will have one more replica and hopefully it will
// take the lease and remove the current leaseholder.
//
// It's possible that "rebalancing deadlock" can occur in other
// scenarios, it's really impossible to tell from the code given
// the constraints we support. However, the lease transfer often
// does not happen spuriously, and we can't enter dangerous
// configurations sporadically, so this code path is only hit
// when we know it's necessary, picking the smaller of two evils.
//
// See https://github.com/cockroachdb/cockroach/issues/40333.
log.KvDistribution.Infof(ctx, "can't swap replica due to lease; falling back to add")

// Even when there is only 1 existing voter, there may be other replica
// types in the range. Check if the add target already has a replica, if so
// it must be a non-voter or the rebalance is invalid.
if found && rdesc.Type == roachpb.NON_VOTER {
// The receiving store already has a non-voting replica. Instead of just
// adding a voter to the receiving store, we *must* promote the non-voting
// replica to a voter.
chgs = kvpb.ReplicationChangesForPromotion(addTarget)
} else if !found {
chgs = []kvpb.ReplicationChange{
{ChangeType: roachpb.ADD_VOTER, Target: addTarget},
}
} else {
return nil, false, errors.AssertionFailedf(
"invalid rebalancing decision: trying to"+
" move voter to a store that already has a replica %s for the range", rdesc,
)
}
return chgs, false, err
}

switch rebalanceTargetType {
case allocatorimpl.VoterTarget:
// Check if the target being added already has a non-voting replica.
Expand Down
165 changes: 165 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,3 +2612,168 @@ func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
return checkLeaseCount(nextPreferredNode, numRanges)
})
}

// TestReplicationChangesForRebalance asserts that the replication changes for
// rebalancing are correct, given a a range descriptor and rebalance target.
func TestReplicationChangesForRebalance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

testCases := []struct {
name string
desc *roachpb.RangeDescriptor
addTarget, removeTarget roachpb.ReplicationTarget
rebalanceTargetType allocatorimpl.TargetReplicaType
expectedChanges []kvpb.ReplicationChange
expectedPerformingSwap bool
expectedErrStr string
}{
{
name: "rf=1 rebalance voter 1->2",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.VoterTarget,
expectedChanges: []kvpb.ReplicationChange{
{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
},
expectedPerformingSwap: false,
expectedErrStr: "",
},
{
name: "rf=3 rebalance voter 1->4",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.VoterTarget,
expectedChanges: []kvpb.ReplicationChange{
{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
},
expectedPerformingSwap: false,
expectedErrStr: "",
},
{
name: "rf=3 rebalance voter 1->3 error: already has a voter",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.VoterTarget,
expectedChanges: nil,
expectedPerformingSwap: false,
expectedErrStr: "programming error: store being rebalanced to(3) already has a voting replica",
},
{
name: "rf=3 rebalance non-voter: 1->4",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.NonVoterTarget,
expectedChanges: []kvpb.ReplicationChange{
{ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
{ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
},
expectedPerformingSwap: false,
expectedErrStr: "",
},
{
name: "rf=3 rebalance non-voter 1->3 error: already has a voter",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.NonVoterTarget,
expectedChanges: nil,
expectedPerformingSwap: false,
expectedErrStr: "invalid rebalancing decision: trying to move non-voter to a store that already has a replica",
},
{
name: "rf=3 rebalance non-voter 1->3 error: already has a non-voter",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.NON_VOTER},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.NonVoterTarget,
expectedChanges: nil,
expectedPerformingSwap: false,
expectedErrStr: "invalid rebalancing decision: trying to move non-voter to a store that already has a replica",
},
{
name: "rf=3 rebalance voter 1->3 swap",
desc: &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.VOTER_FULL},
{NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL},
{NodeID: 3, StoreID: 3, Type: roachpb.NON_VOTER},
},
},
addTarget: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
removeTarget: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
rebalanceTargetType: allocatorimpl.VoterTarget,
expectedChanges: []kvpb.ReplicationChange{
{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
{ChangeType: roachpb.REMOVE_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
{ChangeType: roachpb.ADD_NON_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
{ChangeType: roachpb.REMOVE_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
},
expectedPerformingSwap: true,
expectedErrStr: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
chgs, performingSwap, err := kvserver.ReplicationChangesForRebalance(
ctx,
tc.desc,
len(tc.desc.Replicas().VoterDescriptors()),
tc.addTarget,
tc.removeTarget,
tc.rebalanceTargetType,
)
require.Equal(t, tc.expectedChanges, chgs)
require.Equal(t, tc.expectedPerformingSwap, performingSwap)
if tc.expectedErrStr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrStr)
} else {
require.NoError(t, err)
}
})
}
}
99 changes: 99 additions & 0 deletions pkg/sql/scatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -28,6 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

func TestScatterRandomizeLeases(t *testing.T) {
Expand Down Expand Up @@ -175,3 +179,98 @@ func TestScatterResponse(t *testing.T) {
t.Fatalf("expected %d rows, but got %d", e, a)
}
}

// TestScatterWithOneVoter tests that the scatter command works when the
// replication factor is set to 1. We expect that the total number of replicas
// remains unchanged and that the scattering store loses some replicas. Note we
// don't assert on the final distribution being even across all stores, scatter
// promises randomness, not necessarily uniformity.
func TestScatterWithOneVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // Too slow under stressrace.
skip.UnderDeadlock(t)
skip.UnderShort(t)

zcfg := zonepb.DefaultZoneConfig()
zcfg.NumReplicas = proto.Int32(1)

tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DefaultZoneConfigOverride: &zcfg,
DefaultSystemZoneConfigOverride: &zcfg,
},
},
},
})
defer tc.Stopper().Stop(context.Background())

sqlutils.CreateTable(
t, tc.ServerConn(0), "t",
"k INT PRIMARY KEY, v INT",
500, /* numRows */
sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(10)),
)

r := sqlutils.MakeSQLRunner(tc.ServerConn(0))

// Create 49 splits, for 50 ranges in the test table.
r.Exec(t, "ALTER TABLE test.t SPLIT AT (SELECT i*10 FROM generate_series(1, 49) AS g(i))")

getReplicaCounts := func() (map[int]int, int, error) {
rows := r.Query(t, `
WITH ranges_info AS (
SHOW RANGES FROM TABLE test.t
)
SELECT
store_id,
count(*) AS replica_count
FROM
(
SELECT
unnest(replicas) AS store_id
FROM
ranges_info
) AS store_replicas
GROUP BY
store_id;`)
replicaCounts := make(map[int]int)
totalReplicas := 0
for rows.Next() {
var storeID, replicaCount int
if err := rows.Scan(&storeID, &replicaCount); err != nil {
return nil, 0, err
}
replicaCounts[storeID] = replicaCount
totalReplicas += replicaCount
}
if err := rows.Err(); err != nil {
return nil, 0, err
}
return replicaCounts, totalReplicas, nil
}

oldReplicaCounts, oldTotalReplicas, err := getReplicaCounts()
if err != nil {
t.Fatal(err)
}

// Expect that the number of replicas on store 1 to have changed. We can't
// assert that the distribution will be even across all three stores, but s1
// (the initial leaseholder and replica) should have a different number of
// replicas than before. Rebalancing is otherwise disabled in this test, so
// the only replica movements are from the scatter.
r.Exec(t, "ALTER TABLE test.t SCATTER")
newReplicaCounts, newTotalReplicas, err := getReplicaCounts()
require.NoError(t, err)

require.Equal(t, oldTotalReplicas, newTotalReplicas,
"expected total replica count to remain the same post-scatter, "+
"old replica counts(%d): %v, new replica counts(%d): %v",
oldTotalReplicas, oldReplicaCounts, newTotalReplicas, newReplicaCounts)
require.NotEqual(t, oldReplicaCounts[1], newReplicaCounts[1])
}

0 comments on commit aaabb12

Please sign in to comment.