diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 0958e9adb6f3..1eab5661f734 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3141,7 +3141,7 @@ func (s *Store) AllocatorCheckRange(
 ) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) {
 	var spanOptions []tracing.SpanOption
 	if collectTraces {
-		spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingStructured))
+		spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingVerbose))
 	}
 	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range",
 		spanOptions...)
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index 7195acfb8748..b3f05785f2a6 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -2634,7 +2634,92 @@ func (s *adminServer) getStatementBundle(ctx context.Context, id int64, w http.R
 func (s *systemAdminServer) DecommissionPreCheck(
 	ctx context.Context, req *serverpb.DecommissionPreCheckRequest,
 ) (*serverpb.DecommissionPreCheckResponse, error) {
-	return nil, grpcstatus.Errorf(codes.Unimplemented, "method DecommissionPreCheck not implemented")
+	var collectTraces bool
+	if s := tracing.SpanFromContext(ctx); (s != nil && s.RecordingType() != tracingpb.RecordingOff) || req.CollectTraces {
+		collectTraces = true
+	}
+
+	// Initially evaluate node liveness status, so we filter the nodes to check.
+	var nodesToCheck []roachpb.NodeID
+	livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now().GoTime(), s.st)
+	if err != nil {
+		return nil, serverError(ctx, err)
+	}
+
+	resp := &serverpb.DecommissionPreCheckResponse{}
+	resultsByNodeID := make(map[roachpb.NodeID]serverpb.DecommissionPreCheckResponse_NodeCheckResult)
+
+	// Any nodes that are already decommissioned or have unknown liveness should
+	// not be checked, and are added to response without replica counts or errors.
+	for _, nID := range req.NodeIDs {
+		livenessStatus := livenessStatusByNodeID[nID]
+		if livenessStatus == livenesspb.NodeLivenessStatus_UNKNOWN {
+			resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
+				NodeID:                nID,
+				DecommissionReadiness: serverpb.DecommissionPreCheckResponse_UNKNOWN,
+				LivenessStatus:        livenessStatus,
+			}
+		} else if livenessStatus == livenesspb.NodeLivenessStatus_DECOMMISSIONED {
+			resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
+				NodeID:                nID,
+				DecommissionReadiness: serverpb.DecommissionPreCheckResponse_ALREADY_DECOMMISSIONED,
+				LivenessStatus:        livenessStatus,
+			}
+		} else {
+			nodesToCheck = append(nodesToCheck, nID)
+		}
+	}
+
+	results, err := s.server.DecommissionPreCheck(ctx, nodesToCheck, req.StrictReadiness, collectTraces, int(req.NumReplicaReport))
+	if err != nil {
+		return nil, err
+	}
+
+	// Collect ranges that encountered errors by the nodes on which their replicas
+	// exist. Ranges with replicas on multiple checked nodes will result in the
+	// error being reported for each nodeID.
+	rangeCheckErrsByNode := make(map[roachpb.NodeID][]serverpb.DecommissionPreCheckResponse_RangeCheckResult)
+	for _, rangeWithErr := range results.rangesNotReady {
+		rangeCheckResult := serverpb.DecommissionPreCheckResponse_RangeCheckResult{
+			RangeID: rangeWithErr.desc.RangeID,
+			Action:  rangeWithErr.action,
+			Events:  recordedSpansToTraceEvents(rangeWithErr.tracingSpans),
+			Error:   rangeWithErr.err.Error(),
+		}
+
+		for _, nID := range nodesToCheck {
+			if rangeWithErr.desc.Replicas().HasReplicaOnNode(nID) {
+				rangeCheckErrsByNode[nID] = append(rangeCheckErrsByNode[nID], rangeCheckResult)
+			}
+		}
+	}
+
+	// Evaluate readiness for each node to check based on how many ranges have
+	// replicas on the node that did not pass checks.
+	for _, nID := range nodesToCheck {
+		numReplicas := len(results.replicasByNode[nID])
+		var readiness serverpb.DecommissionPreCheckResponse_NodeReadiness
+		if len(rangeCheckErrsByNode[nID]) > 0 {
+			readiness = serverpb.DecommissionPreCheckResponse_ALLOCATION_ERRORS
+		} else {
+			readiness = serverpb.DecommissionPreCheckResponse_READY
+		}
+
+		resultsByNodeID[nID] = serverpb.DecommissionPreCheckResponse_NodeCheckResult{
+			NodeID:                nID,
+			DecommissionReadiness: readiness,
+			LivenessStatus:        livenessStatusByNodeID[nID],
+			ReplicaCount:          int64(numReplicas),
+			CheckedRanges:         rangeCheckErrsByNode[nID],
+		}
+	}
+
+	// Reorder checked nodes to match request order.
+	for _, nID := range req.NodeIDs {
+		resp.CheckedNodes = append(resp.CheckedNodes, resultsByNodeID[nID])
+	}
+
+	return resp, nil
 }
 
 // DecommissionStatus returns the DecommissionStatus for all or the given nodes.
diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go
index 1b76b9ce1b7f..659a7cfb9b0b 100644
--- a/pkg/server/admin_test.go
+++ b/pkg/server/admin_test.go
@@ -2462,9 +2462,61 @@ func TestEndpointTelemetryBasic(t *testing.T) {
 	)))
 }
 
-// TestDecommissionPreCheck tests the basic functionality of the
+// checkNodeCheckResultReady is a helper function for validating that the
+// results of a decommission pre-check on a single node show it is ready.
+func checkNodeCheckResultReady(t *testing.T, nID roachpb.NodeID, replicaCount int64,
+	checkResult serverpb.DecommissionPreCheckResponse_NodeCheckResult,
+) {
+	require.Equal(t, serverpb.DecommissionPreCheckResponse_NodeCheckResult{
+		NodeID:                nID,
+		DecommissionReadiness: serverpb.DecommissionPreCheckResponse_READY,
+		LivenessStatus:        livenesspb.NodeLivenessStatus_LIVE,
+		ReplicaCount:          replicaCount,
+		CheckedRanges:         nil,
+	}, checkResult)
+}
+
+// checkRangeCheckResult is a helper function for validating a range error
+// returned as part of a decommission pre-check.
+func checkRangeCheckResult(t *testing.T, desc roachpb.RangeDescriptor,
+	checkResult serverpb.DecommissionPreCheckResponse_RangeCheckResult,
+	expectedAction string, expectedErrSubstr string, expectTraces bool,
+) {
+	passed := false
+	defer func() {
+		if !passed {
+			t.Logf("failed checking %s", desc)
+			if expectTraces {
+				var traceBuilder strings.Builder
+				for _, event := range checkResult.Events {
+					fmt.Fprintf(&traceBuilder, "\n(%s) %s", event.Time, event.Message)
+				}
+				t.Logf("trace events: %s", traceBuilder.String())
+			}
+		}
+	}()
+	require.Equalf(t, desc.RangeID, checkResult.RangeID, "expected r%d, got r%d with error: \"%s\"",
+		desc.RangeID, checkResult.RangeID, checkResult.Error)
+	require.Equalf(t, expectedAction, checkResult.Action, "r%d expected action %s, got action %s with error: \"%s\"",
+		desc.RangeID, expectedAction, checkResult.Action, checkResult.Error)
+	require.NotEmptyf(t, checkResult.Error, "r%d expected non-empty error", checkResult.RangeID)
+	if len(expectedErrSubstr) > 0 {
+		require.Containsf(t, checkResult.Error, expectedErrSubstr, "r%d expected error with \"%s\", got error: \"%s\"",
+			desc.RangeID, expectedErrSubstr, checkResult.Error)
+	}
+	if expectTraces {
+		require.NotEmptyf(t, checkResult.Events, "r%d expected traces, got none with error: \"%s\"",
+			checkResult.RangeID, checkResult.Error)
+	} else {
+		require.Emptyf(t, checkResult.Events, "r%d expected no traces with error: \"%s\"",
+			checkResult.RangeID, checkResult.Error)
+	}
+	passed = true
+}
+
+// TestDecommissionPreCheckBasicReadiness tests the basic functionality of the
 // DecommissionPreCheck endpoint.
-func TestDecommissionPreCheck(t *testing.T) {
+func TestDecommissionPreCheckBasicReadiness(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t) // can't handle 7-node clusters
@@ -2484,9 +2536,300 @@ func TestDecommissionPreCheck(t *testing.T) {
 	resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
 		NodeIDs: []roachpb.NodeID{tc.Server(5).NodeID()},
 	})
-	require.Error(t, err)
-	require.Equal(t, codes.Unimplemented, status.Code(err))
-	require.Nil(t, resp)
+	require.NoError(t, err)
+	require.Len(t, resp.CheckedNodes, 1)
+	checkNodeCheckResultReady(t, tc.Server(5).NodeID(), 0, resp.CheckedNodes[0])
+}
+
+// TestDecommissionPreCheckUnready tests the functionality of the
+// DecommissionPreCheck endpoint with some nodes not ready.
+func TestDecommissionPreCheckUnready(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderRace(t) // can't handle 7-node clusters
+
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual, // saves time
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// Add replicas to a node we will check.
+	// Scratch range should have RF=3, liveness range should have RF=5.
+	adminSrvIdx := 3
+	decommissioningSrvIdx := 5
+	scratchKey := tc.ScratchRange(t)
+	scratchDesc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+	livenessDesc := tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix)
+	livenessDesc = tc.AddVotersOrFatal(t, livenessDesc.StartKey.AsRawKey(), tc.Target(decommissioningSrvIdx))
+
+	adminSrv := tc.Server(adminSrvIdx)
+	decommissioningSrv := tc.Server(decommissioningSrvIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+
+	checkNodeReady := func(nID roachpb.NodeID, replicaCount int64, strict bool) {
+		resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
+			NodeIDs:         []roachpb.NodeID{nID},
+			StrictReadiness: strict,
+		})
+		require.NoError(t, err)
+		require.Len(t, resp.CheckedNodes, 1)
+		checkNodeCheckResultReady(t, nID, replicaCount, resp.CheckedNodes[0])
+	}
+
+	awaitDecommissioned := func(nID roachpb.NodeID) {
+		testutils.SucceedsSoon(t, func() error {
+			livenesses, err := adminSrv.NodeLiveness().(*liveness.NodeLiveness).GetLivenessesFromKV(ctx)
+			if err != nil {
+				return err
+			}
+			for _, nodeLiveness := range livenesses {
+				if nodeLiveness.NodeID == nID {
+					if nodeLiveness.Membership == livenesspb.MembershipStatus_DECOMMISSIONED {
+						return nil
+					} else {
+						return errors.Errorf("n%d has membership: %s", nID, nodeLiveness.Membership)
+					}
+				}
+			}
+			return errors.Errorf("n%d liveness not found", nID)
+		})
+	}
+
+	checkAndDecommission := func(srvIdx int, replicaCount int64, strict bool) {
+		nID := tc.Server(srvIdx).NodeID()
+		checkNodeReady(nID, replicaCount, strict)
+		require.NoError(t, adminSrv.Decommission(
+			ctx, livenesspb.MembershipStatus_DECOMMISSIONING, []roachpb.NodeID{nID}))
+		require.NoError(t, adminSrv.Decommission(
+			ctx, livenesspb.MembershipStatus_DECOMMISSIONED, []roachpb.NodeID{nID}))
+		awaitDecommissioned(nID)
+	}
+
+	// In non-strict mode, this decommission appears "ready". This is because the
+	// ranges with replicas on decommissioningSrv have priority action "AddVoter",
+	// and they have valid targets.
+	checkNodeReady(decommissioningSrv.NodeID(), 2, false)
+
+	// In strict mode, we would expect the readiness check to fail.
+	resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
+		NodeIDs:          []roachpb.NodeID{decommissioningSrv.NodeID()},
+		NumReplicaReport: 50,
+		StrictReadiness:  true,
+		CollectTraces:    true,
+	})
+	require.NoError(t, err)
+	nodeCheckResult := resp.CheckedNodes[0]
+	require.Equalf(t, serverpb.DecommissionPreCheckResponse_ALLOCATION_ERRORS, nodeCheckResult.DecommissionReadiness,
+		"expected n%d to have allocation errors, got %s", nodeCheckResult.NodeID, nodeCheckResult.DecommissionReadiness)
+	require.Len(t, nodeCheckResult.CheckedRanges, 2)
+	checkRangeCheckResult(t, livenessDesc, nodeCheckResult.CheckedRanges[0],
+		"add voter", "needs repair beyond replacing/removing", true,
+	)
+	checkRangeCheckResult(t, scratchDesc, nodeCheckResult.CheckedRanges[1],
+		"add voter", "needs repair beyond replacing/removing", true,
+	)
+
+	// Add replicas to ensure we have the correct number of replicas for each range.
+	scratchDesc = tc.AddVotersOrFatal(t, scratchKey, tc.Target(adminSrvIdx))
+	livenessDesc = tc.AddVotersOrFatal(t, livenessDesc.StartKey.AsRawKey(),
+		tc.Target(adminSrvIdx), tc.Target(4), tc.Target(6),
+	)
+	require.True(t, hasReplicaOnServers(tc, &scratchDesc, 0, adminSrvIdx, decommissioningSrvIdx))
+	require.True(t, hasReplicaOnServers(tc, &livenessDesc, 0, adminSrvIdx, decommissioningSrvIdx, 4, 6))
+	require.Len(t, scratchDesc.InternalReplicas, 3)
+	require.Len(t, livenessDesc.InternalReplicas, 5)
+
+	// Decommissioning pre-check should pass on decommissioningSrv in both strict
+	// and non-strict modes, as each range can find valid upreplication targets.
+	checkNodeReady(decommissioningSrv.NodeID(), 2, true)
+
+	// Check and decommission empty nodes, decreasing to a 5-node cluster.
+	checkAndDecommission(1, 0, true)
+	checkAndDecommission(2, 0, true)
+
+	// Check that we can still decommission.
+	// Below 5 nodes, system ranges will have an effective RF=3.
+	checkNodeReady(decommissioningSrv.NodeID(), 2, true)
+
+	// Check that we can decommission the nodes with liveness replicas only.
+	checkAndDecommission(4, 1, true)
+	checkAndDecommission(6, 1, true)
+
+	// Check range descriptors are as expected.
+	scratchDesc = tc.LookupRangeOrFatal(t, scratchDesc.StartKey.AsRawKey())
+	livenessDesc = tc.LookupRangeOrFatal(t, livenessDesc.StartKey.AsRawKey())
+	require.True(t, hasReplicaOnServers(tc, &scratchDesc, 0, adminSrvIdx, decommissioningSrvIdx))
+	require.True(t, hasReplicaOnServers(tc, &livenessDesc, 0, adminSrvIdx, decommissioningSrvIdx, 4, 6))
+	require.Len(t, scratchDesc.InternalReplicas, 3)
+	require.Len(t, livenessDesc.InternalReplicas, 5)
+
+	// Cleanup orphaned liveness replicas and check.
+	livenessDesc = tc.RemoveVotersOrFatal(t, livenessDesc.StartKey.AsRawKey(), tc.Target(4), tc.Target(6))
+	require.True(t, hasReplicaOnServers(tc, &livenessDesc, 0, adminSrvIdx, decommissioningSrvIdx))
+	require.Len(t, livenessDesc.InternalReplicas, 3)
+
+	// Validate that the node is not ready to decommission.
+	resp, err = adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
+		NodeIDs:          []roachpb.NodeID{decommissioningSrv.NodeID()},
+		NumReplicaReport: 1, // Test that we limit errors.
+		StrictReadiness:  true,
+	})
+	require.NoError(t, err)
+	nodeCheckResult = resp.CheckedNodes[0]
+	require.Equalf(t, serverpb.DecommissionPreCheckResponse_ALLOCATION_ERRORS, nodeCheckResult.DecommissionReadiness,
+		"expected n%d to have allocation errors, got %s", nodeCheckResult.NodeID, nodeCheckResult.DecommissionReadiness)
+	require.Equal(t, int64(2), nodeCheckResult.ReplicaCount)
+	require.Len(t, nodeCheckResult.CheckedRanges, 1)
+	checkRangeCheckResult(t, livenessDesc, nodeCheckResult.CheckedRanges[0],
+		"replace decommissioning voter",
+		"0 of 2 live stores are able to take a new replica for the range "+
+			"(2 already have a voter, 0 already have a non-voter); "+
+			"likely not enough nodes in cluster",
+		false,
+	)
+}
+
+// TestDecommissionPreCheckMultiple tests the functionality of the
+// DecommissionPreCheck endpoint with multiple nodes.
+func TestDecommissionPreCheckMultiple(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual, // saves time
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// TODO(sarkesian): Once #95909 is merged, test checks on a 3-node decommission.
+	// e.g. Test both server idxs 3,4 and 2,3,4 (which should not pass checks).
+	adminSrvIdx := 1
+	decommissioningSrvIdxs := []int{3, 4}
+	decommissioningSrvNodeIDs := make([]roachpb.NodeID, len(decommissioningSrvIdxs))
+	for i, srvIdx := range decommissioningSrvIdxs {
+		decommissioningSrvNodeIDs[i] = tc.Server(srvIdx).NodeID()
+	}
+
+	// Add replicas to nodes we will check.
+	// Scratch range should have RF=3, liveness range should have RF=5.
+	rangeDescs := []roachpb.RangeDescriptor{
+		tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix),
+		tc.LookupRangeOrFatal(t, tc.ScratchRange(t)),
+	}
+	rangeDescSrvIdxs := [][]int{
+		{0, 1, 2, 3, 4},
+		{0, 3, 4},
+	}
+	rangeDescSrvTargets := make([][]roachpb.ReplicationTarget, len(rangeDescs))
+	for i, srvIdxs := range rangeDescSrvIdxs {
+		for _, srvIdx := range srvIdxs {
+			if srvIdx != 0 {
+				rangeDescSrvTargets[i] = append(rangeDescSrvTargets[i], tc.Target(srvIdx))
+			}
+		}
+	}
+
+	for i, rangeDesc := range rangeDescs {
+		rangeDescs[i] = tc.AddVotersOrFatal(t, rangeDesc.StartKey.AsRawKey(), rangeDescSrvTargets[i]...)
+	}
+
+	for i, rangeDesc := range rangeDescs {
+		require.True(t, hasReplicaOnServers(tc, &rangeDesc, rangeDescSrvIdxs[i]...))
+		require.Len(t, rangeDesc.InternalReplicas, len(rangeDescSrvIdxs[i]))
+	}
+
+	adminSrv := tc.Server(adminSrvIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+
+	// We expect to be able to decommission the targeted nodes simultaneously.
+	resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
+		NodeIDs:          decommissioningSrvNodeIDs,
+		NumReplicaReport: 50,
+		StrictReadiness:  true,
+		CollectTraces:    true,
+	})
+	require.NoError(t, err)
+	require.Len(t, resp.CheckedNodes, len(decommissioningSrvIdxs))
+	for i, nID := range decommissioningSrvNodeIDs {
+		checkNodeCheckResultReady(t, nID, int64(len(rangeDescs)), resp.CheckedNodes[i])
+	}
+}
+
+// TestDecommissionPreCheckInvalidNode tests the functionality of the
+// DecommissionPreCheck endpoint where some nodes are invalid.
+func TestDecommissionPreCheckInvalidNode(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual, // saves time
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	adminSrvIdx := 1
+	validDecommissioningNodeID := roachpb.NodeID(5)
+	invalidDecommissioningNodeID := roachpb.NodeID(34)
+	decommissioningNodeIDs := []roachpb.NodeID{validDecommissioningNodeID, invalidDecommissioningNodeID}
+
+	// Add replicas to nodes we will check.
+	// Scratch range should have RF=3, liveness range should have RF=5.
+	rangeDescs := []roachpb.RangeDescriptor{
+		tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix),
+		tc.LookupRangeOrFatal(t, tc.ScratchRange(t)),
+	}
+	rangeDescSrvIdxs := [][]int{
+		{0, 1, 2, 3, 4},
+		{0, 3, 4},
+	}
+	rangeDescSrvTargets := make([][]roachpb.ReplicationTarget, len(rangeDescs))
+	for i, srvIdxs := range rangeDescSrvIdxs {
+		for _, srvIdx := range srvIdxs {
+			if srvIdx != 0 {
+				rangeDescSrvTargets[i] = append(rangeDescSrvTargets[i], tc.Target(srvIdx))
+			}
+		}
+	}
+
+	for i, rangeDesc := range rangeDescs {
+		rangeDescs[i] = tc.AddVotersOrFatal(t, rangeDesc.StartKey.AsRawKey(), rangeDescSrvTargets[i]...)
+	}
+
+	for i, rangeDesc := range rangeDescs {
+		require.True(t, hasReplicaOnServers(tc, &rangeDesc, rangeDescSrvIdxs[i]...))
+		require.Len(t, rangeDesc.InternalReplicas, len(rangeDescSrvIdxs[i]))
+	}
+
+	adminSrv := tc.Server(adminSrvIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+
+	// We expect the pre-check to fail as some node IDs are invalid.
+	resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
+		NodeIDs:          decommissioningNodeIDs,
+		NumReplicaReport: 50,
+		StrictReadiness:  true,
+		CollectTraces:    true,
+	})
+	require.NoError(t, err)
+	require.Len(t, resp.CheckedNodes, len(decommissioningNodeIDs))
+	checkNodeCheckResultReady(t, validDecommissioningNodeID, int64(len(rangeDescs)), resp.CheckedNodes[0])
+	require.Equal(t, serverpb.DecommissionPreCheckResponse_NodeCheckResult{
+		NodeID:                invalidDecommissioningNodeID,
+		DecommissionReadiness: serverpb.DecommissionPreCheckResponse_UNKNOWN,
+		LivenessStatus:        livenesspb.NodeLivenessStatus_UNKNOWN,
+		ReplicaCount:          0,
+		CheckedRanges:         nil,
+	}, resp.CheckedNodes[1])
 }
 
 func TestDecommissionSelf(t *testing.T) {
diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go
index fbef73325a67..e76722e0e19b 100644
--- a/pkg/server/decommission.go
+++ b/pkg/server/decommission.go
@@ -47,7 +47,7 @@ type decommissioningNodeMap struct {
 // and target for a single range that has an extant replica on a node targeted
 // for decommission.
 type decommissionRangeCheckResult struct {
-	desc         *roachpb.RangeDescriptor
+	desc         roachpb.RangeDescriptor
 	action       string
 	tracingSpans tracingpb.Recording
 	err          error
@@ -300,7 +300,7 @@ func evaluateRangeCheckResult(
 	rErr error,
 ) (passed bool, _ decommissionRangeCheckResult) {
 	checkResult := decommissionRangeCheckResult{
-		desc:   desc,
+		desc:   *desc,
 		action: action.String(),
 		err:    rErr,
 	}
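For reference, a minimal client-side sketch of how the new DecommissionPreCheck admin RPC is exercised, distilled from the tests above. This is not part of the patch: the function name, the printed summary, and the assumption of an already-dialed gRPC connection are illustrative only; the request/response fields are the ones used in admin_test.go.

// decommissionPreCheckExample is an illustrative sketch, not part of this patch.
// It assumes a ctx, an already-dialed *grpc.ClientConn to a node's admin server
// (the tests obtain one via RPCContext().GRPCDialNode(...).Connect(ctx)), and
// the IDs of the nodes being considered for decommission.
func decommissionPreCheckExample(
	ctx context.Context, conn *grpc.ClientConn, nodeIDs []roachpb.NodeID,
) error {
	adminClient := serverpb.NewAdminClient(conn)
	resp, err := adminClient.DecommissionPreCheck(ctx, &serverpb.DecommissionPreCheckRequest{
		NodeIDs:          nodeIDs,
		StrictReadiness:  true, // per the tests, ranges needing repair beyond replacing/removing report ALLOCATION_ERRORS
		CollectTraces:    true, // attach allocator trace events to each reported range
		NumReplicaReport: 50,   // cap the number of range errors reported per node
	})
	if err != nil {
		return err
	}
	// Each checked node reports its readiness, liveness status, replica count,
	// and any ranges that failed the allocator checks.
	for _, result := range resp.CheckedNodes {
		fmt.Printf("n%d: readiness=%s, replicas=%d, failing ranges=%d\n",
			result.NodeID, result.DecommissionReadiness, result.ReplicaCount, len(result.CheckedRanges))
	}
	return nil
}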