Skip to content

Commit

Permalink
kvserver: support checking allocator action and target by range
Browse files Browse the repository at this point in the history
This change exposes support via a store for checking the allocator
action and upreplication target (if applicable) for any range descriptor.
The range does not need to have a replica on the given store, nor is it
required to evaluate given the current state of the cluster (i.e. the
store's configured StorePool), as a store pool override can be
provided in order to evaluate possible future states.

Depends on cockroachdb#94114.

Part of cockroachdb#91570.

Release note: None
  • Loading branch information
AlexTalks committed Jan 10, 2023
1 parent b5cffc0 commit b89e59d
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 7 deletions.
29 changes: 28 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ func (ae *allocatorError) Error() string {
return b.String()
}

func (*allocatorError) PurgatoryErrorMarker() {}
func (*allocatorError) AllocationErrorMarker() {}
func (*allocatorError) PurgatoryErrorMarker() {}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
Expand Down Expand Up @@ -815,6 +816,32 @@ func getRemoveIdx(
return removeIdx
}

// FilterReplicasForAction converts a range descriptor to the filtered
// voter and non-voter replicas needed to allocate a target for the given action.
// NB: This is a convenience method for callers of allocator.AllocateTarget(..).
func FilterReplicasForAction(
storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction,
) (
filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor,
isReplacement, nothingToDo bool,
err error,
) {
voterReplicas, nonVoterReplicas,
liveVoterReplicas, deadVoterReplicas,
liveNonVoterReplicas, deadNonVoterReplicas := LiveAndDeadVoterAndNonVoterReplicas(storePool, desc)

removeIdx := -1
_, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
storePool,
action,
voterReplicas, nonVoterReplicas,
liveVoterReplicas, deadVoterReplicas,
liveNonVoterReplicas, deadNonVoterReplicas,
)

return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err
}

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ const (
defaultLoadBasedRebalancingInterval = time.Minute
)

// AllocationError is a simple interface used to indicate a replica processing
// error originating from the allocator.
type AllocationError interface {
error
AllocationErrorMarker() // dummy method for unique interface
}

// MaxCapacityCheck returns true if the store has room for a new replica.
func MaxCapacityCheck(store roachpb.StoreDescriptor) bool {
return store.Capacity.FractionUsed() < MaxFractionUsedThreshold
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package storepool

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -41,6 +43,24 @@ type OverrideStorePool struct {

var _ AllocatorStorePool = &OverrideStorePool{}

// OverrideNodeLivenessFunc constructs a NodeLivenessFunc based on a set of
// predefined overrides. If any nodeID does not have an override, the liveness
// status is looked up using the passed-in real node liveness function.
func OverrideNodeLivenessFunc(
overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc,
) NodeLivenessFunc {
return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if override, ok := overrides[nid]; ok {
return override
}

return realNodeLivenessFunc(nid, now, timeUntilStoreDead)
}
}

// NewOverrideStorePool constructs an OverrideStorePool that can use its own
// view of node liveness while falling through to an underlying store pool for
// the state of peer stores.
func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool {
return &OverrideStorePool{
sp: storePool,
Expand Down
42 changes: 42 additions & 0 deletions pkg/kv/kvserver/allocator_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,48 @@ var singleStore = []*roachpb.StoreDescriptor{
},
}

var threeStores = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
}

// TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance
// to a target that we'll immediately remove.
func TestAllocatorRebalanceTarget(t *testing.T) {
Expand Down
106 changes: 102 additions & 4 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3451,11 +3451,16 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS
return result, err
}

// AllocatorDryRun runs the given replica through the allocator without actually
// carrying out any changes, returning all trace messages collected along the way.
// ReplicateQueueDryRun runs the given replica through the replicate queue
// (using the allocator) without actually carrying out any changes, returning
// all trace messages collected along the way.
// Intended to help power a debug endpoint.
func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.Recording, error) {
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator dry run")
func (s *Store) ReplicateQueueDryRun(
ctx context.Context, repl *Replica,
) (tracingpb.Recording, error) {
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx,
s.cfg.AmbientCtx.Tracer, "replicate queue dry run",
)
defer collectAndFinish()
canTransferLease := func(ctx context.Context, repl *Replica) bool { return true }
_, err := s.replicateQueue.processOneChange(
Expand All @@ -3467,6 +3472,99 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R
return collectAndFinish(), nil
}

// AllocatorCheckRange takes a range descriptor and a node liveness override (or
// nil, to use the configured StorePool's), looks up the configuration of
// range, and utilizes the allocator to get the action needed to repair the
// range, as well as any upreplication target if needed, returning along with
// any encountered errors as well as the collected tracing spans.
//
// This functionality is similar to ReplicateQueueDryRun, but operates on the
// basis of a range, evaluating the action and target determined by the allocator.
// The range does not need to have a replica on the store in order to check the
// needed allocator action and target. The store pool, if provided, will be
// used, otherwise it will fall back to the store's configured store pool.
//
// Assuming the span config is available, a valid allocator action should
// always be returned, even in case of errors.
//
// NB: In the case of removal or rebalance actions, a target cannot be
// evaluated, as a leaseholder is required for evaluation.
func (s *Store) AllocatorCheckRange(
ctx context.Context,
desc *roachpb.RangeDescriptor,
overrideStorePool storepool.AllocatorStorePool,
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx,
s.cfg.AmbientCtx.Tracer, "allocator check range",
)
defer collectAndFinish()

confReader, err := s.GetConfReader(ctx)
if err == nil {
err = s.WaitForSpanConfigSubscription(ctx)
}
if err != nil {
log.Eventf(ctx, "span configs unavailable: %s", err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey)
if err != nil {
log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

// If a store pool was provided, use that, otherwise use the store's
// configured store pool.
var storePool storepool.AllocatorStorePool
if overrideStorePool != nil {
storePool = overrideStorePool
} else if s.cfg.StorePool != nil {
storePool = s.cfg.StorePool
}

action, _ := s.allocator.ComputeAction(ctx, storePool, conf, desc)

// In the case that the action does not require a target, return immediately.
if !(action.Add() || action.Replace()) {
return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

liveVoters, liveNonVoters, isReplacement, nothingToDo, err :=
allocatorimpl.FilterReplicasForAction(storePool, desc, action)

if nothingToDo || err != nil {
return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf,
liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(),
)
if err == nil {
log.Eventf(ctx, "found valid allocation of %s target %v", action.TargetReplicaType(), target)

// Ensure that if we are upreplicating, we are avoiding a state in which we
// have a fragile quorum that we cannot avoid by allocating more voters.
fragileQuorumErr := s.allocator.CheckAvoidsFragileQuorum(
ctx,
storePool,
conf,
desc.Replicas().VoterDescriptors(),
liveNonVoters,
action.ReplicaStatus(),
action.TargetReplicaType(),
target,
isReplacement,
)

if fragileQuorumErr != nil {
err = errors.Wrap(fragileQuorumErr, "avoid up-replicating to fragile quorum")
}
}

return action, target, collectAndFinish(), err
}

// Enqueue runs the given replica through the requested queue. If `async` is
// specified, the replica is enqueued into the requested queue for asynchronous
// processing and this method returns nothing. Otherwise, it returns all trace
Expand Down
Loading

0 comments on commit b89e59d

Please sign in to comment.