Skip to content

Commit

Permalink
kvserver: update AdminRelocateRange to leverage explicit swaps of
Browse files Browse the repository at this point in the history
voters to non-voters

This commit updates `AdminRelocateRange` to use explicit atomic swaps of
voting replicas with non-voting replicas, support for which was initially
added in cockroachdb#58627. The patch does so by generalizing behavior that's
already exercised by the `replicateQueue` when it decides to rebalance
replicas. See cockroachdb#61239.

This allows us, in the next commit, to remove bespoke relocation logic
that's used by the `mergeQueue` to align replica sets for the sake of a
range merge.

Release note: None
  • Loading branch information
aayushshah15 committed Mar 28, 2021
1 parent 7c08e57 commit f460ce6
Show file tree
Hide file tree
Showing 4 changed files with 340 additions and 137 deletions.
33 changes: 31 additions & 2 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,43 @@ const (
nonVoterTarget
)

// AddChangeType maps a targetReplicaType to the roachpb.ReplicaChangeType that
// adds a replica of that type: roachpb.ADD_VOTER for voterTarget and
// roachpb.ADD_NON_VOTER for nonVoterTarget. Any other value panics.
//
// TODO(aayush): Clean up usages of ADD_{NON_}VOTER. Use
// targetReplicaType.{Add,Remove}ChangeType methods wherever possible.
func (t targetReplicaType) AddChangeType() roachpb.ReplicaChangeType {
	if t == voterTarget {
		return roachpb.ADD_VOTER
	}
	if t == nonVoterTarget {
		return roachpb.ADD_NON_VOTER
	}
	// Neither known target type matched.
	panic(fmt.Sprintf("unknown targetReplicaType %d", t))
}

// RemoveChangeType maps a targetReplicaType to the roachpb.ReplicaChangeType
// that removes a replica of that type: roachpb.REMOVE_VOTER for voterTarget
// and roachpb.REMOVE_NON_VOTER for nonVoterTarget. Any other value panics.
func (t targetReplicaType) RemoveChangeType() roachpb.ReplicaChangeType {
	if t == voterTarget {
		return roachpb.REMOVE_VOTER
	}
	if t == nonVoterTarget {
		return roachpb.REMOVE_NON_VOTER
	}
	// Neither known target type matched.
	panic(fmt.Sprintf("unknown targetReplicaType %d", t))
}

