server: evaluate decommission pre-checks
This adds support for evaluating the decommission readiness of a node
(or set of nodes) by simulating their liveness as having the
DECOMMISSIONING status and using the allocator to check that we are
able to perform any actions needed to repair each range. This supports
a "strict" mode, in which we expect all ranges to need only replacement
or removal due to the decommissioning status, as well as a more
permissive "non-strict" mode, which allows other actions as long as the
allocator does not encounter errors when finding a suitable allocation
target. The non-strict mode permits situations where a range needs more
than one action to repair it, such as a range that needs to reach its
replication factor before the decommissioning replica can be replaced,
or a range that needs to finalize an atomic replication change.

Depends on #94024.

Part of #91568

Release note: None
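
For illustration, here is a minimal sketch of how the new pre-check might be invoked before starting decommissioning, based on the DecommissionPreCheck signature added below. The helper name (checkDecommissionReadiness) and the maxErrors value are hypothetical, and the sketch would have to live in package server because the result type is unexported; it is not part of this commit.

package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/errors"
)

// checkDecommissionReadiness is a hypothetical helper showing one way the
// pre-check result could be consumed before calling Decommission(..).
func checkDecommissionReadiness(ctx context.Context, s *Server, nodeIDs []roachpb.NodeID) error {
	// Strict mode: every range with a replica on the target nodes must need
	// only a replace or remove action. Traces are not collected, and range
	// checks stop after 10 failures (an arbitrary limit for this sketch).
	result, err := s.DecommissionPreCheck(
		ctx, nodeIDs, true /* strictReadiness */, false /* collectTraces */, 10 /* maxErrors */,
	)
	if err != nil {
		return err
	}
	if len(result.rangesNotReady) == 0 {
		log.Infof(ctx, "checked %d ranges; nodes %v are ready for decommission",
			result.rangesChecked, nodeIDs)
		return nil
	}
	for _, r := range result.rangesNotReady {
		log.Warningf(ctx, "r%d is not ready: action=%s, err=%v", r.desc.RangeID, r.action, r.err)
	}
	return errors.Errorf("%d of %d ranges are not ready for decommission",
		len(result.rangesNotReady), result.rangesChecked)
}

In non-strict mode (strictReadiness=false), the same call would tolerate additional allocator actions, recording a range as not ready only when the allocator hits an error finding a suitable target.
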
AlexTalks committed Jan 10, 2023
1 parent 4643153 commit 1e57036
Showing 3 changed files with 383 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -100,6 +100,7 @@ go_library(
"//pkg/kv/kvclient/rangestats",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/sidetransport",
185 changes: 185 additions & 0 deletions pkg/server/decommission.go
@@ -17,15 +17,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
@@ -39,6 +43,25 @@ type decommissioningNodeMap struct {
nodes map[roachpb.NodeID]interface{}
}

// decommissionRangeCheckResult is the result of evaluating the allocator action
// and target for a single range that has an extant replica on a node targeted
// for decommission.
type decommissionRangeCheckResult struct {
desc *roachpb.RangeDescriptor
action allocatorimpl.AllocatorAction
tracingSpans tracingpb.Recording
err error
}

// decommissionPreCheckResult is the result of checking the readiness
// of a node or set of nodes to be decommissioned.
type decommissionPreCheckResult struct {
rangesChecked int
replicasByNode map[roachpb.NodeID][]roachpb.ReplicaIdent
actionCounts map[allocatorimpl.AllocatorAction]int
rangesNotReady []decommissionRangeCheckResult
}

// makeOnNodeDecommissioningCallback returns a callback that enqueues the
// decommissioning node's ranges into the `stores`' replicateQueues for
// rebalancing.
@@ -131,6 +154,168 @@ func getPingCheckDecommissionFn(
}
}

// DecommissionPreCheck is used to evaluate whether nodes are ready for
// decommission, prior to starting the Decommission(..) process. This is
// evaluated by checking that any replicas on the given nodes are able to be
// replaced or removed, following the current state of the cluster as well as
// the configuration. If strictReadiness is true, all replicas are expected to
// need only replace or remove actions. If maxErrors > 0, range checks will
// stop once maxErrors failures have been encountered.
func (s *Server) DecommissionPreCheck(
ctx context.Context,
nodeIDs []roachpb.NodeID,
strictReadiness bool,
collectTraces bool,
maxErrors int,
) (decommissionPreCheckResult, error) {
var rangesChecked int
decommissionCheckNodeIDs := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
replicasByNode := make(map[roachpb.NodeID][]roachpb.ReplicaIdent)
actionCounts := make(map[allocatorimpl.AllocatorAction]int)
var rangeErrors []decommissionRangeCheckResult
const pageSize = 10000

for _, nodeID := range nodeIDs {
decommissionCheckNodeIDs[nodeID] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
}

// Counters need to be reset on any transaction retries during the scan
// through range descriptors.
initCounters := func() {
rangesChecked = 0
for action := range actionCounts {
actionCounts[action] = 0
}
rangeErrors = rangeErrors[:0]
for nid := range replicasByNode {
replicasByNode[nid] = replicasByNode[nid][:0]
}
}

// Only check using the first store on this node, as they should all give
// identical results.
var evalStore *kvserver.Store
err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
if evalStore == nil {
evalStore = s
}
return nil
})
if err == nil && evalStore == nil {
err = errors.Errorf("n%d has no initialized store", s.NodeID())
}
if err != nil {
return decommissionPreCheckResult{}, err
}

// Define our node liveness overrides to simulate that the nodeIDs for which
// we are checking decommission readiness are in the DECOMMISSIONING state.
// All other nodeIDs should use their actual liveness status.
existingStorePool := evalStore.GetStoreConfig().StorePool
overrideNodeLivenessFn := storepool.OverrideNodeLivenessFunc(
decommissionCheckNodeIDs, existingStorePool.NodeLivenessFn,
)
overrideStorePool := storepool.NewOverrideStorePool(existingStorePool, overrideNodeLivenessFn)

// Define our replica filter to only look at the replicas on the checked nodes.
predHasDecommissioningReplica := func(rDesc roachpb.ReplicaDescriptor) bool {
_, ok := decommissionCheckNodeIDs[rDesc.NodeID]
return ok
}

// Iterate through all range descriptors using the rangedesc.Scanner, which
// will perform the requisite meta1/meta2 lookups, including retries.
rangeDescScanner := rangedesc.NewScanner(s.db)
err = rangeDescScanner.Scan(ctx, pageSize, initCounters, keys.EverythingSpan, func(descriptors ...roachpb.RangeDescriptor) error {
for _, desc := range descriptors {
// Track replicas by node for recording purposes.
// Skip checks if this range doesn't exist on a potentially decommissioning node.
replicasToMove := desc.Replicas().FilterToDescriptors(predHasDecommissioningReplica)
if len(replicasToMove) == 0 {
continue
}
for _, rDesc := range replicasToMove {
rIdent := roachpb.ReplicaIdent{
RangeID: desc.RangeID,
Replica: rDesc,
}
replicasByNode[rDesc.NodeID] = append(replicasByNode[rDesc.NodeID], rIdent)
}

if maxErrors > 0 && len(rangeErrors) >= maxErrors {
// TODO(sarkesian): Consider adding a per-range descriptor iterator to
// rangedesc.Scanner, which will correctly stop iteration on the
// function returning iterutil.StopIteration().
continue
}

action, _, recording, rErr := evalStore.AllocatorCheckRange(ctx, &desc, overrideStorePool)
rangesChecked += 1
actionCounts[action] += 1

if passed, checkResult := evaluateRangeCheckResult(strictReadiness, collectTraces,
&desc, action, recording, rErr,
); !passed {
rangeErrors = append(rangeErrors, checkResult)
}
}

return nil
})

return decommissionPreCheckResult{
rangesChecked: rangesChecked,
replicasByNode: replicasByNode,
actionCounts: actionCounts,
rangesNotReady: rangeErrors,
}, err
}

// evaluateRangeCheckResult returns whether the range passes the
// decommissioning checks (depending on whether strict readiness is being
// tested), as well as the encapsulated range check result with errors
// defined as needed.
func evaluateRangeCheckResult(
strictReadiness bool,
collectTraces bool,
desc *roachpb.RangeDescriptor,
action allocatorimpl.AllocatorAction,
recording tracingpb.Recording,
rErr error,
) (passed bool, _ decommissionRangeCheckResult) {
checkResult := decommissionRangeCheckResult{
desc: desc,
action: action,
err: rErr,
}

if collectTraces {
checkResult.tracingSpans = recording
}

if rErr != nil {
return false, checkResult
}

if action == allocatorimpl.AllocatorRangeUnavailable ||
action == allocatorimpl.AllocatorNoop ||
action == allocatorimpl.AllocatorConsiderRebalance {
checkResult.err = errors.Errorf("range r%d requires unexpected allocation action: %s",
desc.RangeID, action,
)
return false, checkResult
}

if strictReadiness && !(action.Replace() || action.Remove()) {
checkResult.err = errors.Errorf(
"range r%d needs repair beyond replacing/removing the decommissioning replica: %s",
desc.RangeID, action,
)
return false, checkResult
}

return true, checkResult
}

// Decommission idempotently sets the decommissioning flag for specified nodes.
// The error return is a gRPC error.
func (s *Server) Decommission(