diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 2e9e67a2b1e9..edfae22673ef 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -7437,7 +7437,7 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | statuses | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus) | repeated | Statuses contain a list of recovery statuses of nodes updated during recovery. It also contains nodes that were expected to be live (not decommissioned by recovery) but failed to return status response. | [reserved](#support-status) | -| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | Unavailable ranges contains descriptors of ranges that failed health checks. | [reserved](#support-status) | +| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | UnavailableRanges contains descriptors of ranges that failed health checks. | [reserved](#support-status) | | decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyResponse-int32) | repeated | DecommissionedNodeIDs contains list of decommissioned node id's. Only nodes that were decommissioned by the plan would be listed here, not all historically decommissioned ones. | [reserved](#support-status) | diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index 636bf7f49419..649e5b0c9348 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -17,6 +17,7 @@ import ( "io" "os" "path" + "sort" "strings" "github.com/cockroachdb/cockroach/pkg/base" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -170,7 +172,7 @@ Now the cluster could be started again. var recoverCommands = []*cobra.Command{ debugRecoverCollectInfoCmd, debugRecoverPlanCmd, - //debugRecoverStagePlan, + debugRecoverExecuteCmd, //debugRecoverVerify, } @@ -503,7 +505,7 @@ To stage recovery application in half-online mode invoke: Alternatively distribute plan to below nodes and invoke 'debug recover apply-plan --store= %s' on: `, remoteArgs, planFile, planFile) for _, node := range report.UpdatedNodes { - _, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinStoreIDs(node.StoreIDs)) + _, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinIDs("s", node.StoreIDs)) } return nil @@ -559,8 +561,10 @@ var debugRecoverExecuteOpts struct { // --confirm flag. // If action is confirmed, then all changes are committed to the storage. func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { - stopper := stop.NewStopper() - defer stopper.Stop(cmd.Context()) + // We need cancellable context here to obtain grpc client connection. + // getAdminClient will refuse otherwise. 
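+ // The derived context is shared by both execution paths below: staging the
+ // plan onto a live cluster via the admin RPC (stageRecoveryOntoCluster) and
+ // applying the plan directly to local stores (applyRecoveryToLocalStore).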
+ ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() planFile := args[0] data, err := os.ReadFile(planFile) @@ -574,6 +578,134 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile) } + if len(debugRecoverExecuteOpts.Stores.Specs) == 0 { + return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates) + } + return applyRecoveryToLocalStore(ctx, nodeUpdates) +} + +func stageRecoveryOntoCluster( + ctx context.Context, cmd *cobra.Command, planFile string, plan loqrecoverypb.ReplicaUpdatePlan, +) error { + c, finish, err := getAdminClient(ctx, serverCfg) + if err != nil { + return errors.Wrapf(err, "failed to get admin connection to cluster") + } + defer finish() + + // Check existing plan on nodes + type planConflict struct { + nodeID roachpb.NodeID + planID string + } + vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + if err != nil { + return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster") + } + var conflicts []planConflict + for _, ns := range vr.Statuses { + if ns.PendingPlanID != nil && !ns.PendingPlanID.Equal(plan.PlanID) { + conflicts = append(conflicts, planConflict{nodeID: ns.NodeID, planID: ns.PendingPlanID.String()}) + } + } + + // Proposed report + _, _ = fmt.Fprintf(stderr, "Proposed changes in plan %s:\n", plan.PlanID) + for _, u := range plan.Updates { + _, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s and discarding all others.\n", + u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID) + } + _, _ = fmt.Fprintf(stderr, "\nNodes %s will be marked as decommissioned.\n", joinIDs("n", plan.DecommissionedNodeIDs)) + + if len(conflicts) > 0 { + _, _ = fmt.Fprintf(stderr, "\nConflicting staged plans will be replaced:\n") + for _, cp := range conflicts { + _, _ = fmt.Fprintf(stderr, " plan %s is staged on node n%d.\n", cp.planID, cp.nodeID) + } + } + _, _ = fmt.Fprintln(stderr) + + // Confirm actions + switch debugRecoverExecuteOpts.confirmAction { + case prompt: + _, _ = fmt.Fprintf(stderr, "\nProceed with staging plan [y/N] ") + reader := bufio.NewReader(os.Stdin) + line, err := reader.ReadString('\n') + if err != nil { + return errors.Wrap(err, "failed to read user input") + } + _, _ = fmt.Fprintf(stderr, "\n") + if len(line) < 1 || (line[0] != 'y' && line[0] != 'Y') { + _, _ = fmt.Fprint(stderr, "Aborted at user request\n") + return nil + } + case allYes: + // All actions enabled by default. + default: + return errors.New("Aborted by --confirm option") + } + + maybeWrapStagingError := func(msg string, res *serverpb.RecoveryStagePlanResponse, err error) error { + if err != nil { + return errors.Wrapf(err, "%s", msg) + } + if len(res.Errors) > 0 { + return errors.Newf("%s:\n%s", msg, strings.Join(res.Errors, "\n")) + } + return nil + } + + if len(conflicts) > 0 { + // We don't want to combine removing old plan and adding new one since it + // could produce cluster with multiple plans at the same time which could + // make situation worse. 
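+ // Staging is therefore split into two cluster-wide RPCs: an empty request
+ // with ForcePlan set first wipes any conflicting staged plans, and only then
+ // is the new plan distributed, e.g.
+ //
+ //   RecoveryStagePlan{AllNodes: true, ForcePlan: true} // drop old plans
+ //   RecoveryStagePlan{Plan: &plan, AllNodes: true}     // stage the new plan
+ //
+ // Per-node failures come back in the response Errors field rather than as a
+ // single RPC error; maybeWrapStagingError folds them into one error below.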
+ res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) + if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil { + return err + } + } + sr, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster", + sr, err); err != nil { + return err + } + + remoteArgs := getCLIClusterFlags(true, cmd, func(flag string) bool { + _, filter := planSpecificFlags[flag] + return filter + }) + + nodeSet := make(map[roachpb.NodeID]interface{}) + for _, r := range plan.Updates { + nodeSet[r.NodeID()] = struct{}{} + } + + _, _ = fmt.Fprintf(stderr, `Plan staged. To complete recovery restart nodes %s. + +To verify recovery status invoke: + +'cockroach debug recover verify %s %s' +`, joinIDs("n", sortedKeys(nodeSet)), remoteArgs, planFile) + return nil +} + +func sortedKeys[T ~int32](set map[T]any) []T { + var sorted []T + for k := range set { + sorted = append(sorted, k) + } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i] < sorted[j] + }) + return sorted +} + +func applyRecoveryToLocalStore( + ctx context.Context, nodeUpdates loqrecoverypb.ReplicaUpdatePlan, +) error { + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + var localNodeID roachpb.NodeID batches := make(map[roachpb.StoreID]storage.Batch) for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs { @@ -586,7 +718,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { defer store.Close() defer batch.Close() - storeIdent, err := kvstorage.ReadStoreIdent(cmd.Context(), store) + storeIdent, err := kvstorage.ReadStoreIdent(ctx, store) if err != nil { return err } @@ -602,7 +734,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { updateTime := timeutil.Now() prepReport, err := loqrecovery.PrepareUpdateReplicas( - cmd.Context(), nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches) + ctx, nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches) if err != nil { return err } @@ -615,7 +747,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { if len(prepReport.UpdatedReplicas) == 0 { if len(prepReport.MissingStores) > 0 { return errors.Newf("stores %s expected on the node but no paths were provided", - joinStoreIDs(prepReport.MissingStores)) + joinIDs("s", prepReport.MissingStores)) } _, _ = fmt.Fprintf(stderr, "No updates planned on this node.\n") return nil @@ -653,14 +785,14 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { // Apply batches to the stores. 
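+ // CommitReplicaChanges commits the prepared batches; the returned report is
+ // only used to tell the operator which stores were touched, rendered through
+ // the generic joinIDs helper, e.g. joinIDs("s", []roachpb.StoreID{1, 2}) is
+ // "s1, s2".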
applyReport, err := loqrecovery.CommitReplicaChanges(batches) - _, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinStoreIDs(applyReport.UpdatedStores)) + _, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinIDs("s", applyReport.UpdatedStores)) return err } -func joinStoreIDs(storeIDs []roachpb.StoreID) string { - storeNames := make([]string, 0, len(storeIDs)) - for _, id := range storeIDs { - storeNames = append(storeNames, fmt.Sprintf("s%d", id)) +func joinIDs[T ~int32](prefix string, ids []T) string { + storeNames := make([]string, 0, len(ids)) + for _, id := range ids { + storeNames = append(storeNames, fmt.Sprintf("%s%d", prefix, id)) } return strings.Join(storeNames, ", ") } @@ -681,7 +813,7 @@ func formatNodeStores(locations []loqrecovery.NodeStores, indent string) string nodeDetails := make([]string, 0, len(locations)) for _, node := range locations { nodeDetails = append(nodeDetails, - indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinStoreIDs(node.StoreIDs))) + indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinIDs("s", node.StoreIDs))) } return strings.Join(nodeDetails, "\n") } diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 173ac0a1b860..394d649572e6 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -69,6 +69,7 @@ go_test( "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/stateloader", "//pkg/roachpb", + "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index 16e8e7759cec..b59205106947 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -12,6 +12,7 @@ package loqrecovery import ( "context" + "fmt" "io" "time" @@ -52,10 +53,13 @@ type visitNodesFn func(ctx context.Context, retryOpts retry.Options, ) error type Server struct { - nodeIDContainer *base.NodeIDContainer - stores *kvserver.Stores - visitNodes visitNodesFn - planStore PlanStore + nodeIDContainer *base.NodeIDContainer + clusterIDContainer *base.ClusterIDContainer + stores *kvserver.Stores + visitNodes visitNodesFn + planStore PlanStore + decommissionFn func(context.Context, roachpb.NodeID) error + metadataQueryTimeout time.Duration forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error } @@ -68,6 +72,7 @@ func NewServer( loc roachpb.Locality, rpcCtx *rpc.Context, knobs base.ModuleTestingKnobs, + decommission func(context.Context, roachpb.NodeID) error, ) *Server { // Server side timeouts are necessary in recovery collector since we do best // effort operations where cluster info collection as an operation succeeds @@ -82,9 +87,11 @@ func NewServer( } return &Server{ nodeIDContainer: nodeIDContainer, + clusterIDContainer: rpcCtx.StorageClusterID, stores: stores, visitNodes: makeVisitAvailableNodes(g, loc, rpcCtx), planStore: planStore, + decommissionFn: decommission, metadataQueryTimeout: metadataQueryTimeout, forwardReplicaFilter: forwardReplicaFilter, } @@ -211,6 +218,142 @@ func (s Server) ServeClusterReplicas( }) } +func (s Server) StagePlan( + ctx context.Context, req *serverpb.RecoveryStagePlanRequest, +) (*serverpb.RecoveryStagePlanResponse, error) { + if !req.ForcePlan && req.Plan == nil { + return nil, errors.New("stage plan request can't be used with empty plan without force flag") + } + clusterID := s.clusterIDContainer.Get().String() + if req.Plan != nil && req.Plan.ClusterID != 
clusterID { + return nil, errors.Newf("attempting to stage plan from cluster %s on cluster %s", + req.Plan.ClusterID, clusterID) + } + + localNodeID := s.nodeIDContainer.Get() + // Create a plan copy (empty if the request carried no plan) to avoid + // repeated nil checks below. + var plan loqrecoverypb.ReplicaUpdatePlan + if req.Plan != nil { + plan = *req.Plan + } + if req.AllNodes { + // Scan the cluster for conflicting recovery plans and for stray nodes that + // are planned for forced decommission but have rejoined the cluster. + foundNodes := make(map[roachpb.NodeID]struct{}) + err := s.visitNodes( + ctx, + replicaInfoStreamRetryOptions, + func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) + if err != nil { + return errors.Mark(err, errMarkRetry) + } + // If the operation fails here, we don't try to find all remaining + // violating nodes because the CLI must ensure that the cluster is safe + // for staging. + if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) { + return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID) + } + foundNodes[nodeID] = struct{}{} + return nil + }) + if err != nil { + return nil, err + } + + // Check that no nodes that must be decommissioned are present. + for _, dID := range plan.DecommissionedNodeIDs { + if _, ok := foundNodes[dID]; ok { + return nil, errors.Newf("node n%d was planned for decommission, but is present in cluster", dID) + } + } + + // Check that all nodes that should save the plan are present. + for _, u := range plan.Updates { + if _, ok := foundNodes[u.NodeID()]; !ok { + return nil, errors.Newf("node n%d has planned changes but is unreachable in the cluster", u.NodeID()) + } + } + + // Distribute the plan. This should not fan out to all available nodes, but + // use the list of nodes found in the previous step. + var nodeErrors []string + err = s.visitNodes( + ctx, + replicaInfoStreamRetryOptions, + func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + delete(foundNodes, nodeID) + res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: req.Plan, + AllNodes: false, + ForcePlan: req.ForcePlan, + }) + if err != nil { + nodeErrors = append(nodeErrors, + errors.Wrapf(err, "failed staging the plan on node n%d", nodeID).Error()) + return nil + } + nodeErrors = append(nodeErrors, res.Errors...) + return nil + }) + if err != nil { + nodeErrors = append(nodeErrors, + errors.Wrapf(err, "failed to perform fan-out to cluster nodes from n%d", + localNodeID).Error()) + } + if len(foundNodes) > 0 { + // We didn't reach some of the originally found nodes. Report the + // disappeared nodes since we don't know what is happening with the cluster.
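+ // foundNodes held every node seen by the initial scan; the fan-out above
+ // deleted each node it reached, so whatever remains here vanished in between.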
+ for n := range foundNodes { + nodeErrors = append(nodeErrors, fmt.Sprintf("node n%d disappeared while performing plan staging operation", n)) + } + } + return &serverpb.RecoveryStagePlanResponse{Errors: nodeErrors}, nil + } + + log.Infof(ctx, "attempting to stage loss of quorum recovery plan") + + responseFromError := func(err error) (*serverpb.RecoveryStagePlanResponse, error) { + return &serverpb.RecoveryStagePlanResponse{ + Errors: []string{ + errors.Wrapf(err, "failed to stage plan on node n%d", localNodeID).Error(), + }, + }, nil + } + + existingPlan, exists, err := s.planStore.LoadPlan() + if err != nil { + return responseFromError(err) + } + if exists && !existingPlan.PlanID.Equal(plan.PlanID) && !req.ForcePlan { + return responseFromError(errors.Newf("conflicting plan %s is already staged", existingPlan.PlanID)) + } + + for _, node := range plan.DecommissionedNodeIDs { + if err := s.decommissionFn(ctx, node); err != nil { + return responseFromError(err) + } + } + + if req.ForcePlan { + if err := s.planStore.RemovePlan(); err != nil { + return responseFromError(err) + } + } + + for _, r := range plan.Updates { + if r.NodeID() == localNodeID { + if err := s.planStore.SavePlan(plan); err != nil { + return responseFromError(err) + } + break + } + } + + return &serverpb.RecoveryStagePlanResponse{}, nil +} + func (s Server) NodeStatus( _ context.Context, _ *serverpb.RecoveryNodeStatusRequest, ) (*serverpb.RecoveryNodeStatusResponse, error) { diff --git a/pkg/kv/kvserver/loqrecovery/server_test.go b/pkg/kv/kvserver/loqrecovery/server_test.go index 2c7c4c62d3e3..004e8393e87e 100644 --- a/pkg/kv/kvserver/loqrecovery/server_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_test.go @@ -12,6 +12,7 @@ package loqrecovery_test import ( "context" + "fmt" "strconv" "sync/atomic" "testing" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -96,8 +98,6 @@ func TestReplicaCollection(t *testing.T) { assertReplicas(2, true) tc.StopServer(1) assertReplicas(1, false) - - tc.Stopper().Stop(ctx) } // TestStreamRestart verifies that if connection is dropped mid way through @@ -161,8 +161,6 @@ func TestStreamRestart(t *testing.T) { } assertReplicas(3) - - tc.Stopper().Stop(ctx) } func getInfoCounters(info loqrecoverypb.ClusterReplicaInfo) clusterInfoCounters { @@ -190,33 +188,10 @@ func TestGetRecoveryState(t *testing.T) { ctx := context.Background() - reg := server.NewStickyInMemEnginesRegistry() + tc, reg, planStores := prepTestCluster(t) defer reg.CloseAllStickyInMemEngines() - - args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - } - for i := 0; i < 3; i++ { - args.ServerArgsPerNode[i] = base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, - }, - }, - StoreSpecs: []base.StoreSpec{ - { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), - }, - }, - } - } - tc := testcluster.NewTestCluster(t, 3, args) - tc.Start(t) defer tc.Stopper().Stop(ctx) - planStores := prepInMemPlanStores(t, args.ServerArgsPerNode) - adm, err := tc.GetAdminClient(ctx, t, 0) require.NoError(t, err, "failed to get admin client") @@ -227,9 +202,7 @@ func 
TestGetRecoveryState(t *testing.T) { } // Injecting plan into 2 nodes out of 3. - plan := loqrecoverypb.ReplicaUpdatePlan{ - PlanID: uuid.MakeV4(), - } + plan := makeTestRecoveryPlan(ctx, t, adm) for i := 0; i < 2; i++ { require.NoError(t, planStores[i].SavePlan(plan), "failed to save plan on node n%d", i) } @@ -238,9 +211,9 @@ func TestGetRecoveryState(t *testing.T) { resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) require.NoError(t, err) statuses := aggregateStatusByNode(resp) - require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 0") - require.Equal(t, &plan.PlanID, statuses[2].PendingPlanID, "incorrect plan id on node 1") - require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 2") + require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 1") + require.Equal(t, &plan.PlanID, statuses[2].PendingPlanID, "incorrect plan id on node 2") + require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") // Check we can collect partial results. tc.StopServer(1) @@ -271,6 +244,236 @@ func aggregateStatusByNode( return statuses } +func TestStageRecoveryPlans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage plan with update for node 3 using node 0 and check which nodes + // saved plan. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, res.Errors, "unexpected errors in stage response") + + // First we test that plans are successfully picked up by status call. + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + statuses := aggregateStatusByNode(resp) + require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") + require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") + require.Equal(t, &plan.PlanID, statuses[3].PendingPlanID, "incorrect plan id on node 3") +} + +func TestStageConflictingPlans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage first plan. 
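+ // Once this plan is staged on n3, staging a second, different plan without
+ // ForcePlan is expected to fail with "plan <id> is already staged on node n3",
+ // mirroring the conflict check in Server.StagePlan.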
+ plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, res.Errors, "unexpected errors in stage response") + + plan2 := makeTestRecoveryPlan(ctx, t, adm) + plan2.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 2), + } + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan2, AllNodes: true}) + require.ErrorContains(t, err, + fmt.Sprintf("plan %s is already staged on node n3", plan.PlanID.String()), + "conflicting plans must not be allowed") +} + +func TestForcePlanUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resV, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resV.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage first plan. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + resS, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, resS.Errors, "unexpected errors in stage response") + + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) + require.NoError(t, err, "force plan should reset previous plans") + + // Verify that plan was successfully replaced by an empty one. 
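+ // An empty request with ForcePlan set acts as a cluster-wide reset, so every
+ // node should now report a nil PendingPlanID.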
+ resV, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + statuses := aggregateStatusByNode(resV) + require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") + require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") + require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") +} + +func TestNodeDecommissioned(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + tc.StopServer(2) + + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} + res, err := adm.RecoveryStagePlan(ctx, + &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, res.Errors, "unexpected errors in stage response") + + require.ErrorContains(t, tc.Server(0).RPCContext().OnOutgoingPing(ctx, &rpc.PingRequest{TargetNodeID: 3}), + "permanently removed from the cluster", "ping of decommissioned node should fail") +} + +func TestRejectDecommissionReachableNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} + _, err = adm.RecoveryStagePlan(ctx, + &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.ErrorContains(t, err, "was planned for decommission, but is present in cluster", + "staging plan decommissioning live nodes must not be allowed") +} + +func TestStageRecoveryPlansToWrongCluster(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _ := prepTestCluster(t) + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + fakeClusterID, _ := uuid.NewV4() + // Stage plan with id of different cluster and see if error is raised. 
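+ // makeTestRecoveryPlan stamps the plan with the real cluster ID taken from
+ // the Cluster RPC, so overriding it with a random UUID should trip the
+ // ClusterID check in Server.StagePlan.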
+ plan := makeTestRecoveryPlan(ctx, t, adm) + plan.ClusterID = fakeClusterID.String() + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.ErrorContains(t, err, "attempting to stage plan from cluster", "failed to stage plan") +} + +func prepTestCluster( + t *testing.T, +) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { + reg := server.NewStickyInMemEnginesRegistry() + + const nodes = 3 + args := base.TestClusterArgs{ + ServerArgsPerNode: make(map[int]base.TestServerArgs), + } + for i := 0; i < nodes; i++ { + args.ServerArgsPerNode[i] = base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: reg, + }, + }, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + } + } + tc := testcluster.NewTestCluster(t, nodes, args) + tc.Start(t) + return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode) +} + func prepInMemPlanStores( t *testing.T, serverArgs map[int]base.TestServerArgs, ) map[int]loqrecovery.PlanStore { @@ -283,3 +486,32 @@ func prepInMemPlanStores( } return pss } + +func createRecoveryForRange( + t *testing.T, tc *testcluster.TestCluster, key roachpb.Key, storeID int, +) loqrecoverypb.ReplicaUpdate { + rngD, err := tc.LookupRange(key) + require.NoError(t, err, "can't find range for key %s", key) + replD, ok := rngD.GetReplicaDescriptor(roachpb.StoreID(storeID)) + require.True(t, ok, "expecting scratch replica on node 3") + replD.ReplicaID += 10 + return loqrecoverypb.ReplicaUpdate{ + RangeID: rngD.RangeID, + StartKey: loqrecoverypb.RecoveryKey(rngD.StartKey), + OldReplicaID: replD.ReplicaID, + NewReplica: replD, + NextReplicaID: replD.ReplicaID + 1, + } +} + +func makeTestRecoveryPlan( + ctx context.Context, t *testing.T, ac serverpb.AdminClient, +) loqrecoverypb.ReplicaUpdatePlan { + t.Helper() + cr, err := ac.Cluster(ctx, &serverpb.ClusterRequest{}) + require.NoError(t, err, "failed to read cluster it") + return loqrecoverypb.ReplicaUpdatePlan{ + PlanID: uuid.MakeV4(), + ClusterID: cr.ClusterID, + } +} diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 278078aded1c..02b0c2d89e34 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -3265,7 +3265,14 @@ func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo( func (s *systemAdminServer) RecoveryStagePlan( ctx context.Context, request *serverpb.RecoveryStagePlanRequest, ) (*serverpb.RecoveryStagePlanResponse, error) { - return nil, errors.AssertionFailedf("To be implemented by #93044") + ctx = s.server.AnnotateCtx(ctx) + _, err := s.requireAdminUser(ctx) + if err != nil { + return nil, err + } + + log.Ops.Info(ctx, "staging recovery plan") + return s.server.recoveryServer.StagePlan(ctx, request) } func (s *systemAdminServer) RecoveryNodeStatus( diff --git a/pkg/server/server.go b/pkg/server/server.go index db10760914d0..f60ee4d10008 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1034,6 +1034,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { cfg.Locality, rpcContext, cfg.TestingKnobs.LOQRecovery, + func(ctx context.Context, id roachpb.NodeID) error { + return nodeTombStorage.SetDecommissioned(ctx, id, timeutil.Now()) + }, ) *lateBoundServer = Server{