Skip to content

Commit

Permalink
Merge pull request #88662 from aliher1911/backport22.1-84727
Browse files Browse the repository at this point in the history
  • Loading branch information
aliher1911 authored Oct 3, 2022
2 parents 8528e0b + 83122f9 commit edc5865
Show file tree
Hide file tree
Showing 7 changed files with 626 additions and 67 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,8 @@ func (ds *DistSender) sendToReplicas(
}

// Filter the replicas to only those that are relevant to the routing policy.
// NB: When changing leaseholder policy constraint_status_report should be
// updated appropriately.
var replicaFilter ReplicaSliceFilter
switch ba.RoutingPolicy {
case roachpb.RoutingPolicy_LEASEHOLDER:
Expand Down
232 changes: 195 additions & 37 deletions pkg/kv/kvserver/reports/constraint_stats_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
// Constraint means that the entry refers to a constraint (i.e. a member of
// the constraints field in a zone config).
Constraint ConstraintType = "constraint"
// VoterConstraint means that the entry refers to a voter_constraint (i.e. a
// member of voter_constraint field in a zone config).
VoterConstraint ConstraintType = "voter_constraint"
// TODO(andrei): add leaseholder preference
)

Expand Down Expand Up @@ -109,7 +112,7 @@ func (k ConstraintStatusKey) Less(other ConstraintStatusKey) bool {
return true
}
if other.ViolationType.Less(k.ViolationType) {
return true
return false
}
return k.Constraint.Less(other.Constraint)
}
Expand Down Expand Up @@ -146,6 +149,9 @@ func (r ConstraintReport) ensureEntries(key ZoneKey, zone *zonepb.ZoneConfig) {
for _, conjunction := range zone.Constraints {
r.ensureEntry(key, Constraint, ConstraintRepr(conjunction.String()))
}
for _, conjunction := range zone.VoterConstraints {
r.ensureEntry(key, VoterConstraint, ConstraintRepr(conjunction.String()))
}
for i, sz := range zone.Subzones {
szKey := ZoneKey{ZoneID: key.ZoneID, SubzoneID: base.SubzoneIDFromIndex(i)}
r.ensureEntries(szKey, &sz.Config)
Expand Down Expand Up @@ -336,6 +342,8 @@ func (r *replicationConstraintStatsReportSaver) upsertConstraintStatus(
return nil
}

type replicaPredicate func(r roachpb.ReplicaDescriptor) bool

// constraintConformanceVisitor is a visitor that, when passed to visitRanges(),
// computes the constraint conformance report (i.e. the
// system.replication_constraint_stats table).
Expand All @@ -348,11 +356,11 @@ type constraintConformanceVisitor struct {
report ConstraintReport
visitErr bool

// prevZoneKey and prevConstraints maintain state from one range to the next.
// This state can be reused when a range is covered by the same zone config as
// the previous one. Reusing it speeds up the report generation.
prevZoneKey ZoneKey
prevConstraints []zonepb.ConstraintsConjunction
// Zone checker maintain a zone config state internally and can be reused when
// a range is covered by the same zone config as the previous one. Reusing it
// speeds up the report generation.
// It is recreated every time a range is processed with a different zone key.
zoneChecker constraintConformanceChecker
}

var _ rangeVisitor = &constraintConformanceVisitor{}
Expand Down Expand Up @@ -408,6 +416,59 @@ func (v *constraintConformanceVisitor) reset(ctx context.Context) {
}
}

// constraintCheckPolicy defines a set of predicates that define subsets of
// replicas that are checked against conjunctions.
// In order for constraint type to satisfy, each subset of replicas needs to
// satisfy provided constraint conjunction.
// For example voter constraint should match both subset of replicas
// representing outgoing and incoming consensus.
type constraintCheckPolicy struct {
predicates map[zonepb.Constraint_Type][]replicaPredicate
}

// getViolations finds constraint violations according to sets of predicates
// defined in policy.
func (p constraintCheckPolicy) getViolations(
r *roachpb.RangeDescriptor,
storeResolver StoreResolver,
conjunctions []zonepb.ConstraintsConjunction,
) (res []ConstraintRepr) {

checkConstraints := func(cj zonepb.ConstraintsConjunction) (bool, ConstraintRepr) {
for _, cc := range cj.Constraints {
t := cc.Type
if t == zonepb.Constraint_DEPRECATED_POSITIVE {
t = zonepb.Constraint_REQUIRED
}

// Check all store variants defined by policy.
for _, p := range p.predicates[t] {
rds := r.Replicas().FilterToDescriptors(p)
storeDescs := make([]roachpb.StoreDescriptor, len(rds))
for i, r := range rds {
storeDescs[i] = storeResolver(r.StoreID)
}
// Run each constraint against all store variants to find violations.
replicasRequiredToMatch := int(cj.NumReplicas)
if replicasRequiredToMatch == 0 {
replicasRequiredToMatch = len(storeDescs)
}
if !constraintSatisfied(cc, replicasRequiredToMatch, storeDescs) {
return true, ConstraintRepr(cj.String())
}
}
}
return false, ""
}

for _, cj := range conjunctions {
if ok, repr := checkConstraints(cj); ok {
res = append(res, repr)
}
}
return res
}

// visitNewZone is part of the rangeVisitor interface.
func (v *constraintConformanceVisitor) visitNewZone(
ctx context.Context, r *roachpb.RangeDescriptor,
Expand All @@ -418,68 +479,165 @@ func (v *constraintConformanceVisitor) visitNewZone(
}()

// Find the applicable constraints, which may be inherited.
var numVoters int
var constraints []zonepb.ConstraintsConjunction
var voterConstraints []zonepb.ConstraintsConjunction
var zKey ZoneKey
_, err := visitZones(ctx, r, v.cfg, ignoreSubzonePlaceholders,
func(_ context.Context, zone *zonepb.ZoneConfig, key ZoneKey) bool {
if zone.Constraints == nil {
return false
}
// Check num voters and only set it if it is different from num replicas.
var numReplicas int32
if zone.NumReplicas != nil {
numReplicas = *zone.NumReplicas
}
if zone.NumVoters != nil && numReplicas != *zone.NumVoters {
numVoters = int(*zone.NumVoters)
}
constraints = zone.Constraints
voterConstraints = zone.VoterConstraints
zKey = key
return true
})
if err != nil {
return errors.Wrap(err, "unexpected error visiting zones")
}
v.prevZoneKey = zKey
v.prevConstraints = constraints
v.countRange(ctx, r, zKey, constraints)
v.zoneChecker = constraintConformanceChecker{
zoneKey: zKey,
numVoters: numVoters,
constraints: constraints,
voterConstraints: voterConstraints,
storeResolver: v.storeResolver,
report: &v.report,
}
v.zoneChecker.checkZone(ctx, r)
return nil
}

// visitSameZone is part of the rangeVisitor interface.
func (v *constraintConformanceVisitor) visitSameZone(
ctx context.Context, r *roachpb.RangeDescriptor,
) {
v.countRange(ctx, r, v.prevZoneKey, v.prevConstraints)
v.zoneChecker.checkZone(ctx, r)
}

type constraintConformanceChecker struct {
zoneKey ZoneKey
numVoters int
constraints []zonepb.ConstraintsConjunction
voterConstraints []zonepb.ConstraintsConjunction

storeResolver StoreResolver
report *ConstraintReport
}

// visitSameZone is part of the rangeVisitor interface.
func (v *constraintConformanceChecker) checkZone(ctx context.Context, r *roachpb.RangeDescriptor) {
// replicaConstraintsAllVoters are applied to replica constraints when number
// of voters are not specified e.g. equal to number of replicas implicitly or
// equal to number of voters explicitly.
var replicaConstraintsAllVoters = constraintCheckPolicy{
predicates: map[zonepb.Constraint_Type][]replicaPredicate{
zonepb.Constraint_REQUIRED: {
isInIncomingQuorumOrNonVoter, isInOutgoingQuorumOrNonVoter,
},
zonepb.Constraint_PROHIBITED: {
isAny,
},
},
}

// replicaConstraintsWithNonVoters are applied to replica constraints when
// number of voters are explicitly specified and is below total number of
// replicas.
var replicaConstraintsWithNonVoters = constraintCheckPolicy{
predicates: map[zonepb.Constraint_Type][]replicaPredicate{
// Note that required predicate are replicas that we can route reads to
// hence LEARNER and VOTER_DEMOTING and VOTER_OUTGOING are excluded. Even
// if DEMOTING/OUTGOING replicas can still serve reads we don't route to
// them and they should disappear momentarily, so we keep constraint
// checks in sync with routing logic in DistSender.
zonepb.Constraint_REQUIRED: {
isInIncomingQuorumOrNonVoter,
},
zonepb.Constraint_PROHIBITED: {
isAny,
},
},
}

// voterConstraints are applied when voter constraints are explicitly specified
// in zone config to voter constraints.
var voterConstraints = constraintCheckPolicy{
predicates: map[zonepb.Constraint_Type][]replicaPredicate{
zonepb.Constraint_REQUIRED: {
isInIncomingQuorum, isInOutgoingQuorum,
},
zonepb.Constraint_PROHIBITED: {
isVoter,
},
},
}

if v.numVoters != 0 {
v.countRange(ctx, r, v.zoneKey, Constraint, replicaConstraintsWithNonVoters,
v.constraints)
} else {
v.countRange(ctx, r, v.zoneKey, Constraint, replicaConstraintsAllVoters, v.constraints)
}
v.countRange(ctx, r, v.zoneKey, VoterConstraint, voterConstraints, v.voterConstraints)
}

func (v *constraintConformanceVisitor) countRange(
func (v *constraintConformanceChecker) countRange(
ctx context.Context,
r *roachpb.RangeDescriptor,
key ZoneKey,
t ConstraintType,
policy constraintCheckPolicy,
constraints []zonepb.ConstraintsConjunction,
) {
storeDescs := v.storeResolver(r)
violated := getViolations(ctx, storeDescs, constraints)
for _, c := range violated {
v.report.AddViolation(key, Constraint, c)
for _, violation := range policy.getViolations(r, v.storeResolver, constraints) {
v.report.AddViolation(key, t, violation)
}
}

// getViolations returns the list of constraints violated by a range. The range
// is represented by the descriptors of the replicas' stores.
func getViolations(
ctx context.Context,
storeDescs []roachpb.StoreDescriptor,
constraintConjunctions []zonepb.ConstraintsConjunction,
) []ConstraintRepr {
var res []ConstraintRepr
// Evaluate all zone constraints for the stores (i.e. replicas) of the given range.
for _, conjunction := range constraintConjunctions {
replicasRequiredToMatch := int(conjunction.NumReplicas)
if replicasRequiredToMatch == 0 {
replicasRequiredToMatch = len(storeDescs)
}
for _, c := range conjunction.Constraints {
if !constraintSatisfied(c, replicasRequiredToMatch, storeDescs) {
res = append(res, ConstraintRepr(conjunction.String()))
break
}
}
func isAny(_ roachpb.ReplicaDescriptor) bool {
return true
}

func isInIncomingQuorumOrNonVoter(r roachpb.ReplicaDescriptor) bool {
return isInIncomingQuorum(r) || isNonVoter(r)
}

func isInOutgoingQuorumOrNonVoter(r roachpb.ReplicaDescriptor) bool {
return isInOutgoingQuorum(r) || isNonVoter(r)
}

func isNonVoter(r roachpb.ReplicaDescriptor) bool {
return r.GetType() == roachpb.NON_VOTER
}

func isVoter(r roachpb.ReplicaDescriptor) bool {
return isInOutgoingQuorum(r) || isInIncomingQuorum(r)
}

func isInIncomingQuorum(r roachpb.ReplicaDescriptor) bool {
switch r.GetType() {
case roachpb.VOTER_FULL, roachpb.VOTER_INCOMING:
return true
default:
return false
}
}

func isInOutgoingQuorum(r roachpb.ReplicaDescriptor) bool {
switch r.GetType() {
case roachpb.VOTER_FULL, roachpb.VOTER_OUTGOING, roachpb.VOTER_DEMOTING_NON_VOTER, roachpb.VOTER_DEMOTING_LEARNER:
return true
default:
return false
}
return res
}

// constraintSatisfied checks that a range (represented by its replicas' stores)
Expand Down
Loading

0 comments on commit edc5865

Please sign in to comment.