Skip to content

Commit

Permalink
kvserver: support atomic promotions and demotions of non-voting replicas
Browse files Browse the repository at this point in the history
This PR teaches `AdminChangeReplicas` to atomically promote non-voters to
voters, demote voters to non-voters, or swap voters with non-voters
via joint consensus.

Fixes #58499
Informs #51943

Release note: None
  • Loading branch information
aayushshah15 committed Feb 22, 2021
1 parent 682582f commit e924d91
Show file tree
Hide file tree
Showing 28 changed files with 939 additions and 400 deletions.
24 changes: 19 additions & 5 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (a Allocator) simulateRemoveTarget(
func (a Allocator) removeTarget(
ctx context.Context,
zone *zonepb.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
candidates []roachpb.ReplicationTarget,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
targetType targetReplicaType,
Expand Down Expand Up @@ -815,7 +815,14 @@ func (a Allocator) RemoveVoter(
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error) {
return a.removeTarget(ctx, zone, voterCandidates, existingVoters, existingNonVoters, voterTarget)
return a.removeTarget(
ctx,
zone,
roachpb.MakeReplicaSet(voterCandidates).ReplicationTargets(),
existingVoters,
existingNonVoters,
voterTarget,
)
}

// RemoveNonVoter returns a suitable non-voting replica to remove from the
Expand All @@ -830,7 +837,14 @@ func (a Allocator) RemoveNonVoter(
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error) {
return a.removeTarget(ctx, zone, nonVoterCandidates, existingVoters, existingNonVoters, nonVoterTarget)
return a.removeTarget(
ctx,
zone,
roachpb.MakeReplicaSet(nonVoterCandidates).ReplicationTargets(),
existingVoters,
existingNonVoters,
nonVoterTarget,
)
}

// RebalanceTarget returns a suitable store for a rebalance target with
Expand Down Expand Up @@ -1089,7 +1103,7 @@ func (a *Allocator) TransferLeaseTarget(
// If the current leaseholder is not preferred, set checkTransferLeaseSource
// to false to motivate the below logic to transfer the lease.
existing = preferred
if !storeHasReplica(leaseStoreID, preferred) {
if !storeHasReplica(leaseStoreID, roachpb.MakeReplicaSet(preferred).ReplicationTargets()) {
checkTransferLeaseSource = false
}
}
Expand Down Expand Up @@ -1189,7 +1203,7 @@ func (a *Allocator) ShouldTransferLease(
existing = preferred
// If the current leaseholder isn't one of the preferred stores, then we
// should try to transfer the lease.
if !storeHasReplica(leaseStoreID, existing) {
if !storeHasReplica(leaseStoreID, roachpb.MakeReplicaSet(existing).ReplicationTargets()) {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ func nodeHasReplica(nodeID roachpb.NodeID, existing []roachpb.ReplicaDescriptor)

// storeHasReplica returns true if the provided StoreID contains an entry in
// the provided list of existing replicas.
func storeHasReplica(storeID roachpb.StoreID, existing []roachpb.ReplicaDescriptor) bool {
func storeHasReplica(storeID roachpb.StoreID, existing []roachpb.ReplicationTarget) bool {
for _, r := range existing {
if r.StoreID == storeID {
return true
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ func TestStoreHasReplica(t *testing.T) {
existing = append(existing, roachpb.ReplicaDescriptor{StoreID: roachpb.StoreID(i)})
}
for i := 1; i < 10; i++ {
if e, a := i%2 == 0, storeHasReplica(roachpb.StoreID(i), existing); e != a {
if e, a := i%2 == 0,
storeHasReplica(roachpb.StoreID(i), roachpb.MakeReplicaSet(existing).ReplicationTargets()); e != a {
t.Errorf("StoreID %d expected to be %t, got %t", i, e, a)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func TestCheckCanReceiveLease(t *testing.T) {
{leaseholderType: roachpb.VOTER_FULL, eligible: true},
{leaseholderType: roachpb.VOTER_INCOMING, eligible: false},
{leaseholderType: roachpb.VOTER_OUTGOING, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, eligible: false},
{leaseholderType: roachpb.LEARNER, eligible: false},
{leaseholderType: roachpb.NON_VOTER, eligible: false},
} {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {
// We have a sleep loop below to try to encourage the lease transfer
// to make it past that sanity check prior to letting the change
// of replicas proceed.
"cannot transfer lease to replica of type VOTER_DEMOTING", err.Error())
"cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER", err.Error())
}()
// Try really hard to make sure that our request makes it past the
// sanity check error to the evaluation error.
Expand Down
40 changes: 36 additions & 4 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2272,8 +2272,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
if rand.Intn(2) == 0 {
op = roachpb.ADD_NON_VOTER
}
_, err := db.AdminChangeReplicas(
ctx, key, rangeInfo.Desc, roachpb.MakeReplicationChanges(op, pickTargets()...))
_, err := db.AdminChangeReplicas(ctx, key, rangeInfo.Desc, roachpb.MakeReplicationChanges(op, pickTargets()...))
return err
}
wg.Add(actors)
Expand All @@ -2284,7 +2283,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
var gotSuccess bool
for _, err := range errors {
if err != nil {
assert.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
require.Truef(t, kvserver.IsRetriableReplicationChangeError(err), "%s; desc: %v", err, rangeInfo.Desc)
} else if gotSuccess {
t.Error("expected only one success")
} else {
Expand All @@ -2293,6 +2292,39 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
}
}

// TestChangeReplicasSwapVoterWithNonVoter exercises swapping a voting replica
// with a non-voting replica on a scratch range, in a 7-node test cluster
// started with manual replication.
//
// The test has two phases:
//  1. After adding a non-voter, it attempts to swap firstVoter with that
//     non-voter and requires the returned error to match
//     "received invalid ChangeReplicasTrigger" (see the TODO below for the
//     stated reason).
//  2. It then adds secondVoter as a voter and swaps secondVoter with the
//     non-voter via the OrFatal helper, which presumably fails the test if
//     the swap does not succeed.
func TestChangeReplicasSwapVoterWithNonVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderRace(t)

const numNodes = 7
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRange(t)
// NB: The test cluster starts with firstVoter having a voting replica (and
// the lease) for all ranges.
firstVoter, secondVoter, nonVoter := tc.Target(0), tc.Target(1), tc.Target(3)
// Look up the scratch range's replica on the first server's first store and
// require that it exists before attempting any replication changes.
firstStore, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
require.NoError(t, err)
firstRepl := firstStore.LookupReplica(roachpb.RKey(key))
require.NotNil(t, firstRepl, `the first node in the TestCluster must have a replica for the ScratchRange`)

// TODO(aayush): Trying to swap the last voting replica with a non-voter hits
// the safeguard inside Replica.propose() as the last voting replica is always
// the leaseholder. There are a bunch of subtleties around getting a
// leaseholder to remove itself without another voter to immediately transfer
// the lease to. Determine if/how this needs to be fixed.
tc.AddNonVotersOrFatal(t, key, nonVoter)
_, err = tc.SwapVoterWithNonVoter(key, firstVoter, nonVoter)
require.Regexp(t, "received invalid ChangeReplicasTrigger", err)

// With a second voter in place, swap that voter (not firstVoter) with the
// non-voter.
tc.AddVotersOrFatal(t, key, secondVoter)
tc.SwapVoterWithNonVoterOrFatal(t, key, secondVoter, nonVoter)
}

// TestReplicaTombstone ensures that tombstones are written when we expect
// them to be. Tombstones are laid down when replicas are removed.
// Replicas are removed for several reasons:
Expand Down Expand Up @@ -2871,7 +2903,7 @@ func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: blockOnChangeReplicasRead,
ReplicaAddStopAfterJointConfig: func() bool {
VoterAddStopAfterJointConfig: func() bool {
return stopAfterJointConfig.Load().(bool)
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) {
t.Fatal(err)
}
// Add a new voting replica, which should get the range into a joint config.
// It will stay in that state because of the `ReplicaAddStopAfterJointConfig`
// It will stay in that state because of the `VoterAddStopAfterJointConfig`
// testing knob `makeReplicationTestKnobs()`.
ltk.withStopAfterJointConfig(func() {
tc.AddVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))
Expand Down
32 changes: 16 additions & 16 deletions pkg/kv/kvserver/raft.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ message SnapshotRequest {
// for upreplication, before they're promoted to full voters.
//
// As of the time of writing, we only send this snapshot from the
// atomicReplicationChange after creating a new LEARNER replica.
// execReplicationChangesForVoters after creating a new LEARNER replica.
LEARNER_INITIAL = 1;
reserved 2;
}
Expand Down
Loading

0 comments on commit e924d91

Please sign in to comment.