kvserver: support checking allocator action and target by range #94024

Merged 1 commit on Jan 10, 2023

Changes from all commits
29 changes: 28 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -430,7 +430,8 @@ func (ae *allocatorError) Error() string {
	return b.String()
}

-func (*allocatorError) PurgatoryErrorMarker() {}
+func (*allocatorError) AllocationErrorMarker() {}
+func (*allocatorError) PurgatoryErrorMarker() {}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
@@ -815,6 +816,32 @@ func getRemoveIdx(
	return removeIdx
}

// FilterReplicasForAction converts a range descriptor to the filtered
// voter and non-voter replicas needed to allocate a target for the given action.
// NB: This is a convenience method for callers of allocator.AllocateTarget(..).
func FilterReplicasForAction(
	storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction,
) (
	filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor,
	isReplacement, nothingToDo bool,
	err error,
) {
	voterReplicas, nonVoterReplicas,
		liveVoterReplicas, deadVoterReplicas,
		liveNonVoterReplicas, deadNonVoterReplicas := LiveAndDeadVoterAndNonVoterReplicas(storePool, desc)

	removeIdx := -1
	_, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
		storePool,
		action,
		voterReplicas, nonVoterReplicas,
		liveVoterReplicas, deadVoterReplicas,
		liveNonVoterReplicas, deadNonVoterReplicas,
	)

	return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err
}
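
As a usage sketch, the call pattern this convenience method enables is the one Store.AllocatorCheckRange follows later in this diff. The condensed, hypothetical caller below assumes the Allocator value and roachpb.SpanConfig parameter types, and keeps error handling minimal:

```go
package allocatorexample // hypothetical package for illustration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sketchCheckTarget reports the upreplication target the allocator would
// choose for desc, if the computed action calls for one. It invokes
// ComputeAction, FilterReplicasForAction, and AllocateTarget the same way
// Store.AllocatorCheckRange in this PR does.
func sketchCheckTarget(
	ctx context.Context,
	a allocatorimpl.Allocator,
	storePool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (roachpb.ReplicationTarget, error) {
	action, _ := a.ComputeAction(ctx, storePool, conf, desc)
	if !(action.Add() || action.Replace()) {
		// Removal and rebalance targets cannot be evaluated without a leaseholder.
		return roachpb.ReplicationTarget{}, nil
	}
	liveVoters, liveNonVoters, _, nothingToDo, err :=
		allocatorimpl.FilterReplicasForAction(storePool, desc, action)
	if nothingToDo || err != nil {
		return roachpb.ReplicationTarget{}, err
	}
	target, _, err := a.AllocateTarget(ctx, storePool, conf,
		liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(),
	)
	return target, err
}
```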

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
@@ -42,6 +42,13 @@ const (
	defaultLoadBasedRebalancingInterval = time.Minute
)

// AllocationError is a simple interface used to indicate a replica processing
// error originating from the allocator.
type AllocationError interface {
	error
	AllocationErrorMarker() // dummy method for unique interface
}

// MaxCapacityCheck returns true if the store has room for a new replica.
func MaxCapacityCheck(store roachpb.StoreDescriptor) bool {
	return store.Capacity.FractionUsed() < MaxFractionUsedThreshold
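
The AllocationError interface above relies on Go's marker-method idiom: any error type that adds the dummy AllocationErrorMarker method can be detected by callers without importing its concrete type. A minimal sketch; the error type and helper below are hypothetical, not part of this PR:

```go
package allocatorexample // hypothetical package for illustration

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// rangeUnavailableError is a hypothetical allocator-originated error; the
// dummy marker method is all that is needed beyond error itself.
type rangeUnavailableError struct{ rangeID roachpb.RangeID }

func (e *rangeUnavailableError) Error() string {
	return fmt.Sprintf("r%d: no valid allocation target", e.rangeID)
}

func (e *rangeUnavailableError) AllocationErrorMarker() {}

// isAllocationError reports whether err (or anything it wraps) originated
// from the allocator.
func isAllocationError(err error) bool {
	var ae allocator.AllocationError
	return errors.As(err, &ae)
}
```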
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
@@ -12,8 +12,10 @@ package storepool

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -41,6 +43,24 @@ type OverrideStorePool struct {

var _ AllocatorStorePool = &OverrideStorePool{}

// OverrideNodeLivenessFunc constructs a NodeLivenessFunc based on a set of
// predefined overrides. If any nodeID does not have an override, the liveness
// status is looked up using the passed-in real node liveness function.
func OverrideNodeLivenessFunc(
	overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc,
) NodeLivenessFunc {
	return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
		if override, ok := overrides[nid]; ok {
			return override
		}

		return realNodeLivenessFunc(nid, now, timeUntilStoreDead)
	}
}

// NewOverrideStorePool constructs an OverrideStorePool that can use its own
// view of node liveness while falling through to an underlying store pool for
// the state of peer stores.
func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool {
	return &OverrideStorePool{
		sp: storePool,
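
Taken together, OverrideNodeLivenessFunc and NewOverrideStorePool let a caller evaluate allocator decisions under a simulated liveness state (for example, "as if node N were dead") without touching real liveness. A sketch under that assumption; the helper below is hypothetical:

```go
package storepoolexample // hypothetical package for illustration

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// deadNodeOverridePool builds a store pool that treats deadNode as dead while
// falling through to realLivenessFunc for other nodes, and to basePool for
// the state of peer stores.
func deadNodeOverridePool(
	basePool *storepool.StorePool,
	realLivenessFunc storepool.NodeLivenessFunc,
	deadNode roachpb.NodeID,
) *storepool.OverrideStorePool {
	overrides := map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
		deadNode: livenesspb.NodeLivenessStatus_DEAD,
	}
	return storepool.NewOverrideStorePool(
		basePool, storepool.OverrideNodeLivenessFunc(overrides, realLivenessFunc),
	)
}
```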
42 changes: 42 additions & 0 deletions pkg/kv/kvserver/allocator_impl_test.go
@@ -60,6 +60,48 @@ var singleStore = []*roachpb.StoreDescriptor{
	},
}

var threeStores = []*roachpb.StoreDescriptor{
	{
		StoreID: 1,
		Attrs:   roachpb.Attributes{Attrs: []string{"ssd"}},
		Node: roachpb.NodeDescriptor{
			NodeID: 1,
			Attrs:  roachpb.Attributes{Attrs: []string{"a"}},
		},
		Capacity: roachpb.StoreCapacity{
			Capacity:     200,
			Available:    100,
			LogicalBytes: 100,
		},
	},
	{
		StoreID: 2,
		Attrs:   roachpb.Attributes{Attrs: []string{"ssd"}},
		Node: roachpb.NodeDescriptor{
			NodeID: 2,
			Attrs:  roachpb.Attributes{Attrs: []string{"a"}},
		},
		Capacity: roachpb.StoreCapacity{
			Capacity:     200,
			Available:    100,
			LogicalBytes: 100,
		},
	},
	{
		StoreID: 3,
		Attrs:   roachpb.Attributes{Attrs: []string{"ssd"}},
		Node: roachpb.NodeDescriptor{
			NodeID: 3,
			Attrs:  roachpb.Attributes{Attrs: []string{"a"}},
		},
		Capacity: roachpb.StoreCapacity{
			Capacity:     200,
			Available:    100,
			LogicalBytes: 100,
		},
	},
}

// TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance
// to a target that we'll immediately remove.
func TestAllocatorRebalanceTarget(t *testing.T) {
106 changes: 102 additions & 4 deletions pkg/kv/kvserver/store.go
@@ -3451,11 +3451,16 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS
	return result, err
}

-// AllocatorDryRun runs the given replica through the allocator without actually
-// carrying out any changes, returning all trace messages collected along the way.
+// ReplicateQueueDryRun runs the given replica through the replicate queue
+// (using the allocator) without actually carrying out any changes, returning
+// all trace messages collected along the way.
// Intended to help power a debug endpoint.
-func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.Recording, error) {
-	ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator dry run")
+func (s *Store) ReplicateQueueDryRun(
+	ctx context.Context, repl *Replica,
+) (tracingpb.Recording, error) {
+	ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx,
+		s.cfg.AmbientCtx.Tracer, "replicate queue dry run",
+	)
	defer collectAndFinish()
	canTransferLease := func(ctx context.Context, repl *Replica) bool { return true }
	_, err := s.replicateQueue.processOneChange(
@@ -3467,6 +3472,99 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R
	return collectAndFinish(), nil
}

// AllocatorCheckRange takes a range descriptor and an optional override store
// pool (or nil, to use the store's configured StorePool), looks up the
// configuration of the range, and uses the allocator to determine the action
// needed to repair the range, as well as an upreplication target if one is
// needed. It returns the action and target along with any encountered errors
// and the collected tracing spans.
//
// This functionality is similar to ReplicateQueueDryRun, but operates on the
// basis of a range rather than a replica, evaluating the action and target
// determined by the allocator. The range does not need to have a replica on
// this store in order to check the needed allocator action and target. The
// override store pool, if provided, is used; otherwise, evaluation falls back
// to the store's configured store pool.
//
// Assuming the span config is available, a valid allocator action should
// always be returned, even in case of errors.
//
// NB: In the case of removal or rebalance actions, a target cannot be
// evaluated, as a leaseholder is required for evaluation.
func (s *Store) AllocatorCheckRange(
	ctx context.Context,
	desc *roachpb.RangeDescriptor,
	overrideStorePool storepool.AllocatorStorePool,
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
	ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx,
		s.cfg.AmbientCtx.Tracer, "allocator check range",
	)
	defer collectAndFinish()

	confReader, err := s.GetConfReader(ctx)
	if err == nil {
		err = s.WaitForSpanConfigSubscription(ctx)
	}
	if err != nil {
		log.Eventf(ctx, "span configs unavailable: %s", err)
		return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
	}

	conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey)
	if err != nil {
		log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err)
		return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
	}

	// If a store pool was provided, use that, otherwise use the store's
	// configured store pool.
	var storePool storepool.AllocatorStorePool
	if overrideStorePool != nil {
		storePool = overrideStorePool
	} else if s.cfg.StorePool != nil {
		storePool = s.cfg.StorePool
	}

	action, _ := s.allocator.ComputeAction(ctx, storePool, conf, desc)

	// In the case that the action does not require a target, return immediately.
	if !(action.Add() || action.Replace()) {
		return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
	}

	liveVoters, liveNonVoters, isReplacement, nothingToDo, err :=
		allocatorimpl.FilterReplicasForAction(storePool, desc, action)

	if nothingToDo || err != nil {
		return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
	}

	target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf,
		liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(),
	)
	if err == nil {
		log.Eventf(ctx, "found valid allocation of %s target %v", action.TargetReplicaType(), target)

		// Ensure that if we are upreplicating, we are avoiding a state in which we
		// have a fragile quorum that we cannot avoid by allocating more voters.
		fragileQuorumErr := s.allocator.CheckAvoidsFragileQuorum(
			ctx,
			storePool,
			conf,
			desc.Replicas().VoterDescriptors(),
			liveNonVoters,
			action.ReplicaStatus(),
			action.TargetReplicaType(),
			target,
			isReplacement,
		)

		if fragileQuorumErr != nil {
			err = errors.Wrap(fragileQuorumErr, "avoid up-replicating to fragile quorum")
		}
	}

	return action, target, collectAndFinish(), err
}
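
A sketch of how a debug endpoint might drive this: build an override store pool (for example with the hypothetical deadNodeOverridePool helper sketched earlier) and pass it in, or pass nil to use the store's own view of liveness. The wrapper below is illustrative, not part of this PR:

```go
package kvserverexample // hypothetical package for illustration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// checkRange logs the action and target the allocator would choose for desc
// under the liveness view of overridePool (nil means the store's own pool).
func checkRange(
	ctx context.Context,
	store *kvserver.Store,
	desc *roachpb.RangeDescriptor,
	overridePool storepool.AllocatorStorePool,
) error {
	action, target, recording, err := store.AllocatorCheckRange(ctx, desc, overridePool)
	if err != nil {
		// The recording still carries the trace of how far evaluation got.
		log.Warningf(ctx, "allocator check failed: %v; trace:\n%s", err, recording)
		return err
	}
	log.Infof(ctx, "allocator would apply action %s with target %s", action, target)
	return nil
}
```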

// Enqueue runs the given replica through the requested queue. If `async` is
// specified, the replica is enqueued into the requested queue for asynchronous
// processing and this method returns nothing. Otherwise, it returns all trace