server: evaluate decommission pre-checks #93758

Merged 1 commit on Jan 20, 2023
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/store.go
@@ -3501,26 +3501,28 @@ func (s *Store) ReplicateQueueDryRun(
func (s *Store) AllocatorCheckRange(
ctx context.Context,
desc *roachpb.RangeDescriptor,
collectTraces bool,
overrideStorePool storepool.AllocatorStorePool,
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx,
s.cfg.AmbientCtx.Tracer, "allocator check range",
)
defer collectAndFinish()
var spanOptions []tracing.SpanOption
if collectTraces {
spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingStructured))
}
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range", spanOptions...)

confReader, err := s.GetConfReader(ctx)
if err == nil {
err = s.WaitForSpanConfigSubscription(ctx)
}
if err != nil {
log.Eventf(ctx, "span configs unavailable: %s", err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err
}

conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey)
if err != nil {
log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err)
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err
return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err
}

// If a store pool was provided, use that, otherwise use the store's
@@ -3536,14 +3538,14 @@ func (s *Store) AllocatorCheckRange(

// In the case that the action does not require a target, return immediately.
if !(action.Add() || action.Replace()) {
return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
return action, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err
}

liveVoters, liveNonVoters, isReplacement, nothingToDo, err :=
allocatorimpl.FilterReplicasForAction(storePool, desc, action)

if nothingToDo || err != nil {
return action, roachpb.ReplicationTarget{}, collectAndFinish(), err
return action, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err
}

target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf,
@@ -3571,7 +3573,7 @@ func (s *Store) AllocatorCheckRange(
}
}

return action, target, collectAndFinish(), err
return action, target, sp.FinishAndGetConfiguredRecording(), err
}

// Enqueue runs the given replica through the requested queue. If `async` is
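
For reference, a minimal sketch of the conditional-recording pattern introduced above, assuming the standard context, tracing, and tracingpb imports already used in store.go. This helper is not part of the PR and the name startCheckSpan is made up; it relies only on the tracing calls that appear in the diff (EnsureChildSpan, WithRecording, FinishAndGetConfiguredRecording).

// startCheckSpan starts a child span that records structured events only when
// collectTraces is set; the returned closure finishes the span and yields the
// configured recording (empty when recording was not requested).
func startCheckSpan(
	ctx context.Context, tracer *tracing.Tracer, collectTraces bool,
) (context.Context, func() tracingpb.Recording) {
	var opts []tracing.SpanOption
	if collectTraces {
		opts = append(opts, tracing.WithRecording(tracingpb.RecordingStructured))
	}
	ctx, sp := tracing.EnsureChildSpan(ctx, tracer, "allocator check range", opts...)
	return ctx, sp.FinishAndGetConfiguredRecording
}
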
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/store_test.go
@@ -3404,7 +3404,9 @@ func TestAllocatorCheckRangeUnconfigured(t *testing.T) {
tc.StartWithStoreConfig(ctx, t, stopper, cfg)
s := tc.store

action, _, _, err := s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* overrideStorePool */)
action, _, _, err := s.AllocatorCheckRange(ctx, tc.repl.Desc(),
false /* collectTraces */, nil, /* overrideStorePool */
)
require.Error(t, err)

if confAvailable {
@@ -3554,7 +3556,9 @@ func TestAllocatorCheckRange(t *testing.T) {
}

// Execute actual allocator range repair check.
action, target, recording, err := s.AllocatorCheckRange(ctx, desc, storePoolOverride)
action, target, recording, err := s.AllocatorCheckRange(ctx, desc,
true /* collectTraces */, storePoolOverride,
)

// Validate expectations from test case.
if tc.expectErr || tc.expectAllocatorErr {
4 changes: 4 additions & 0 deletions pkg/server/BUILD.bazel
@@ -101,6 +101,7 @@ go_library(
"//pkg/kv/kvclient/rangestats",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/sidetransport",
@@ -366,6 +367,7 @@ go_test(
"bench_test.go",
"config_test.go",
"connectivity_test.go",
"decommission_test.go",
"drain_test.go",
"graphite_test.go",
"index_usage_stats_test.go",
@@ -417,6 +419,8 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
197 changes: 197 additions & 0 deletions pkg/server/decommission.go
@@ -17,15 +17,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
@@ -39,6 +43,25 @@ type decommissioningNodeMap struct {
nodes map[roachpb.NodeID]interface{}
}

// decommissionRangeCheckResult is the result of evaluating the allocator action
// and target for a single range that has an extant replica on a node targeted
// for decommission.
type decommissionRangeCheckResult struct {
desc *roachpb.RangeDescriptor
action string
tracingSpans tracingpb.Recording
err error
}

// decommissionPreCheckResult is the result of checking the readiness
// of a node or set of nodes to be decommissioned.
type decommissionPreCheckResult struct {
rangesChecked int
replicasByNode map[roachpb.NodeID][]roachpb.ReplicaIdent
actionCounts map[string]int
rangesNotReady []decommissionRangeCheckResult
}

// makeOnNodeDecommissioningCallback returns a callback that enqueues the
// decommissioning node's ranges into the `stores`' replicateQueues for
// rebalancing.
@@ -131,6 +154,180 @@ func getPingCheckDecommissionFn(
}
}

// DecommissionPreCheck is used to evaluate whether nodes are ready for
// decommission, prior to starting the Decommission(..) process. This is
// evaluated by checking that any replicas on the given nodes can be replaced
// or removed, given the current state of the cluster as well as its
// configuration. If strictReadiness is true, all replicas are expected to
// need only replace or remove actions. If maxErrors > 0, range checks will
// stop once maxErrors is reached.
// The error returned is a gRPC error.
func (s *Server) DecommissionPreCheck(
ctx context.Context,
nodeIDs []roachpb.NodeID,
strictReadiness bool,
collectTraces bool,
maxErrors int,
) (decommissionPreCheckResult, error) {
// Ensure that if collectTraces is enabled, maxErrors > 0 is also set, in
// order to avoid unlimited memory usage.
if collectTraces && maxErrors <= 0 {
return decommissionPreCheckResult{},
grpcstatus.Error(codes.InvalidArgument, "MaxErrors must be set to collect traces.")
}

var rangesChecked int
decommissionCheckNodeIDs := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
replicasByNode := make(map[roachpb.NodeID][]roachpb.ReplicaIdent)
actionCounts := make(map[string]int)
var rangeErrors []decommissionRangeCheckResult
const pageSize = 10000

for _, nodeID := range nodeIDs {
decommissionCheckNodeIDs[nodeID] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
}

// Counters need to be reset on any transaction retries during the scan
// through range descriptors.
initCounters := func() {
rangesChecked = 0
for action := range actionCounts {
actionCounts[action] = 0
}
rangeErrors = rangeErrors[:0]
for nid := range replicasByNode {
replicasByNode[nid] = replicasByNode[nid][:0]
}
}

// Only check using the first store on this node, as they should all give
// identical results.
var evalStore *kvserver.Store
err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
if evalStore == nil {
evalStore = s
}
return nil
})
if err == nil && evalStore == nil {
err = errors.Errorf("n%d has no initialized store", s.NodeID())
}
if err != nil {
return decommissionPreCheckResult{}, grpcstatus.Error(codes.NotFound, err.Error())
}

// Define our node liveness overrides to simulate that the nodeIDs for which
// we are checking decommission readiness are in the DECOMMISSIONING state.
// All other nodeIDs should use their actual liveness status.
existingStorePool := evalStore.GetStoreConfig().StorePool
overrideNodeLivenessFn := storepool.OverrideNodeLivenessFunc(
decommissionCheckNodeIDs, existingStorePool.NodeLivenessFn,
)
overrideStorePool := storepool.NewOverrideStorePool(existingStorePool, overrideNodeLivenessFn)

// Define our replica filter to only look at the replicas on the checked nodes.
predHasDecommissioningReplica := func(rDesc roachpb.ReplicaDescriptor) bool {
_, ok := decommissionCheckNodeIDs[rDesc.NodeID]
return ok
}

// Iterate through all range descriptors using the rangedesc.Scanner, which
// will perform the requisite meta1/meta2 lookups, including retries.
rangeDescScanner := rangedesc.NewScanner(s.db)
err = rangeDescScanner.Scan(ctx, pageSize, initCounters, keys.EverythingSpan, func(descriptors ...roachpb.RangeDescriptor) error {
for _, desc := range descriptors {
// Track replicas by node for recording purposes.
// Skip checks if this range doesn't exist on a potentially decommissioning node.
replicasToMove := desc.Replicas().FilterToDescriptors(predHasDecommissioningReplica)
if len(replicasToMove) == 0 {
continue
}
for _, rDesc := range replicasToMove {
rIdent := roachpb.ReplicaIdent{
RangeID: desc.RangeID,
Replica: rDesc,
}
replicasByNode[rDesc.NodeID] = append(replicasByNode[rDesc.NodeID], rIdent)
}

if maxErrors > 0 && len(rangeErrors) >= maxErrors {
// TODO(sarkesian): Consider adding a per-range descriptor iterator to
// rangedesc.Scanner, which will correctly stop iteration on the
// function returning iterutil.StopIteration().
continue
}

action, _, recording, rErr := evalStore.AllocatorCheckRange(ctx, &desc, collectTraces, overrideStorePool)
rangesChecked += 1
actionCounts[action.String()] += 1

if passed, checkResult := evaluateRangeCheckResult(strictReadiness, collectTraces,
&desc, action, recording, rErr,
); !passed {
rangeErrors = append(rangeErrors, checkResult)
}
}

return nil
})

if err != nil {
return decommissionPreCheckResult{}, grpcstatus.Errorf(codes.Internal, err.Error())
}

return decommissionPreCheckResult{
rangesChecked: rangesChecked,
replicasByNode: replicasByNode,
actionCounts: actionCounts,
rangesNotReady: rangeErrors,
}, nil
}

// evaluateRangeCheckResult returns whether the range has passed the
// decommissioning checks (depending on whether strict readiness is required),
// as well as the encapsulated range check result, with its error set as needed.
func evaluateRangeCheckResult(
strictReadiness bool,
collectTraces bool,
desc *roachpb.RangeDescriptor,
action allocatorimpl.AllocatorAction,
recording tracingpb.Recording,
rErr error,
) (passed bool, _ decommissionRangeCheckResult) {
checkResult := decommissionRangeCheckResult{
desc: desc,
action: action.String(),
err: rErr,
}

if collectTraces {
checkResult.tracingSpans = recording
}

if rErr != nil {
return false, checkResult
}

if action == allocatorimpl.AllocatorRangeUnavailable ||
action == allocatorimpl.AllocatorNoop ||
action == allocatorimpl.AllocatorConsiderRebalance {
checkResult.err = errors.Errorf("range r%d requires unexpected allocation action: %s",
desc.RangeID, action,
)
return false, checkResult
}

if strictReadiness && !(action.Replace() || action.Remove()) {
checkResult.err = errors.Errorf(
"range r%d needs repair beyond replacing/removing the decommissioning replica: %s",
desc.RangeID, action,
)
return false, checkResult
}

return true, checkResult
}

// Decommission idempotently sets the decommissioning flag for specified nodes.
// The error return is a gRPC error.
func (s *Server) Decommission(
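
As a usage illustration only, not part of this change: a hypothetical caller inside package server could consume the pre-check roughly as follows, relying solely on the DecommissionPreCheck signature and result fields shown in the diff above (plus the errors and roachpb imports it already uses). The helper name preCheckOrError is made up.

// preCheckOrError runs the decommission pre-check with strict readiness and no
// trace collection; maxErrors = 0 means all failing ranges are reported rather
// than stopping early. It returns an error if any range still needs work
// beyond replacing or removing the decommissioning replica.
func preCheckOrError(ctx context.Context, s *Server, nodeIDs []roachpb.NodeID) error {
	result, err := s.DecommissionPreCheck(ctx, nodeIDs,
		true /* strictReadiness */, false /* collectTraces */, 0, /* maxErrors */
	)
	if err != nil {
		return err
	}
	if n := len(result.rangesNotReady); n > 0 {
		first := result.rangesNotReady[0]
		return errors.Errorf(
			"%d of %d ranges not ready for decommission; e.g. r%d needs %s: %v",
			n, result.rangesChecked, first.desc.RangeID, first.action, first.err,
		)
	}
	return nil
}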