Skip to content

Commit

Permalink
server: implement decommission pre-check api
Browse files Browse the repository at this point in the history
This change implements the `DecommissionPreCheck` RPC on the `Admin`
service, building on the support introduced in #93758 for evaluating node
decommission readiness by checking each range. When checking node
decommission readiness, only nodes that have a valid, non-`DECOMMISSIONED`
liveness status are checked. Ranges with replicas on the checked nodes that
encounter errors while attempting to allocate replacement replicas are
reported in the response. A range that has replicas on multiple checked
nodes has its errors reported for each of those nodeIDs in the request list.

Depends on #93758, #90222.

Epic: CRDB-20924

Release note: None
  • Loading branch information
AlexTalks committed Jan 28, 2023
1 parent 16b232b commit e033cc6
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3142,7 +3142,7 @@ func (s *Store) AllocatorCheckRange(
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
var spanOptions []tracing.SpanOption
if collectTraces {
spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingStructured))
spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingVerbose))
}
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range", spanOptions...)

Expand Down
87 changes: 86 additions & 1 deletion pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2634,7 +2634,92 @@ func (s *adminServer) getStatementBundle(ctx context.Context, id int64, w http.R
// DecommissionPreCheck evaluates whether the nodes listed in req.NodeIDs are
// ready to be decommissioned and returns one NodeCheckResult per requested
// node ID, in request order.
//
// Nodes whose liveness status is UNKNOWN or DECOMMISSIONED are not checked;
// they are reported with the matching readiness value and without a replica
// count or range errors. All remaining nodes are handed to
// s.server.DecommissionPreCheck, and every range it reports as not ready is
// attached to each checked node that holds a replica of that range.
//
// A failure to load the liveness status map is returned wrapped by
// serverError; an error from s.server.DecommissionPreCheck is returned as is.
func (s *systemAdminServer) DecommissionPreCheck(
	ctx context.Context, req *serverpb.DecommissionPreCheckRequest,
) (*serverpb.DecommissionPreCheckResponse, error) {
	// Collect traces when the request asks for them, or when the span already
	// attached to ctx is recording. The span is bound to sp (not s) so that the
	// method receiver is not shadowed.
	collectTraces := req.CollectTraces
	if sp := tracing.SpanFromContext(ctx); sp != nil && sp.RecordingType() != tracingpb.RecordingOff {
		collectTraces = true
	}

	// Initially evaluate node liveness status, so we filter the nodes to check.
	livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now().GoTime(), s.st)
	if err != nil {
		return nil, serverError(ctx, err)
	}

	resp := &serverpb.DecommissionPreCheckResponse{}
	resultsByNodeID := make(map[roachpb.NodeID]serverpb.DecommissionPreCheckResponse_NodeCheckResult)

	// Any nodes that are already decommissioned or have unknown liveness should
	// not be checked, and are added to response without replica counts or errors.
	//
	// NOTE(review): a node ID repeated in req.NodeIDs is appended to nodesToCheck
	// once per occurrence, which makes the loop over rangesNotReady below attach
	// the same range error to that node more than once — confirm whether callers
	// are expected to de-duplicate.
	var nodesToCheck []roachpb.NodeID
	for _, nID := range req.NodeIDs {
		livenessStatus := livenessStatusByNodeID[nID]
		switch livenessStatus {
		case livenesspb.NodeLivenessStatus_UNKNOWN:
			resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
				NodeID:                nID,
				DecommissionReadiness: serverpb.DecommissionPreCheckResponse_UNKNOWN,
				LivenessStatus:        livenessStatus,
			}
		case livenesspb.NodeLivenessStatus_DECOMMISSIONED:
			resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
				NodeID:                nID,
				DecommissionReadiness: serverpb.DecommissionPreCheckResponse_ALREADY_DECOMMISSIONED,
				LivenessStatus:        livenessStatus,
			}
		default:
			nodesToCheck = append(nodesToCheck, nID)
		}
	}

	results, err := s.server.DecommissionPreCheck(ctx, nodesToCheck, req.StrictReadiness, collectTraces, int(req.NumReplicaReport))
	if err != nil {
		return nil, err
	}

	// Collect ranges that encountered errors by the nodes on which their replicas
	// exist. Ranges with replicas on multiple checked nodes will result in the
	// error being reported for each nodeID.
	rangeCheckErrsByNode := make(map[roachpb.NodeID][]serverpb.DecommissionPreCheckResponse_RangeCheckResult)
	for _, rangeWithErr := range results.rangesNotReady {
		rangeCheckResult := serverpb.DecommissionPreCheckResponse_RangeCheckResult{
			RangeID: rangeWithErr.desc.RangeID,
			Action:  rangeWithErr.action,
			Events:  recordedSpansToTraceEvents(rangeWithErr.tracingSpans),
			Error:   rangeWithErr.err.Error(),
		}

		for _, nID := range nodesToCheck {
			if rangeWithErr.desc.Replicas().HasReplicaOnNode(nID) {
				rangeCheckErrsByNode[nID] = append(rangeCheckErrsByNode[nID], rangeCheckResult)
			}
		}
	}

	// Evaluate readiness for each node to check based on how many ranges have
	// replicas on the node that did not pass checks.
	for _, nID := range nodesToCheck {
		numReplicas := len(results.replicasByNode[nID])
		var readiness serverpb.DecommissionPreCheckResponse_NodeReadiness
		if len(rangeCheckErrsByNode[nID]) > 0 {
			readiness = serverpb.DecommissionPreCheckResponse_ALLOCATION_ERRORS
		} else {
			readiness = serverpb.DecommissionPreCheckResponse_READY
		}

		resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
			NodeID:                nID,
			DecommissionReadiness: readiness,
			LivenessStatus:        livenessStatusByNodeID[nID],
			ReplicaCount:          int64(numReplicas),
			CheckedRanges:         rangeCheckErrsByNode[nID],
		}
	}

	// Reorder checked nodes to match request order.
	for _, nID := range req.NodeIDs {
		resp.CheckedNodes = append(resp.CheckedNodes, resultsByNodeID[nID])
	}

	return resp, nil
}

// DecommissionStatus returns the DecommissionStatus for all or the given nodes.
Expand Down
Loading

0 comments on commit e033cc6

Please sign in to comment.