diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 5210fa9ef308..ed48cd9a4f12 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -430,7 +430,8 @@ func (ae *allocatorError) Error() string { return b.String() } -func (*allocatorError) PurgatoryErrorMarker() {} +func (*allocatorError) AllocationErrorMarker() {} +func (*allocatorError) PurgatoryErrorMarker() {} // allocatorRand pairs a rand.Rand with a mutex. // NOTE: Allocator is typically only accessed from a single thread (the @@ -815,6 +816,32 @@ func getRemoveIdx( return removeIdx } +// FilterReplicasForAction converts a range descriptor to the filtered +// voter and non-voter replicas needed to allocate a target for the given action. +// NB: This is a convenience method for callers of allocator.AllocateTarget(..). +func FilterReplicasForAction( + storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction, +) ( + filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor, + isReplacement, nothingToDo bool, + err error, +) { + voterReplicas, nonVoterReplicas, + liveVoterReplicas, deadVoterReplicas, + liveNonVoterReplicas, deadNonVoterReplicas := LiveAndDeadVoterAndNonVoterReplicas(storePool, desc) + + removeIdx := -1 + _, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter( + storePool, + action, + voterReplicas, nonVoterReplicas, + liveVoterReplicas, deadVoterReplicas, + liveNonVoterReplicas, deadNonVoterReplicas, + ) + + return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err +} + // ComputeAction determines the exact operation needed to repair the // supplied range, as governed by the supplied zone configuration. It // returns the required action that should be taken and a priority. 
diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index 92d7bf598384..0f70f5b9a3bc 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -42,6 +42,13 @@ const ( defaultLoadBasedRebalancingInterval = time.Minute ) +// AllocationError is a simple interface used to indicate a replica processing +// error originating from the allocator. +type AllocationError interface { + error + AllocationErrorMarker() // dummy method for unique interface +} + // MaxCapacityCheck returns true if the store has room for a new replica. func MaxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < MaxFractionUsedThreshold diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index 8ce08868a955..4cd59b90a008 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -12,8 +12,10 @@ package storepool import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,6 +43,24 @@ type OverrideStorePool struct { var _ AllocatorStorePool = &OverrideStorePool{} +// OverrideNodeLivenessFunc constructs a NodeLivenessFunc based on a set of +// predefined overrides. If any nodeID does not have an override, the liveness +// status is looked up using the passed-in real node liveness function. 
+func OverrideNodeLivenessFunc( + overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc, +) NodeLivenessFunc { + return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if override, ok := overrides[nid]; ok { + return override + } + + return realNodeLivenessFunc(nid, now, timeUntilStoreDead) + } +} + +// NewOverrideStorePool constructs an OverrideStorePool that can use its own +// view of node liveness while falling through to an underlying store pool for +// the state of peer stores. func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool { return &OverrideStorePool{ sp: storePool, diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index 1549865b8b06..5490ed3d5c2f 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -60,6 +60,48 @@ var singleStore = []*roachpb.StoreDescriptor{ }, } +var threeStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, +} + // TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance // to a target that we'll 
immediately remove. func TestAllocatorRebalanceTarget(t *testing.T) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 141f8d31c3a4..820bf567d132 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3451,11 +3451,16 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS return result, err } -// AllocatorDryRun runs the given replica through the allocator without actually -// carrying out any changes, returning all trace messages collected along the way. +// ReplicateQueueDryRun runs the given replica through the replicate queue +// (using the allocator) without actually carrying out any changes, returning +// all trace messages collected along the way. // Intended to help power a debug endpoint. -func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.Recording, error) { - ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator dry run") +func (s *Store) ReplicateQueueDryRun( + ctx context.Context, repl *Replica, +) (tracingpb.Recording, error) { + ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, + s.cfg.AmbientCtx.Tracer, "replicate queue dry run", + ) defer collectAndFinish() canTransferLease := func(ctx context.Context, repl *Replica) bool { return true } _, err := s.replicateQueue.processOneChange( @@ -3467,6 +3472,99 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R return collectAndFinish(), nil } +// AllocatorCheckRange takes a range descriptor and a store pool override (or +// nil, to use the store's configured StorePool), looks up the span config of +// the range, and utilizes the allocator to get the action needed to repair the +// range, as well as any upreplication target if needed, returning these along +// with any encountered errors as well as the collected tracing spans. 
+// +// This functionality is similar to ReplicateQueueDryRun, but operates on the +// basis of a range, evaluating the action and target determined by the allocator. +// The range does not need to have a replica on the store in order to check the +// needed allocator action and target. The store pool, if provided, will be +// used, otherwise it will fall back to the store's configured store pool. +// +// Assuming the span config is available, a valid allocator action should +// always be returned, even in case of errors. +// +// NB: In the case of removal or rebalance actions, a target cannot be +// evaluated, as a leaseholder is required for evaluation. +func (s *Store) AllocatorCheckRange( + ctx context.Context, + desc *roachpb.RangeDescriptor, + overrideStorePool storepool.AllocatorStorePool, +) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) { + ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, + s.cfg.AmbientCtx.Tracer, "allocator check range", + ) + defer collectAndFinish() + + confReader, err := s.GetConfReader(ctx) + if err == nil { + err = s.WaitForSpanConfigSubscription(ctx) + } + if err != nil { + log.Eventf(ctx, "span configs unavailable: %s", err) + return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey) + if err != nil { + log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err) + return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + // If a store pool was provided, use that, otherwise use the store's + // configured store pool. 
+ var storePool storepool.AllocatorStorePool + if overrideStorePool != nil { + storePool = overrideStorePool + } else if s.cfg.StorePool != nil { + storePool = s.cfg.StorePool + } + + action, _ := s.allocator.ComputeAction(ctx, storePool, conf, desc) + + // In the case that the action does not require a target, return immediately. + if !(action.Add() || action.Replace()) { + return action, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + liveVoters, liveNonVoters, isReplacement, nothingToDo, err := + allocatorimpl.FilterReplicasForAction(storePool, desc, action) + + if nothingToDo || err != nil { + return action, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf, + liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(), + ) + if err == nil { + log.Eventf(ctx, "found valid allocation of %s target %v", action.TargetReplicaType(), target) + + // Ensure that if we are upreplicating, we are avoiding a state in which we + // have a fragile quorum that we cannot avoid by allocating more voters. + fragileQuorumErr := s.allocator.CheckAvoidsFragileQuorum( + ctx, + storePool, + conf, + desc.Replicas().VoterDescriptors(), + liveNonVoters, + action.ReplicaStatus(), + action.TargetReplicaType(), + target, + isReplacement, + ) + + if fragileQuorumErr != nil { + err = errors.Wrap(fragileQuorumErr, "avoid up-replicating to fragile quorum") + } + } + + return action, target, collectAndFinish(), err +} + // Enqueue runs the given replica through the requested queue. If `async` is // specified, the replica is enqueued into the requested queue for asynchronous // processing and this method returns nothing. 
Otherwise, it returns all trace diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index a046a1f8e872..739fe928a024 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -32,12 +32,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -50,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -3378,6 +3382,214 @@ func TestSnapshotRateLimit(t *testing.T) { } } +// TestAllocatorCheckRangeUnconfigured tests evaluating the allocation decisions +// for a range with a single replica using the default system configuration and +// no other available allocation targets. 
+func TestAllocatorCheckRangeUnconfigured(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "confAvailable", func(t *testing.T, confAvailable bool) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + tc := testContext{} + tc.manualClock = timeutil.NewManualTime(timeutil.Unix(0, 123)) + cfg := TestStoreConfig(hlc.NewClock(tc.manualClock, time.Nanosecond) /* maxOffset */) + if !confAvailable { + cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues = true + } + + tc.StartWithStoreConfig(ctx, t, stopper, cfg) + s := tc.store + + action, _, _, err := s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* overrideStorePool */) + require.Error(t, err) + + if confAvailable { + // Expect allocator error if range has nowhere to upreplicate. + var allocatorError allocator.AllocationError + require.ErrorAs(t, err, &allocatorError) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + } else { + // Expect error looking up spanConfig if we can't use the system config span, + // as the spanconfig.KVSubscriber infrastructure is not initialized. + require.ErrorIs(t, err, errSysCfgUnavailable) + require.Equal(t, allocatorimpl.AllocatorNoop, action) + } + }) +} + +// TestAllocatorCheckRange runs a number of tests to check the allocator's +// range repair action and target based on a number of different configured +// stores. 
+func TestAllocatorCheckRange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + for _, tc := range []struct { + name string + stores []*roachpb.StoreDescriptor + existingReplicas []roachpb.ReplicaDescriptor + livenessOverrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus + expectedAction allocatorimpl.AllocatorAction + expectValidTarget bool + expectedLogMessage string + expectErr bool + expectAllocatorErr bool + expectedErr error + expectedErrStr string + }{ + { + name: "overreplicated", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + {NodeID: 4, StoreID: 4, ReplicaID: 4}, + }, + livenessOverrides: nil, + expectedAction: allocatorimpl.AllocatorRemoveVoter, + expectErr: false, + }, + { + name: "overreplicated but store dead", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + {NodeID: 4, StoreID: 4, ReplicaID: 4}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DEAD, + }, + expectedAction: allocatorimpl.AllocatorRemoveDeadVoter, + expectErr: false, + }, + { + name: "decommissioning but underreplicated", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 2: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorAddVoter, + expectErr: false, + expectValidTarget: true, + }, + { + name: "decommissioning with replacement", + stores: multiRegionStores, + 
existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectErr: false, + expectValidTarget: true, + }, + { + name: "decommissioning without valid replacement", + stores: threeStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectAllocatorErr: true, + expectedErrStr: "likely not enough nodes in cluster", + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Setup store pool based on store descriptors and configure test store. + nodesSeen := make(map[roachpb.NodeID]struct{}) + for _, storeDesc := range tc.stores { + nodesSeen[storeDesc.Node.NodeID] = struct{}{} + } + numNodes := len(nodesSeen) + + // Create a test store simulating n1s1, where we have other nodes/stores as + // determined by the test configuration. As we do not start the test store, + // queues will not be running. 
+ stopper, g, sp, _, manual := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(tc.stores, t) + + clock := hlc.NewClock(manual, time.Nanosecond) + cfg := TestStoreConfig(clock) + cfg.Gossip = g + cfg.StorePool = sp + + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + + desc := &roachpb.RangeDescriptor{ + RangeID: 789, + StartKey: roachpb.RKey("a"), + EndKey: roachpb.RKey("b"), + InternalReplicas: tc.existingReplicas, + } + + var storePoolOverride storepool.AllocatorStorePool + if len(tc.livenessOverrides) > 0 { + livenessOverride := storepool.OverrideNodeLivenessFunc(tc.livenessOverrides, sp.NodeLivenessFn) + storePoolOverride = storepool.NewOverrideStorePool(sp, livenessOverride) + } + + // Execute actual allocator range repair check. + action, target, recording, err := s.AllocatorCheckRange(ctx, desc, storePoolOverride) + + // Validate expectations from test case. + if tc.expectErr || tc.expectAllocatorErr { + require.Error(t, err) + + if tc.expectedErr != nil { + require.ErrorIs(t, err, tc.expectedErr) + } + if tc.expectAllocatorErr { + var allocatorError allocator.AllocationError + require.ErrorAs(t, err, &allocatorError) + } + if tc.expectedErrStr != "" { + require.ErrorContains(t, err, tc.expectedErrStr) + } + } else { + require.NoError(t, err) + } + + require.Equalf(t, tc.expectedAction, action, + "expected action \"%s\", got action \"%s\"", tc.expectedAction, action, + ) + + if tc.expectValidTarget { + require.NotEqualf(t, roachpb.ReplicationTarget{}, target, "expected valid target") + } + + if tc.expectedLogMessage != "" { + _, ok := recording.FindLogMessage(tc.expectedLogMessage) + require.Truef(t, ok, "expected to find trace \"%s\"", tc.expectedLogMessage) + } + }) + } +} + // TestManuallyEnqueueUninitializedReplica makes sure that uninitialized // replicas cannot be enqueued. 
func TestManuallyEnqueueUninitializedReplica(t *testing.T) { diff --git a/pkg/server/status.go b/pkg/server/status.go index 35f005f2c594..f53a252196b3 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -816,7 +816,7 @@ func (s *systemStatusServer) Allocator( return true // continue. } var allocatorSpans tracingpb.Recording - allocatorSpans, err = store.AllocatorDryRun(ctx, rep) + allocatorSpans, err = store.ReplicateQueueDryRun(ctx, rep) if err != nil { return false // break and bubble up the error. } @@ -841,7 +841,7 @@ func (s *systemStatusServer) Allocator( if !rep.OwnsValidLease(ctx, store.Clock().NowAsClockTimestamp()) { continue } - allocatorSpans, err := store.AllocatorDryRun(ctx, rep) + allocatorSpans, err := store.ReplicateQueueDryRun(ctx, rep) if err != nil { return err }