Skip to content

Commit

Permalink
kvserver: add support for allocator range check via store
Browse files Browse the repository at this point in the history
This change exposes support via a store for checking the allocator
action and upreplication target (if applicable) for any range descriptor.
The range does not need to have a replica on the given store, nor is it
required to evaluate given the current state of the cluster (i.e. the
store's configured `StorePool`), as a node liveness override can be
provided in order to evaluate possible future states.

Depends on cockroachdb#92176.

Part of cockroachdb#91570.

Release note: None
  • Loading branch information
AlexTalks committed Dec 17, 2022
1 parent 46f13fb commit 2ebb088
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func (ae *allocatorError) Error() string {
return b.String()
}

func (*allocatorError) PurgatoryErrorMarker() {}
func (*allocatorError) AllocationErrorMarker() {}
func (*allocatorError) PurgatoryErrorMarker() {}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
defaultLoadBasedRebalancingInterval = time.Minute
)

// AllocationError is a simple interface used to indicate a replica processing
// error originating from the allocator.
type AllocationError interface {
error
AllocationErrorMarker() // dummy method for unique interface
}

// MaxCapacityCheck returns true if the store has room for a new replica.
func MaxCapacityCheck(store roachpb.StoreDescriptor) bool {
return store.Capacity.FractionUsed() < MaxFractionUsedThreshold
Expand Down
42 changes: 42 additions & 0 deletions pkg/kv/kvserver/allocator_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,48 @@ var singleStore = []*roachpb.StoreDescriptor{
},
}

var threeStores = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
}

var twoDCStores = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Expand Down
63 changes: 63 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3650,6 +3650,69 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R
return collectAndFinish(), nil
}

// AllocatorCheckRange takes a range descriptor and a node liveness override (or
// nil, to use the configured StorePool's), looks up the configuration of
// range, and utilizes the allocator to get the action needed to repair the
// range, as well as any upreplication target if needed, returning along with
// any encountered errors as well as the collected tracing spans.
//
// This functionality is similar to AllocatorDryRun, but operates on the basis
// of a range, evaluating the action and target determined by the allocator.
// The range does not need to have a replica on the store in order to check the
// needed allocator action and target. The liveness override function, if
// provided, may return UNKNOWN to fall back to the actual node liveness.
//
// Assuming the span config is available, a valid allocator action should
// always be returned, even in case of errors.
//
// NB: In the case of removal or rebalance actions, a target cannot be
// evaluated, as a leaseholder is required for evaluation.
func (s *Store) AllocatorCheckRange(
ctx context.Context,
desc *roachpb.RangeDescriptor,
nodeLivenessOverride storepool.NodeLivenessFunc,
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range")
defer collectAndFinish()

confReader, err := s.GetConfReader(ctx)
if err == nil {
err = s.WaitForSpanConfigSubscription(ctx)
}
if err != nil {
log.Eventf(ctx, "span configs unavailable: %s", err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey)
if err != nil {
log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
}

var storePool storepool.AllocatorStorePool
if nodeLivenessOverride != nil {
internalNodeLivenessFn := func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
status := nodeLivenessOverride(nid, now, timeUntilStoreDead)
if status == livenesspb.NodeLivenessStatus_UNKNOWN {
return s.cfg.StorePool.NodeLivenessFn(nid, now, timeUntilStoreDead)
}

return status
}
storePool = storepool.NewOverrideStorePool(s.cfg.StorePool, internalNodeLivenessFn)
} else if s.cfg.StorePool != nil {
storePool = s.cfg.StorePool
}

action, target, err := s.replicateQueue.CheckRangeAction(ctx, storePool, desc, conf)
if err != nil {
log.Eventf(ctx, "error simulating allocator on range %s: %s", desc, err)
}

return action, target, collectAndFinish(), err
}

// Enqueue runs the given replica through the requested queue. If `async` is
// specified, the replica is enqueued into the requested queue for asynchronous
// processing and this method returns nothing. Otherwise, it returns all trace
Expand Down
202 changes: 202 additions & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand All @@ -48,6 +51,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
Expand Down Expand Up @@ -3387,6 +3391,204 @@ func TestSnapshotRateLimit(t *testing.T) {
}
}

// TestAllocatorCheckRangeUnconfigured tests evaluating the allocation decisions
// for a range with a single replica using the default system configuration and
// no other available allocation targets.
func TestAllocatorCheckRangeUnconfigured(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc := testContext{}
tc.Start(ctx, t, stopper)

s := tc.store

// Expect allocator error if range has nowhere to upreplicate.
action, _, _, err := s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* nodeLivenessOverride */)
require.Error(t, err)
var allocatorError allocator.AllocationError
require.ErrorAs(t, err, &allocatorError)
require.Equal(t, allocatorimpl.AllocatorAddVoter, action)