func (t targetReplicaType) String() string {
switch typ := t; typ {
switch t {
case voterTarget:
return "voter"
case nonVoterTarget:
return "non-voter"
default:
panic(fmt.Sprintf("unknown targetReplicaType %d", typ))
panic(fmt.Sprintf("unknown targetReplicaType %d", t))
}
}

Expand Down
91 changes: 86 additions & 5 deletions pkg/kv/kvserver/client_relocate_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"math/rand"
"sort"
"testing"

Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand All @@ -34,12 +36,16 @@ func relocateAndCheck(
voterTargets []roachpb.ReplicationTarget,
nonVoterTargets []roachpb.ReplicationTarget,
) (retries int) {
every := log.Every(1 * time.Second)
testutils.SucceedsSoon(t, func() error {
err := tc.Servers[0].DB().
AdminRelocateRange(
context.Background(), startKey.AsRawKey(), voterTargets, nonVoterTargets,
)
if err != nil {
if every.ShouldLog() {
log.Infof(context.Background(), "AdminRelocateRange failed with error: %s", err)
}
retries++
}
return err
Expand Down Expand Up @@ -96,6 +102,25 @@ func requireLeaseAt(
})
}

// usesAtomicReplicationChange reports whether ops has the shape of one of the
// replication changes that are executed atomically:
//
//  1. Voter rebalance (ADD_VOTER, REMOVE_VOTER)
//  2. Non-voter promoted to voter (ADD_VOTER, REMOVE_NON_VOTER)
//  3. Voter demoted to non-voter (ADD_NON_VOTER, REMOVE_VOTER)
//  4. Voter swapped with non-voter (ADD_VOTER, REMOVE_NON_VOTER,
//     ADD_NON_VOTER, REMOVE_VOTER)
//
// Only the first two operations are inspected.
func usesAtomicReplicationChange(ops []roachpb.ReplicationChange) bool {
	// Every atomic shape consists of at least two operations.
	if len(ops) < 2 {
		return false
	}
	first, second := ops[0].ChangeType, ops[1].ChangeType
	switch {
	case first == roachpb.ADD_VOTER && second.IsRemoval():
		// Shapes 1, 2 and 4 all begin with ADD_VOTER followed by a removal.
		return true
	case len(ops) == 2 && first == roachpb.ADD_NON_VOTER && second == roachpb.REMOVE_VOTER:
		// Shape 3: exactly one demotion pair.
		return true
	default:
		return false
	}
}

func TestAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -118,7 +143,7 @@ func TestAdminRelocateRange(t *testing.T) {
if ic.err != nil {
continue
}
if len(ic.ops) == 2 && ic.ops[0].ChangeType == roachpb.ADD_VOTER && ic.ops[1].ChangeType == roachpb.REMOVE_VOTER {
if usesAtomicReplicationChange(ic.ops) {
actAtomic++
} else {
actSingle += len(ic.ops)
Expand Down Expand Up @@ -206,10 +231,7 @@ func TestAdminRelocateRange(t *testing.T) {
})
}

// Relocation of non-voting replicas is not done atomically under any
// scenario.
// TODO(aayush): Update this comment and test once we support atomic swaps of
// more than 1 non-voter at a time.
// Simple non-voter relocations.
{
requireNumAtomic(0, 2, func() int {
return relocateAndCheck(t, tc, k, tc.Targets(2, 4), tc.Targets(1, 3))
Expand All @@ -223,4 +245,63 @@ func TestAdminRelocateRange(t *testing.T) {
return relocateAndCheck(t, tc, k, tc.Targets(2, 4), tc.Targets(0, 3))
})
}

// Relocation scenarios that require swapping of voters with non-voters.
{
// Single swap of voter and non-voter.
requireNumAtomic(1, 0, func() int {
return relocateAndCheck(t, tc, k, tc.Targets(0, 4), tc.Targets(2, 3))
})
// Multiple swaps.
requireNumAtomic(2, 0, func() int {
return relocateAndCheck(t, tc, k, tc.Targets(2, 3), tc.Targets(0, 4))
})
// Single promotion of non-voter to a voter.
requireNumAtomic(1, 0, func() int {
return relocateAndCheck(t, tc, k, tc.Targets(2, 3, 4), tc.Targets(0))
})
// Single demotion of voter to a non-voter.
requireNumAtomic(1, 0, func() int {
return relocateAndCheck(t, tc, k, tc.Targets(2, 4), tc.Targets(0, 3))
})
}
}

// TestAdminRelocateRangeRandom repeatedly relocates a scratch range to randomly
// chosen voter and non-voter sets and verifies, via relocateAndCheck, that each
// relocation was carried out.
func TestAdminRelocateRangeRandom(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// Use a smaller cluster and a single iteration when util.RaceEnabled is
	// set (presumably to keep the test's runtime manageable).
	numNodes, numIterations := 9, 10
	if util.RaceEnabled {
		numNodes, numIterations = 4, 1
	}
	args := base.TestClusterArgs{ReplicationMode: base.ReplicationManual}

	// pickTargets shuffles all node indexes and splits the result into a
	// non-empty prefix of voters and a (possibly empty) suffix of non-voters.
	pickTargets := func() (voters, nonVoters []int) {
		nodeIdxs := make([]int, numNodes)
		for i := range nodeIdxs {
			nodeIdxs[i] = i
		}
		numVoters := 1 + rand.Intn(numNodes) // Need at least one voter.
		rand.Shuffle(len(nodeIdxs), func(a, b int) {
			nodeIdxs[a], nodeIdxs[b] = nodeIdxs[b], nodeIdxs[a]
		})
		return nodeIdxs[:numVoters], nodeIdxs[numVoters:]
	}

	tc := testcluster.StartTestCluster(t, numNodes, args)
	defer tc.Stopper().Stop(ctx)

	k := keys.MustAddr(tc.ScratchRange(t))
	for iter := 0; iter < numIterations; iter++ {
		voterIdxs, nonVoterIdxs := pickTargets()
		relocateAndCheck(t, tc, k, tc.Targets(voterIdxs...), tc.Targets(nonVoterIdxs...))
	}
}
Loading

0 comments on commit f460ce6

Please sign in to comment.