Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117326: kvnemesis: add support for non-voting replicas r=kvoli a=miraradeva

This change extends the ChangeReplicas config in kvnemesis to include adding, removing and promoting non-voting replicas, as well as demoting voting replicas.

Fixes: #59060

Release note: None

117864: concurrency: prevent panic in IsKeyLockedByConflictingTxn r=arulajmani a=arulajmani

This was missing a nil check.

Closes #117855
Closes #117854

Release note: None

Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
3 people committed Jan 17, 2024
3 parents 1898adc + 11d444d + 20046d3 commit 9b10f08
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 47 deletions.
16 changes: 11 additions & 5 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,17 +661,23 @@ func getRangeDesc(ctx context.Context, key roachpb.Key, dbs ...*kv.DB) roachpb.R

func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
ctx := context.Background()
return func(key roachpb.Key) []roachpb.ReplicationTarget {
return func(key roachpb.Key) ([]roachpb.ReplicationTarget, []roachpb.ReplicationTarget) {
desc := getRangeDesc(ctx, key, dbs...)
replicas := desc.Replicas().Descriptors()
targets := make([]roachpb.ReplicationTarget, len(replicas))
for i, replica := range replicas {
targets[i] = roachpb.ReplicationTarget{
var voters []roachpb.ReplicationTarget
var nonVoters []roachpb.ReplicationTarget
for _, replica := range replicas {
target := roachpb.ReplicationTarget{
NodeID: replica.NodeID,
StoreID: replica.StoreID,
}
if replica.Type == roachpb.NON_VOTER {
nonVoters = append(nonVoters, target)
} else {
voters = append(voters, target)
}
}
return targets
return voters, nonVoters
}
}

Expand Down
142 changes: 113 additions & 29 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,24 @@ type MergeConfig struct {
// ChangeReplicasConfig configures the relative probability of generating a
// ChangeReplicas operation.
type ChangeReplicasConfig struct {
// AddReplica adds a single replica.
AddReplica int
// RemoveReplica removes a single replica.
RemoveReplica int
// AtomicSwapReplica adds 1 replica and removes 1 replica in a single
// ChangeReplicas call.
AtomicSwapReplica int
// AddVotingReplica adds a single voting replica.
AddVotingReplica int
// RemoveVotingReplica removes a single voting replica.
RemoveVotingReplica int
// AtomicSwapVotingReplica adds 1 voting replica and removes 1 voting replica
// in a single ChangeReplicas call.
AtomicSwapVotingReplica int
// AddNonVotingReplica adds a single non-voting replica.
AddNonVotingReplica int
// RemoveNonVotingReplica removes a single non-voting replica.
RemoveNonVotingReplica int
// AtomicSwapNonVotingReplica adds 1 non-voting replica and removes 1 non-voting
// replica in a single ChangeReplicas call.
AtomicSwapNonVotingReplica int
// PromoteReplica promotes a non-voting replica to voting.
PromoteReplica int
// DemoteReplica demotes a voting replica to non-voting.
DemoteReplica int
}

// ChangeLeaseConfig configures the relative probability of generating an
Expand Down Expand Up @@ -422,9 +433,14 @@ func newAllOperationsConfig() GeneratorConfig {
MergeIsSplit: 1,
},
ChangeReplicas: ChangeReplicasConfig{
AddReplica: 1,
RemoveReplica: 1,
AtomicSwapReplica: 1,
AddVotingReplica: 1,
RemoveVotingReplica: 1,
AtomicSwapVotingReplica: 1,
AddNonVotingReplica: 1,
RemoveNonVotingReplica: 1,
AtomicSwapNonVotingReplica: 1,
PromoteReplica: 1,
DemoteReplica: 1,
},
ChangeLease: ChangeLeaseConfig{
TransferLease: 1,
Expand Down Expand Up @@ -521,9 +537,9 @@ func GeneratorDataSpan() roachpb.Span {
}
}

// GetReplicasFn is a function that returns the current replicas for the range
// containing a key.
type GetReplicasFn func(roachpb.Key) []roachpb.ReplicationTarget
// GetReplicasFn is a function that returns the current voting and non-voting
// replicas, respectively, for the range containing a key.
type GetReplicasFn func(roachpb.Key) ([]roachpb.ReplicationTarget, []roachpb.ReplicationTarget)

// Generator incrementally constructs KV traffic designed to maximally test edge
// cases.
Expand Down Expand Up @@ -627,20 +643,40 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
}

key := randKey(rng)
current := g.replicasFn(roachpb.Key(key))
if len(current) < g.Config.NumNodes {
addReplicaFn := makeAddReplicaFn(key, current, false /* atomicSwap */)
addOpGen(&allowed, addReplicaFn, g.Config.Ops.ChangeReplicas.AddReplica)
voters, nonVoters := g.replicasFn(roachpb.Key(key))
numVoters, numNonVoters := len(voters), len(nonVoters)
numReplicas := numVoters + numNonVoters
if numReplicas < g.Config.NumNodes {
addVoterFn := makeAddReplicaFn(key, voters, false /* atomicSwap */, true /* voter */)
addOpGen(&allowed, addVoterFn, g.Config.Ops.ChangeReplicas.AddVotingReplica)
addNonVoterFn := makeAddReplicaFn(key, nonVoters, false /* atomicSwap */, false /* voter */)
addOpGen(&allowed, addNonVoterFn, g.Config.Ops.ChangeReplicas.AddNonVotingReplica)
}
if len(current) == g.Config.NumReplicas && len(current) < g.Config.NumNodes {
atomicSwapReplicaFn := makeAddReplicaFn(key, current, true /* atomicSwap */)
addOpGen(&allowed, atomicSwapReplicaFn, g.Config.Ops.ChangeReplicas.AtomicSwapReplica)
if numReplicas == g.Config.NumReplicas && numReplicas < g.Config.NumNodes {
atomicSwapVoterFn := makeAddReplicaFn(key, voters, true /* atomicSwap */, true /* voter */)
addOpGen(&allowed, atomicSwapVoterFn, g.Config.Ops.ChangeReplicas.AtomicSwapVotingReplica)
if numNonVoters > 0 {
atomicSwapNonVoterFn := makeAddReplicaFn(key, nonVoters, true /* atomicSwap */, false /* voter */)
addOpGen(&allowed, atomicSwapNonVoterFn, g.Config.Ops.ChangeReplicas.AtomicSwapNonVotingReplica)
}
}
if numReplicas > g.Config.NumReplicas {
removeVoterFn := makeRemoveReplicaFn(key, voters, true /* voter */)
addOpGen(&allowed, removeVoterFn, g.Config.Ops.ChangeReplicas.RemoveVotingReplica)
if numNonVoters > 0 {
removeNonVoterFn := makeRemoveReplicaFn(key, nonVoters, false /* voter */)
addOpGen(&allowed, removeNonVoterFn, g.Config.Ops.ChangeReplicas.RemoveNonVotingReplica)
}
}
if len(current) > g.Config.NumReplicas {
removeReplicaFn := makeRemoveReplicaFn(key, current)
addOpGen(&allowed, removeReplicaFn, g.Config.Ops.ChangeReplicas.RemoveReplica)
if numVoters > 0 {
demoteVoterFn := makeDemoteReplicaFn(key, voters)
addOpGen(&allowed, demoteVoterFn, g.Config.Ops.ChangeReplicas.DemoteReplica)
}
transferLeaseFn := makeTransferLeaseFn(key, current)
if numNonVoters > 0 {
promoteNonVoterFn := makePromoteReplicaFn(key, nonVoters)
addOpGen(&allowed, promoteNonVoterFn, g.Config.Ops.ChangeReplicas.PromoteReplica)
}
transferLeaseFn := makeTransferLeaseFn(key, append(voters, nonVoters...))
addOpGen(&allowed, transferLeaseFn, g.Config.Ops.ChangeLease.TransferLease)

addOpGen(&allowed, toggleGlobalReads, g.Config.Ops.ChangeZone.ToggleGlobalReads)
Expand Down Expand Up @@ -1302,17 +1338,25 @@ func randMergeIsSplit(g *generator, rng *rand.Rand) Operation {
return merge(key)
}

func makeRemoveReplicaFn(key string, current []roachpb.ReplicationTarget) opGenFunc {
func makeRemoveReplicaFn(key string, current []roachpb.ReplicationTarget, voter bool) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
var changeType roachpb.ReplicaChangeType
if voter {
changeType = roachpb.REMOVE_VOTER
} else {
changeType = roachpb.REMOVE_NON_VOTER
}
change := kvpb.ReplicationChange{
ChangeType: roachpb.REMOVE_VOTER,
ChangeType: changeType,
Target: current[rng.Intn(len(current))],
}
return changeReplicas(key, change)
}
}

func makeAddReplicaFn(key string, current []roachpb.ReplicationTarget, atomicSwap bool) opGenFunc {
func makeAddReplicaFn(
key string, current []roachpb.ReplicationTarget, atomicSwap bool, voter bool,
) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
candidatesMap := make(map[roachpb.ReplicationTarget]struct{})
for i := 0; i < g.Config.NumNodes; i++ {
Expand All @@ -1327,20 +1371,60 @@ func makeAddReplicaFn(key string, current []roachpb.ReplicationTarget, atomicSwa
candidates = append(candidates, candidate)
}
candidate := candidates[rng.Intn(len(candidates))]
var addType, removeType roachpb.ReplicaChangeType
if voter {
addType, removeType = roachpb.ADD_VOTER, roachpb.REMOVE_VOTER
} else {
addType, removeType = roachpb.ADD_NON_VOTER, roachpb.REMOVE_NON_VOTER
}
changes := []kvpb.ReplicationChange{{
ChangeType: roachpb.ADD_VOTER,
ChangeType: addType,
Target: candidate,
}}
if atomicSwap {
changes = append(changes, kvpb.ReplicationChange{
ChangeType: roachpb.REMOVE_VOTER,
ChangeType: removeType,
Target: current[rng.Intn(len(current))],
})
}
return changeReplicas(key, changes...)
}
}

func makePromoteReplicaFn(key string, nonVoters []roachpb.ReplicationTarget) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
target := nonVoters[rng.Intn(len(nonVoters))]
changes := []kvpb.ReplicationChange{
{
ChangeType: roachpb.REMOVE_NON_VOTER,
Target: target,
},
{
ChangeType: roachpb.ADD_VOTER,
Target: target,
},
}
return changeReplicas(key, changes...)
}
}

func makeDemoteReplicaFn(key string, voters []roachpb.ReplicationTarget) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
target := voters[rng.Intn(len(voters))]
changes := []kvpb.ReplicationChange{
{
ChangeType: roachpb.REMOVE_VOTER,
Target: target,
},
{
ChangeType: roachpb.ADD_NON_VOTER,
Target: target,
},
}
return changeReplicas(key, changes...)
}
}