// Expect error looking up spanConfig if we can't use the system config span,
// as the spanconfig.KVSubscriber infrastructure is not initialized.
s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues = true
action, _, _, err = s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* nodeLivenessOverride */)
require.Error(t, err)
require.ErrorIs(t, err, errSysCfgUnavailable)
require.Equal(t, allocatorimpl.AllocatorNoop, action)
}

// TODO(sarkesian): more testing
func TestAllocatorCheckRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

for _, tc := range []struct {
name string
stores []*roachpb.StoreDescriptor
existingReplicas []roachpb.ReplicaDescriptor
livenessOverrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus
expectedAction allocatorimpl.AllocatorAction
expectValidTarget bool
expectedLogMessage string
expectErr bool
expectAllocatorErr bool
expectedErr error
}{
{
name: "overreplicated",
stores: multiRegionStores,
existingReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
{NodeID: 4, StoreID: 4, ReplicaID: 4},
},
livenessOverrides: nil,
expectedAction: allocatorimpl.AllocatorRemoveVoter,
expectErr: false,
},
{
name: "overreplicated but store dead",
stores: multiRegionStores,
existingReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
{NodeID: 4, StoreID: 4, ReplicaID: 4},
},
livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
3: livenesspb.NodeLivenessStatus_DEAD,
},
expectedAction: allocatorimpl.AllocatorRemoveDeadVoter,
expectErr: false,
},
{
name: "decommissioning but underreplicated",
stores: multiRegionStores,
existingReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
2: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
},
expectedAction: allocatorimpl.AllocatorAddVoter,
expectErr: false,
expectValidTarget: true,
},
{
name: "decommissioning with replacement",
stores: multiRegionStores,
existingReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
},
expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
expectErr: false,
expectValidTarget: true,
},
{
name: "decommissioning without valid replacement",
stores: threeStores,
existingReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
},
expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
expectAllocatorErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
// Setup store pool based on store descriptors and configure test store.
nodesSeen := make(map[roachpb.NodeID]struct{})
for _, storeDesc := range tc.stores {
nodesSeen[storeDesc.Node.NodeID] = struct{}{}
}
numNodes := len(nodesSeen)

// Create a test store simulating n1s1, where we have other nodes/stores as
// determined by the test configuration. As we do not start the test store,
// queues will not be running.
stopper, g, sp, _, manual := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */)
defer stopper.Stop(context.Background())
gossiputil.NewStoreGossiper(g).GossipStores(tc.stores, t)

clock := hlc.NewClock(manual, time.Nanosecond)
cfg := TestStoreConfig(clock)
cfg.Gossip = g
cfg.StorePool = sp

s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)

desc := &roachpb.RangeDescriptor{
RangeID: 789,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("b"),
InternalReplicas: tc.existingReplicas,
}

var livenessOverride storepool.NodeLivenessFunc
if len(tc.livenessOverrides) > 0 {
livenessOverride = func(nid roachpb.NodeID, _ time.Time, _ time.Duration) livenesspb.NodeLivenessStatus {
if liveness, ok := tc.livenessOverrides[nid]; ok {
return liveness
}

return livenesspb.NodeLivenessStatus_UNKNOWN
}
}

// Execute actual allocator range repair check.
action, target, recording, err := s.AllocatorCheckRange(ctx, desc, livenessOverride)

// Validate expectations from test case.
if tc.expectErr || tc.expectAllocatorErr {
require.Error(t, err)

if tc.expectedErr != nil {
require.ErrorIs(t, err, tc.expectedErr)
}
if tc.expectAllocatorErr {
var allocatorError allocator.AllocationError
require.ErrorAs(t, err, &allocatorError)
}
} else {
require.NoError(t, err)
}

require.Equalf(t, tc.expectedAction, action,
"expected action \"%s\", got action \"%s\"", tc.expectedAction, action,
)

if tc.expectValidTarget {
require.NotEqualf(t, roachpb.ReplicationTarget{}, target, "expected valid target")
}

if tc.expectedLogMessage != "" {
_, ok := recording.FindLogMessage(tc.expectedLogMessage)
require.Truef(t, ok, "expected to find trace \"%s\"", tc.expectedLogMessage)
}
})
}
}

// TestManuallyEnqueueUninitializedReplica makes sure that uninitialized
// replicas cannot be enqueued.
func TestManuallyEnqueueUninitializedReplica(t *testing.T) {
Expand Down

0 comments on commit 2ebb088

Please sign in to comment.