Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: transfer lease when acquiring lease outside preferences #106079

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2827,7 +2827,7 @@ func (a Allocator) PreferredLeaseholders(
if !ok {
continue
}
if constraint.ConjunctionsCheck(storeDesc, preference.Constraints) {
if constraint.CheckStoreConjunction(storeDesc, preference.Constraints) {
preferred = append(preferred, repl)
}
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,7 @@ func allocateConstraintsCheck(
}

for i, constraints := range analyzed.Constraints {
if constraintsOK := constraint.ConjunctionsCheck(
if constraintsOK := constraint.CheckStoreConjunction(
store, constraints.Constraints,
); constraintsOK {
valid = true
Expand Down Expand Up @@ -2050,9 +2050,7 @@ func replaceConstraintsCheck(
for i, constraints := range analyzed.Constraints {
matchingStores := analyzed.SatisfiedBy[i]
satisfiedByExistingStore := containsStore(matchingStores, existingStore.StoreID)
satisfiedByCandidateStore := constraint.ConjunctionsCheck(
store, constraints.Constraints,
)
satisfiedByCandidateStore := constraint.CheckStoreConjunction(store, constraints.Constraints)
if satisfiedByCandidateStore {
valid = true
}
Expand Down Expand Up @@ -2147,7 +2145,7 @@ func rebalanceFromConstraintsCheck(
// satisfied by existing replicas or that is only fully satisfied because of
// fromStoreID, then it's necessary.
for i, constraints := range analyzed.Constraints {
if constraintsOK := constraint.ConjunctionsCheck(
if constraintsOK := constraint.CheckStoreConjunction(
store, constraints.Constraints,
); constraintsOK {
valid = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func IsStoreValid(
return true
}
for _, subConstraints := range constraints {
if constraintsOK := constraint.ConjunctionsCheck(
if constraintsOK := constraint.CheckStoreConjunction(
store, subConstraints.Constraints,
); constraintsOK {
return true
Expand Down
47 changes: 41 additions & 6 deletions pkg/kv/kvserver/allocator/plan/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (rp ReplicaPlanner) considerRebalance(
if !canTransferLeaseFrom(ctx, repl) {
return nil, stats, nil
}
return rp.shedLeaseTarget(
op, err := rp.shedLeaseTarget(
ctx,
repl,
desc,
Expand All @@ -878,8 +878,11 @@ func (rp ReplicaPlanner) considerRebalance(
ExcludeLeaseRepl: false,
CheckCandidateFullness: true,
},
), stats, nil

)
if err != nil {
return nil, stats, err
}
return op, stats, nil
}

// If we have a valid rebalance action (ok == true) and we haven't
Expand Down Expand Up @@ -925,7 +928,7 @@ func (rp ReplicaPlanner) shedLeaseTarget(
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
opts allocator.TransferLeaseOptions,
) (op AllocationOp) {
) (op AllocationOp, _ error) {
usage := repl.RangeUsageInfo()
// Learner replicas aren't allowed to become the leaseholder or raft leader,
// so only consider the `VoterDescriptors` replicas.
Expand All @@ -940,7 +943,16 @@ func (rp ReplicaPlanner) shedLeaseTarget(
opts,
)
if target == (roachpb.ReplicaDescriptor{}) {
return nil
// If we don't find a suitable target, but we own a lease violating the
// lease preferences, return an error to place the replica in purgatory and
// retry sooner. This typically happens when we've just acquired a violating
// lease and we eagerly enqueue the replica before we've received Raft
// leadership, which prevents us from finding appropriate lease targets
// since we can't determine if any are behind.
if repl.LeaseViolatesPreferences(ctx) {
return nil, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID}
}
return nil, nil
}

op = AllocationTransferLeaseOp{
Expand All @@ -949,7 +961,7 @@ func (rp ReplicaPlanner) shedLeaseTarget(
Usage: usage,
bypassSafetyChecks: false,
}
return op
return op, nil
}

// maybeTransferLeaseAwayTarget is called whenever a replica on a given store
Expand Down Expand Up @@ -1015,3 +1027,26 @@ func (rp ReplicaPlanner) maybeTransferLeaseAwayTarget(

return op, nil
}

// CantTransferLeaseViolatingPreferencesError is an error returned when a lease
// violates the lease preferences, but we couldn't find a valid target to
// transfer the lease to. It indicates that the replica should be sent to
// purgatory, to retry the transfer faster.
type CantTransferLeaseViolatingPreferencesError struct {
RangeID roachpb.RangeID
}

var _ errors.SafeFormatter = CantTransferLeaseViolatingPreferencesError{}

func (e CantTransferLeaseViolatingPreferencesError) Error() string { return fmt.Sprint(e) }

func (e CantTransferLeaseViolatingPreferencesError) Format(s fmt.State, verb rune) {
errors.FormatError(e, s, verb)
}

func (e CantTransferLeaseViolatingPreferencesError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("can't transfer r%d lease violating preferences, no suitable target", e.RangeID)
return nil
}

func (CantTransferLeaseViolatingPreferencesError) PurgatoryErrorMarker() {}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/queue/allocator_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sr *SimulatorReplica) LeaseViolatesPreferences(context.Context) bool {
return false
}
for _, preference := range conf.LeasePreferences {
if constraint.ConjunctionsCheck(storeDesc, preference.Constraints) {
if constraint.CheckStoreConjunction(storeDesc, preference.Constraints) {
return false
}
}
Expand Down
33 changes: 23 additions & 10 deletions pkg/kv/kvserver/constraint/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func AnalyzeConstraints(
// is a much more stable failure state than frantically moving everything
// off such a node.
store, ok := storeResolver.GetStoreDescriptor(repl.StoreID)
if !ok || ConjunctionsCheck(store, subConstraints.Constraints) {
if !ok || CheckStoreConjunction(store, subConstraints.Constraints) {
result.SatisfiedBy[i] = append(result.SatisfiedBy[i], store.StoreID)
result.Satisfies[store.StoreID] = append(result.Satisfies[store.StoreID], i)
}
Expand All @@ -80,18 +80,31 @@ func AnalyzeConstraints(
return result
}

// ConjunctionsCheck checks a store against a single set of constraints (out of
// the possibly numerous sets that apply to a range), returning true iff the
// store matches the constraints. The constraints are AND'ed together; a store
// matches the conjunction if it matches all of them.
func ConjunctionsCheck(store roachpb.StoreDescriptor, constraints []roachpb.Constraint) bool {
// CheckConjunction checks the given attributes and locality tags against all
// the given constraints. Every constraint must be satisfied by any
// attribute/tier, i.e. they are ANDed together.
func CheckConjunction(
storeAttrs, nodeAttrs roachpb.Attributes,
nodeLocality roachpb.Locality,
constraints []roachpb.Constraint,
) bool {
for _, constraint := range constraints {
// StoreMatchesConstraint returns whether a store matches the given constraint.
hasConstraint := roachpb.StoreMatchesConstraint(store, constraint)
if (constraint.Type == roachpb.Constraint_REQUIRED && !hasConstraint) ||
(constraint.Type == roachpb.Constraint_PROHIBITED && hasConstraint) {
matchesConstraint := roachpb.MatchesConstraint(storeAttrs, nodeAttrs, nodeLocality, constraint)
if (constraint.Type == roachpb.Constraint_REQUIRED && !matchesConstraint) ||
(constraint.Type == roachpb.Constraint_PROHIBITED && matchesConstraint) {
return false
}
}
return true
}

// CheckStoreConjunction checks a store against a single set of constraints (out of
// the possibly numerous sets that apply to a range), returning true iff the
// store matches the constraints. The constraints are AND'ed together; a store
// matches the conjunction if it matches all of them.
func CheckStoreConjunction(
storeDesc roachpb.StoreDescriptor, constraints []roachpb.Constraint,
) bool {
return CheckConjunction(
storeDesc.Attrs, storeDesc.Node.Attrs, storeDesc.Node.Locality, constraints)
}
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,19 @@ func (r *Replica) leasePostApplyLocked(
})
}

// If we acquired a new lease, and it violates the lease preferences, enqueue
// it in the replicate queue.
if leaseChangingHands && iAmTheLeaseHolder {
violatesPreferences := leaseViolatesPreferences(st, r.store.StoreID(), r.store.Attrs(),
r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences)
if violatesPreferences {
log.VEventf(ctx, 2,
"acquired lease violates lease preferences, enqueueing for transfer [lease=%s preferences=%s]",
newLease, r.mu.conf.LeasePreferences)
r.store.replicateQueue.AddAsync(ctx, r, replicateQueuePriorityHigh)
}
}

// Inform the store of this lease.
if iAmTheLeaseHolder {
r.store.registerLeaseholder(ctx, r, newLease.Sequence)
Expand Down
41 changes: 29 additions & 12 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,25 +1540,42 @@ func (r *Replica) hasCorrectLeaseTypeRLocked(lease roachpb.Lease) bool {
return hasExpirationLease == r.shouldUseExpirationLeaseRLocked()
}

// LeaseViolatesPreferences checks if current replica owns the lease and if it
// violates the lease preferences defined in the span config. If there is an
// error or no preferences defined then it will return false and consider that
// to be in-conformance.
// LeaseViolatesPreferences checks if this replica owns the lease and if it
// violates the lease preferences defined in the span config. If no preferences
// are defined then it will return false and consider it to be in conformance.
func (r *Replica) LeaseViolatesPreferences(ctx context.Context) bool {
storeDesc, err := r.store.Descriptor(ctx, true /* useCached */)
if err != nil {
log.Infof(ctx, "Unable to load the descriptor %v: cannot check if lease violates preference", err)
storeID := r.store.StoreID()
storeAttrs := r.store.Attrs()
nodeAttrs := r.store.nodeDesc.Attrs
nodeLocality := r.store.nodeDesc.Locality
now := r.Clock().NowAsClockTimestamp()
r.mu.RLock()
leaseStatus := r.leaseStatusAtRLocked(ctx, now)
preferences := r.mu.conf.LeasePreferences
r.mu.RUnlock()
return leaseViolatesPreferences(
leaseStatus, storeID, storeAttrs, nodeAttrs, nodeLocality, preferences)
}

// leaseViolatesPreferences returns true if the given lease is valid, owned by
// the given store, and does not satisfy the lease preferences.
func leaseViolatesPreferences(
leaseStatus kvserverpb.LeaseStatus,
storeID roachpb.StoreID,
storeAttrs, nodeAttrs roachpb.Attributes,
nodeLocality roachpb.Locality,
preferences []roachpb.LeasePreference,
) bool {
if !leaseStatus.IsValid() || !leaseStatus.Lease.OwnedBy(storeID) {
return false
}
conf := r.SpanConfig()
if len(conf.LeasePreferences) == 0 {
if len(preferences) == 0 {
return false
}
for _, preference := range conf.LeasePreferences {
if constraint.ConjunctionsCheck(*storeDesc, preference.Constraints) {
for _, preference := range preferences {
if constraint.CheckConjunction(storeAttrs, nodeAttrs, nodeLocality, preference.Constraints) {
return false
}
}
// We have at lease one preference set up, but we don't satisfy any.
return true
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
// replicateQueueTimerDuration is the duration between replication of queued
// replicas.
replicateQueueTimerDuration = 0 // zero duration to process replication greedily

replicateQueuePriorityHigh = 1000 // see AllocatorAction.Priority
)

// MinLeaseTransferInterval controls how frequently leases can be transferred
Expand Down
12 changes: 6 additions & 6 deletions pkg/roachpb/span_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/cockroachdb/errors"
)

// StoreMatchesConstraint returns whether a store's attributes or node's
// locality match the constraint's spec. It notably ignores whether the
// constraint is required, prohibited, positive, or otherwise.
func StoreMatchesConstraint(store StoreDescriptor, c Constraint) bool {
// MatchesConstraint return whether the given attributes and locality tags match
// the constraint's spec. It ignores whether the constraint is required,
// prohibited, positive, or otherwise.
func MatchesConstraint(storeAttrs, nodeAttrs Attributes, nodeLocality Locality, c Constraint) bool {
if c.Key == "" {
for _, attrs := range []Attributes{store.Attrs, store.Node.Attrs} {
for _, attrs := range []Attributes{storeAttrs, nodeAttrs} {
for _, attr := range attrs.Attrs {
if attr == c.Value {
return true
Expand All @@ -32,7 +32,7 @@ func StoreMatchesConstraint(store StoreDescriptor, c Constraint) bool {
}
return false
}
for _, tier := range store.Node.Locality.Tiers {
for _, tier := range nodeLocality.Tiers {
if c.Key == tier.Key && c.Value == tier.Value {
return true
}
Expand Down