func makeTransferLeaseFn(key string, current []roachpb.ReplicationTarget) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
target := current[rng.Intn(len(current))]
Expand Down
39 changes: 27 additions & 12 deletions pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func TestRandStep(t *testing.T) {

const minEachType = 5
config := newAllOperationsConfig()
config.NumNodes, config.NumReplicas = 2, 1
config.NumNodes, config.NumReplicas = 3, 2
rng, _ := randutil.NewTestRand()
getReplicasFn := func(_ roachpb.Key) []roachpb.ReplicationTarget {
return make([]roachpb.ReplicationTarget, rng.Intn(2)+1)
getReplicasFn := func(_ roachpb.Key) ([]roachpb.ReplicationTarget, []roachpb.ReplicationTarget) {
return make([]roachpb.ReplicationTarget, rng.Intn(config.NumNodes)+1),
make([]roachpb.ReplicationTarget, rng.Intn(config.NumNodes)+1)
}
g, err := MakeGenerator(config, getReplicasFn)
require.NoError(t, err)
Expand Down Expand Up @@ -339,21 +340,35 @@ func TestRandStep(t *testing.T) {
counts.Merge.MergeNotSplit++
}
case *ChangeReplicasOperation:
var adds, removes int
var voterAdds, voterRemoves, nonVoterAdds, nonVoterRemoves int
for _, change := range o.Changes {
switch change.ChangeType {
case roachpb.ADD_VOTER:
adds++
voterAdds++
case roachpb.REMOVE_VOTER:
removes++
voterRemoves++
case roachpb.ADD_NON_VOTER:
nonVoterAdds++
case roachpb.REMOVE_NON_VOTER:
nonVoterRemoves++
}
}
if adds == 1 && removes == 0 {
counts.ChangeReplicas.AddReplica++
} else if adds == 0 && removes == 1 {
counts.ChangeReplicas.RemoveReplica++
} else if adds == 1 && removes == 1 {
counts.ChangeReplicas.AtomicSwapReplica++
if voterAdds == 1 && voterRemoves == 0 && nonVoterRemoves == 0 {
counts.ChangeReplicas.AddVotingReplica++
} else if voterAdds == 0 && voterRemoves == 1 && nonVoterAdds == 0 {
counts.ChangeReplicas.RemoveVotingReplica++
} else if voterAdds == 1 && voterRemoves == 1 {
counts.ChangeReplicas.AtomicSwapVotingReplica++
} else if nonVoterAdds == 1 && nonVoterRemoves == 0 && voterRemoves == 0 {
counts.ChangeReplicas.AddNonVotingReplica++
} else if nonVoterAdds == 0 && nonVoterRemoves == 1 && voterAdds == 0 {
counts.ChangeReplicas.RemoveNonVotingReplica++
} else if nonVoterAdds == 1 && nonVoterRemoves == 1 {
counts.ChangeReplicas.AtomicSwapNonVotingReplica++
} else if voterAdds == 1 && nonVoterRemoves == 1 {
counts.ChangeReplicas.PromoteReplica++
} else if voterRemoves == 1 && nonVoterAdds == 1 {
counts.ChangeReplicas.DemoteReplica++
}
case *TransferLeaseOperation:
counts.ChangeLease.TransferLease++
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
// queuedLockingRequests is sorted in increasing order of sequence number.
break
}
if g.isSameTxn(qqg.guard.txnMeta()) {
if qqg.guard.txnMeta() != nil && g.isSameTxn(qqg.guard.txnMeta()) {
// A SKIP LOCKED request should not find another waiting request from its
// own transaction, at least not in the way that SQL uses KV. The only way
// we can end up finding another request in the lock's wait queue from our
Expand Down

0 comments on commit 9b10f08

Please sign in to comment.