From 8c4d3bb3e270d600e2d6e4129d8a61b6d45ee661 Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Thu, 15 Dec 2022 17:31:28 -0500
Subject: [PATCH] server: implement decommission pre-checks

This adds support for evaluating the decommission readiness of a node
(or set of nodes) by simulating their liveness as having the
DECOMMISSIONING status and using the allocator to ensure that we are
able to perform any actions needed to repair each range.

This supports a "strict" mode, in which all ranges are expected to need
only replacement or removal due to the decommissioning status, as well
as a more permissive "non-strict" mode, which allows other actions to
be needed as long as a suitable allocation target can be found. The
non-strict mode permits situations where a range may need more than one
action to repair it, such as a range that needs to reach its
replication factor before the decommissioning replica can be replaced,
or a range that needs to finalize an atomic replication change.

Depends on #92367.

Part of #91571

Release note: None
---
 pkg/server/BUILD.bazel     |   1 +
 pkg/server/decommission.go | 189 +++++++++++++++++++++++++++++++++++++
 2 files changed, 190 insertions(+)

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 17984d9d6860..bbd4db00ca59 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -100,6 +100,7 @@ go_library(
         "//pkg/kv/kvclient/rangestats",
         "//pkg/kv/kvprober",
         "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/allocator/allocatorimpl",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/closedts/ctpb",
         "//pkg/kv/kvserver/closedts/sidetransport",
diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go
index 797891849232..702fb7f90e00 100644
--- a/pkg/server/decommission.go
+++ b/pkg/server/decommission.go
@@ -17,6 +17,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,8 +26,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"google.golang.org/grpc/codes"
 	grpcstatus "google.golang.org/grpc/status"
@@ -39,6 +43,25 @@ type decommissioningNodeMap struct {
 	nodes map[roachpb.NodeID]interface{}
 }
 
+// decommissionRangeCheckResult is the result of evaluating the allocator action
+// and target for a single range that has an extant replica on a node targeted
+// for decommission.
+type decommissionRangeCheckResult struct {
+	desc         *roachpb.RangeDescriptor
+	action       allocatorimpl.AllocatorAction
+	tracingSpans tracingpb.Recording
+	err          error
+}
+
+// decommissionPreCheckResult is the result of checking the readiness
+// of a node or set of nodes to be decommissioned.
+type decommissionPreCheckResult struct {
+	rangesChecked  int
+	replicasByNode map[roachpb.NodeID][]roachpb.ReplicaIdent
+	actionCounts   map[allocatorimpl.AllocatorAction]int
+	rangesNotReady []decommissionRangeCheckResult
+}
+
 // makeOnNodeDecommissioningCallback returns a callback that enqueues the
 // decommissioning node's ranges into the `stores`' replicateQueues for
 // rebalancing.
@@ -131,6 +154,172 @@ func getPingCheckDecommissionFn(
 	}
 }
 
+// DecommissionPreCheck is used to evaluate whether nodes are ready for
+// decommission, prior to starting the Decommission(..) process. This is
+// evaluated by checking that any replicas on the given nodes are able to be
+// replaced or removed, given the current state of the cluster as well as the
+// configuration. If strictReadiness is true, all replicas are expected to need
+// only replace or remove actions. If maxErrors > 0, range checks will stop
+// once maxErrors is reached.
+func (s *Server) DecommissionPreCheck(
+	ctx context.Context,
+	nodeIDs []roachpb.NodeID,
+	strictReadiness bool,
+	collectTraces bool,
+	maxErrors int,
+) (decommissionPreCheckResult, error) {
+	var rangesChecked int
+	decommissionCheckNodeIDs := make(map[roachpb.NodeID]struct{})
+	replicasByNode := make(map[roachpb.NodeID][]roachpb.ReplicaIdent)
+	actionCounts := make(map[allocatorimpl.AllocatorAction]int)
+	var rangeErrors []decommissionRangeCheckResult
+	const pageSize = 10000
+
+	for _, nodeID := range nodeIDs {
+		decommissionCheckNodeIDs[nodeID] = struct{}{}
+	}
+
+	// Counters need to be reset on any transaction retries during the scan
+	// through range descriptors.
+	initCounters := func() {
+		rangesChecked = 0
+		for action := range actionCounts {
+			actionCounts[action] = 0
+		}
+		rangeErrors = rangeErrors[:0]
+		for nid := range replicasByNode {
+			replicasByNode[nid] = replicasByNode[nid][:0]
+		}
+	}
+
+	// Only check using the first store on this node, as they should all give
+	// identical results.
+	var evalStore *kvserver.Store
+	err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
+		if evalStore == nil {
+			evalStore = s
+		}
+		return nil
+	})
+	if err == nil && evalStore == nil {
+		err = errors.Errorf("n%d has no initialized store", s.NodeID())
+	}
+	if err != nil {
+		return decommissionPreCheckResult{}, err
+	}
+
+	// Define our node liveness override function to simulate that the nodeIDs
+	// for which we are checking decommission readiness are in the DECOMMISSIONING
+	// state. All other nodeIDs should use their actual liveness status.
+	var livenessFn storepool.NodeLivenessFunc = func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
+		if _, ok := decommissionCheckNodeIDs[nid]; ok {
+			return livenesspb.NodeLivenessStatus_DECOMMISSIONING
+		}
+
+		// By returning unknown, we signal to the store to look up the true liveness.
+		return livenesspb.NodeLivenessStatus_UNKNOWN
+	}
+
+	// Define our replica filter to only look at the replicas on the checked nodes.
+	predHasDecommissioningReplica := func(rDesc roachpb.ReplicaDescriptor) bool {
+		_, ok := decommissionCheckNodeIDs[rDesc.NodeID]
+		return ok
+	}
+
+	// Iterate through all range descriptors using the rangedesc.Scanner, which
+	// will perform the requisite meta1/meta2 lookups, including retries.
+	rangeDescScanner := rangedesc.NewScanner(s.db)
+	err = rangeDescScanner.Scan(ctx, pageSize, initCounters, keys.EverythingSpan, func(descriptors ...roachpb.RangeDescriptor) error {
+		for _, desc := range descriptors {
+			// Track replicas by node for recording purposes.
+			// Skip checks if this range doesn't exist on a potentially decommissioning node.
+			replicasToMove := desc.Replicas().FilterToDescriptors(predHasDecommissioningReplica)
+			if len(replicasToMove) == 0 {
+				continue
+			}
+			for _, rDesc := range replicasToMove {
+				rIdent := roachpb.ReplicaIdent{
+					RangeID: desc.RangeID,
+					Replica: rDesc,
+				}
+				replicasByNode[rDesc.NodeID] = append(replicasByNode[rDesc.NodeID], rIdent)
+			}
+
+			if maxErrors > 0 && len(rangeErrors) >= maxErrors {
+				// TODO(sarkesian): Consider adding a per-range descriptor iterator to
+				// rangedesc.Scanner, which will correctly stop iteration on the
+				// function returning iterutil.StopIteration().
+				continue
+			}
+
+			action, _, recording, rErr := evalStore.AllocatorCheckRange(ctx, &desc, livenessFn)
+			rangesChecked += 1
+			actionCounts[action] += 1
+
+			if passed, checkResult := evaluateRangeCheckResult(strictReadiness, collectTraces,
+				&desc, action, recording, rErr,
+			); !passed {
+				rangeErrors = append(rangeErrors, checkResult)
+			}
+		}
+
+		return nil
+	})
+
+	return decommissionPreCheckResult{
+		rangesChecked:  rangesChecked,
+		replicasByNode: replicasByNode,
+		actionCounts:   actionCounts,
+		rangesNotReady: rangeErrors,
+	}, err
+}
+
+// evaluateRangeCheckResult returns whether the range passed its decommissioning
+// checks (depending on whether we are evaluating strict readiness or not), as
+// well as the encapsulated range check result with errors set as needed.
+func evaluateRangeCheckResult(
+	strictReadiness bool,
+	collectTraces bool,
+	desc *roachpb.RangeDescriptor,
+	action allocatorimpl.AllocatorAction,
+	recording tracingpb.Recording,
+	rErr error,
+) (passed bool, _ decommissionRangeCheckResult) {
+	checkResult := decommissionRangeCheckResult{
+		desc:   desc,
+		action: action,
+	}
+
+	if collectTraces {
+		checkResult.tracingSpans = recording
+	}
+
+	if rErr != nil {
+		checkResult.err = rErr
+		return false, checkResult
+	}
+
+	if action == allocatorimpl.AllocatorRangeUnavailable ||
+		action == allocatorimpl.AllocatorNoop ||
+		action == allocatorimpl.AllocatorConsiderRebalance {
+		checkResult.err = errors.Errorf("range r%d requires unexpected allocation action: %s",
+			desc.RangeID, action,
+		)
+		return false, checkResult
+	}
+
+	if strictReadiness && !(action == allocatorimpl.AllocatorReplaceDecommissioningVoter ||
+		action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter ||
+		action == allocatorimpl.AllocatorRemoveDecommissioningVoter ||
+		action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter) {
+		checkResult.err = errors.Errorf(
+			"range r%d needs repair beyond replacing/removing the decommissioning replica",
+			desc.RangeID,
+		)
+		return false, checkResult
+	}
+
+	return true, checkResult
+}
+
 // Decommission idempotently sets the decommissioning flag for specified nodes.
 // The error return is a gRPC error.
 func (s *Server) Decommission(
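
For illustration only (not part of the patch): the sketch below shows how a
caller might interpret a pre-check result like the one returned by
DecommissionPreCheck. The preCheckResult/rangeCheckResult types and the
summarize helper are hypothetical stand-ins that merely mirror the shape of
the unexported structs added above; they are not CockroachDB APIs. Per the
readiness rules in evaluateRangeCheckResult, a range lands in rangesNotReady
when its allocator check errors, when the action is range-unavailable, noop,
or consider-rebalance, or, in strict mode, when the action is anything other
than replacing or removing the decommissioning replica.

package main

import "fmt"

// rangeCheckResult is a hypothetical stand-in mirroring the shape of
// decommissionRangeCheckResult: one entry per range that failed its check.
type rangeCheckResult struct {
	rangeID int64
	action  string // allocator action name, e.g. "replace decommissioning voter"
	err     error
}

// preCheckResult is a hypothetical stand-in mirroring decommissionPreCheckResult.
type preCheckResult struct {
	rangesChecked  int
	actionCounts   map[string]int
	rangesNotReady []rangeCheckResult
}

// summarize prints a breakdown of the allocator actions computed during the
// pre-check and reports whether every checked range passed.
func summarize(res preCheckResult) bool {
	fmt.Printf("checked %d ranges\n", res.rangesChecked)
	for action, count := range res.actionCounts {
		fmt.Printf("  action %q needed by %d ranges\n", action, count)
	}
	for _, r := range res.rangesNotReady {
		fmt.Printf("  r%d not ready (action %q): %v\n", r.rangeID, r.action, r.err)
	}
	return len(res.rangesNotReady) == 0
}

func main() {
	// Example result: two ranges only need their decommissioning voter
	// replaced, while one under-replicated range must first add a voter,
	// which fails the strict-readiness criterion.
	res := preCheckResult{
		rangesChecked: 3,
		actionCounts: map[string]int{
			"replace decommissioning voter": 2,
			"add voter":                     1,
		},
		rangesNotReady: []rangeCheckResult{{
			rangeID: 42,
			action:  "add voter",
			err: fmt.Errorf(
				"range r42 needs repair beyond replacing/removing the decommissioning replica"),
		}},
	}
	if summarize(res) {
		fmt.Println("nodes are ready to decommission")
	} else {
		fmt.Println("nodes are not ready to decommission")
	}
}

A real caller would obtain these values from the server's pre-check response
rather than constructing them by hand.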