From 4b36ef5ce9c642ef5c126497a458aa03e0d3b5e6 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Mon, 23 Jan 2023 18:56:12 +0000 Subject: [PATCH] loqrecovery,cli: add debug recover verify command This commit adds debug recovery verify command which provides the status of loss of quorum recovery plan application status. The command is used after debug recover apply-plan was used to stage a recovery plan on a cluster to check application progress. It allows user to check which nodes still needs to be restarted, outcome of recovery on restarted nodes and health of ranges on the entire cluster. Release note: None --- docs/generated/http/full.md | 33 +- pkg/base/test_server_args.go | 6 + pkg/cli/BUILD.bazel | 1 + pkg/cli/debug_recover_loss_of_quorum.go | 285 ++++++- pkg/cli/debug_recover_loss_of_quorum_test.go | 187 +++++ pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- pkg/kv/kvserver/loqrecovery/BUILD.bazel | 6 + .../loqrecovery/loqrecoverypb/recovery.proto | 23 + pkg/kv/kvserver/loqrecovery/server.go | 331 +++++++-- .../loqrecovery/server_integration_test.go | 703 ++++++++++++++++++ pkg/kv/kvserver/loqrecovery/server_test.go | 600 +++------------ pkg/kv/kvserver/loqrecovery/utils.go | 7 +- pkg/server/admin.go | 2 +- pkg/server/loss_of_quorum.go | 5 +- pkg/server/server.go | 4 + pkg/server/serverpb/admin.go | 6 + pkg/server/serverpb/admin.proto | 29 +- pkg/testutils/BUILD.bazel | 2 + pkg/testutils/listener.go | 147 ++++ pkg/testutils/testcluster/testcluster.go | 32 +- pkg/util/grpcutil/grpc_util.go | 2 +- 21 files changed, 1837 insertions(+), 576 deletions(-) create mode 100644 pkg/kv/kvserver/loqrecovery/server_integration_test.go create mode 100644 pkg/testutils/listener.go diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 4e4eafa7525b..c7e5fc512de0 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -7613,6 +7613,7 @@ Support status: [reserved](#support-status) | ----- | ---- | ----- | ----------- | -------------- | | plan_id | [bytes](#cockroach.server.serverpb.RecoveryVerifyRequest-bytes) | | PlanID is ID of the plan to verify. | [reserved](#support-status) | | decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | repeated | DecommissionedNodeIDs is a set of nodes that should be marked as decommissioned in the cluster when loss of quorum recovery successfully applies. | [reserved](#support-status) | +| max_reported_ranges | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxReportedRanges is the maximum number of failed ranges to report. If more unhealthy ranges are found, error will be returned alongside range to indicate that ranges were cut short. | [reserved](#support-status) | @@ -7631,14 +7632,42 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | statuses | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus) | repeated | Statuses contain a list of recovery statuses of nodes updated during recovery. It also contains nodes that were expected to be live (not decommissioned by recovery) but failed to return status response. | [reserved](#support-status) | -| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | UnavailableRanges contains descriptors of ranges that failed health checks. | [reserved](#support-status) | -| decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyResponse-int32) | repeated | DecommissionedNodeIDs contains list of decommissioned node id's. Only nodes that were decommissioned by the plan would be listed here, not all historically decommissioned ones. | [reserved](#support-status) | +| unavailable_ranges | [RecoveryVerifyResponse.UnavailableRanges](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.server.serverpb.RecoveryVerifyResponse.UnavailableRanges) | | UnavailableRanges contains information about ranges that failed health check. | [reserved](#support-status) | +| decommissioned_node_statuses | [RecoveryVerifyResponse.DecommissionedNodeStatusesEntry](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.server.serverpb.RecoveryVerifyResponse.DecommissionedNodeStatusesEntry) | repeated | DecommissionedNodeStatuses contains a map of requested IDs with their corresponding liveness statuses. | [reserved](#support-status) | + +#### RecoveryVerifyResponse.UnavailableRanges + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| ranges | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.RangeRecoveryStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.RangeRecoveryStatus) | repeated | Ranges contains descriptors of ranges that failed health check. If there are too many ranges to report, error would contain relevant message. | [reserved](#support-status) | +| error | [string](#cockroach.server.serverpb.RecoveryVerifyResponse-string) | | Error contains an optional error if ranges validation can't complete. | [reserved](#support-status) | + + + + + + +#### RecoveryVerifyResponse.DecommissionedNodeStatusesEntry + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| key | [int32](#cockroach.server.serverpb.RecoveryVerifyResponse-int32) | | | | +| value | [cockroach.kv.kvserver.liveness.livenesspb.MembershipStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.liveness.livenesspb.MembershipStatus) | | | | + + + + + ## ListTenants diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 7d98696274b5..989b50ed65b9 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -194,6 +194,12 @@ type TestClusterArgs struct { // A copy of an entry from this map will be copied to each individual server // and potentially adjusted according to ReplicationMode. ServerArgsPerNode map[int]TestServerArgs + + // If reusable listeners is true, then restart should keep listeners untouched + // so that servers are kept on the same ports. It is up to the test to provide + // proxy listeners that could be closed by test servers and then reused upon + // restart. + ReusableListeners bool } var ( diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index cd4e43ce6c32..bd5568b1e68d 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -399,6 +399,7 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "//pkg/util/tracing", + "//pkg/util/uuid", "//pkg/workload/examples", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index 9495ef009516..899ca9ed9119 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -104,7 +104,39 @@ recovery steps to verify the data and ensure database consistency should be taken ASAP. Those actions should be done at application level. 'debug recover' set of commands is used as a last resort to perform range -recovery operation. To perform recovery one should perform this sequence +recovery operation. + +Loss of quorum recovery could be performed in two modes half-online and +offline. Half-online approach is a preferred one but offline approach is +preserved for compatibility with any existing tooling that may exist. The +main difference between approaches is how information in cluster is collected +and distributed within the cluster and how many nodes needs to be restarted +during recovery. + +To perform recovery using half-online approach one should perform this sequence +of actions: + +1. Run 'cockroach debug recover make-plan' in a half-online mode to collect +replica information from surviving nodes of a cluster and decide which +replicas should survive and up-replicate. + +2. Run 'cockroach debug recover apply-plan' in half online mode to distribute +plan to surviving cluster nodes for application. At this point plan is staged +and can't be reverted. + +3. Follow instructions from apply plan to perform a rolling restart of nodes +that need to update their storage. Restart should be done using appropriate +automation that used to run the cluster. + +4. Optionally use 'cockroach debug recover verify' to check recovery progress +and resulting range health. + +If it was possible to produce distribute and apply the plan, then cluster should +become operational again. It is not guaranteed that there's no data loss +and that all database consistency was not compromised. + +If for whatever reasons half-online approach is not feasible or fails when +collecting info or distributing recovery plans, one could perform this sequence of actions: 0. Decommission failed nodes preemptively to eliminate the possibility of @@ -124,7 +156,7 @@ should be collected and made locally available for the next step. on step 1. Planner will decide which replicas should survive and up-replicate. -4. Run 'cockroach debug recover execute-plan' on every node using plan +4. Run 'cockroach debug recover apply-plan' on every node using plan generated on the previous step. Each node will pick relevant portion of the plan and update local replicas accordingly to restore quorum. @@ -134,7 +166,28 @@ If it was possible to produce and apply the plan, then cluster should become operational again. It is not guaranteed that there's no data loss and that all database consistency was not compromised. -Example run: +Example run #1 (half-online mode): + +If we have a cluster of 5 nodes 1-5 where we lost nodes 3 and 4. Each node +has two stores and they are numbered as 1,2 on node 1; 3,4 on node 2 etc. +Recovery commands to recover unavailable ranges would be (most command output +is omitted for brevity): + +[cockroach@admin ~]$ cockroach debug recover make-plan --host cockroach-1.cockroachlabs.com --certs-dir=root_certs -o recovery-plan.json + +[cockroach@admin ~]$ cockroach debug recover apply-plan --host cockroach-1.cockroachlabs.com --certs-dir=root_certs recovery-plan.json + +Proceed with staging plan [y/N] y + +Plan staged. To complete recovery restart nodes n2, n3. + +[cockroach@admin ~]$ # restart-nodes 2 3 as instructed by apply-plan. + +[cockroach@admin ~]$ cockroach debug recover verify --host cockroach-1.cockroachlabs.com --certs-dir=root_certs recovery-plan.json + +Loss of quorum recovery is complete. + +Example run #2 (offline mode): If we have a cluster of 5 nodes 1-5 where we lost nodes 3 and 4. Each node has two stores and they are numbered as 1,2 on node 1; 3,4 on node 2 etc. @@ -174,14 +227,15 @@ var recoverCommands = []*cobra.Command{ debugRecoverCollectInfoCmd, debugRecoverPlanCmd, debugRecoverExecuteCmd, - //debugRecoverVerify, + debugRecoverVerifyCmd, } func init() { debugRecoverCmd.AddCommand( debugRecoverCollectInfoCmd, debugRecoverPlanCmd, - debugRecoverExecuteCmd) + debugRecoverExecuteCmd, + debugRecoverVerifyCmd) } var debugRecoverCollectInfoCmd = &cobra.Command{ @@ -791,6 +845,227 @@ func applyRecoveryToLocalStore( return err } +var debugRecoverVerifyCmd = &cobra.Command{ + Use: "verify [plan-file]", + Short: "verify loss of quorum recovery application status", + Long: ` +Check cluster loss of quorum recovery state. + +Verify command will check if all nodes applied recovery plan and that all +necessary nodes are decommissioned. + +If invoked without a plan file, command will print status of all nodes in the +cluster. + +The address of a single healthy cluster node must be provided using the --host +flag. This designated node will retrieve and check status of all nodes in the +cluster. + +See debug recover command help for more details on how to use this command. +`, + Args: cobra.MaximumNArgs(1), + RunE: runDebugVerify, +} + +func runDebugVerify(cmd *cobra.Command, args []string) error { + // We must have cancellable context here to obtain grpc client connection. + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + var updatePlan loqrecoverypb.ReplicaUpdatePlan + if len(args) > 0 { + planFile := args[0] + data, err := os.ReadFile(planFile) + if err != nil { + return errors.Wrapf(err, "failed to read plan file %q", planFile) + } + jsonpb := protoutil.JSONPb{Indent: " "} + if err = jsonpb.Unmarshal(data, &updatePlan); err != nil { + return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile) + } + } + + // Plan statuses. + if len(updatePlan.Updates) > 0 { + _, _ = fmt.Printf("Checking application of recovery plan %s\n", updatePlan.PlanID) + } + + c, finish, err := getAdminClient(ctx, serverCfg) + if err != nil { + return errors.Wrapf(err, "failed to get admin connection to cluster") + } + defer finish() + req := serverpb.RecoveryVerifyRequest{ + DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs, + MaxReportedRanges: 20, + } + // Maybe switch to non-nullable? + if !updatePlan.PlanID.Equal(uuid.UUID{}) { + req.PendingPlanID = &updatePlan.PlanID + } + res, err := c.RecoveryVerify(ctx, &req) + if err != nil { + return errors.Wrapf(err, "failed to retrieve replica info from cluster") + } + + if len(res.UnavailableRanges.Ranges) > 0 { + _, _ = fmt.Fprintf(stderr, "Unavailable ranges:\n") + for _, d := range res.UnavailableRanges.Ranges { + _, _ = fmt.Fprintf(stderr, " r%d : %s, start key %s\n", + d.RangeID, d.FailureType, d.StartKey) + } + } + if res.UnavailableRanges.Error != "" { + _, _ = fmt.Fprintf(stderr, "Failed to complete range health check: %s\n", + res.UnavailableRanges.Error) + } + + diff := diffPlanWithNodeStatus(updatePlan, res.Statuses) + if len(diff.report) > 0 { + if len(updatePlan.Updates) > 0 { + _, _ = fmt.Fprintf(stderr, "Recovery plan application progress:\n") + } else { + _, _ = fmt.Fprintf(stderr, "Recovery plans:\n") + } + } + for _, line := range diff.report { + _, _ = fmt.Fprintf(stderr, "%s\n", line) + } + + // Node statuses. + allDecommissioned := true + var b strings.Builder + for id, status := range res.DecommissionedNodeStatuses { + if !status.Decommissioned() { + b.WriteString(fmt.Sprintf(" n%d: %s\n", id, status)) + allDecommissioned = false + } + } + if len(res.DecommissionedNodeStatuses) > 0 { + if allDecommissioned { + _, _ = fmt.Fprintf(stderr, "All dead nodes are decommissioned.\n") + } else { + _, _ = fmt.Fprintf(stderr, "Nodes not yet decommissioned:\n%s", b.String()) + } + } + + if len(updatePlan.Updates) > 0 { + if !allDecommissioned || diff.pending > 0 { + return errors.New("loss of quorum recovery is not finished yet") + } + if diff.errors > 0 || !res.UnavailableRanges.Empty() { + return errors.New("loss of quorum recovery did not fully succeed") + } + _, _ = fmt.Fprintf(stderr, "Loss of quorum recovery is complete.\n") + } else { + if diff.errors > 0 || !res.UnavailableRanges.Empty() { + return errors.New("cluster has unhealthy ranges") + } + } + return nil +} + +type clusterDiff struct { + report []string + pending int + errors int +} + +func (d *clusterDiff) append(line string) { + d.report = append(d.report, line) +} + +func (d *clusterDiff) appendPending(line string) { + d.report = append(d.report, line) + d.pending++ +} + +func (d *clusterDiff) appendError(line string) { + d.report = append(d.report, line) + d.errors++ +} + +func diffPlanWithNodeStatus( + updatePlan loqrecoverypb.ReplicaUpdatePlan, nodes []loqrecoverypb.NodeRecoveryStatus, +) clusterDiff { + var result clusterDiff + + nodesWithPlan := make(map[roachpb.NodeID]interface{}) + for _, r := range updatePlan.Updates { + nodesWithPlan[r.NodeID()] = struct{}{} + } + + // Sort statuses by node id for ease of readability. + sort.Slice(nodes, func(i, j int) bool { + return nodes[i].NodeID < nodes[j].NodeID + }) + // Plan statuses. + if len(nodesWithPlan) > 0 { + // Invoked with plan, need to verify application of concrete plan to the + // cluster. + for _, status := range nodes { + if _, ok := nodesWithPlan[status.NodeID]; ok { + // Nodes that we expect plan to be pending or applied. + switch { + case status.AppliedPlanID != nil && status.AppliedPlanID.Equal(updatePlan.PlanID) && status.Error != "": + result.appendError(fmt.Sprintf(" plan application failed on node n%d: %s", status.NodeID, status.Error)) + case status.AppliedPlanID != nil && status.AppliedPlanID.Equal(updatePlan.PlanID): + result.append(fmt.Sprintf(" plan applied successfully on node n%d", status.NodeID)) + case status.PendingPlanID != nil && status.PendingPlanID.Equal(updatePlan.PlanID): + result.appendPending(fmt.Sprintf(" plan application pending on node n%d", status.NodeID)) + case status.PendingPlanID != nil: + result.appendError(fmt.Sprintf(" unexpected staged plan %s on node n%d", *status.PendingPlanID, status.NodeID)) + case status.PendingPlanID == nil: + result.appendError(fmt.Sprintf(" failed to find staged plan on node n%d", status.NodeID)) + } + delete(nodesWithPlan, status.NodeID) + } else { + switch { + case status.PendingPlanID != nil && status.PendingPlanID.Equal(updatePlan.PlanID): + result.appendError(fmt.Sprintf(" plan staged on n%d but no replicas is planned for update on the node", status.NodeID)) + case status.PendingPlanID != nil: + result.appendError(fmt.Sprintf(" unexpected staged plan %s on node n%d", *status.PendingPlanID, status.NodeID)) + } + } + } + // Check if any nodes that must have a plan staged or applied are missing + // from received node statuses. + var missing []roachpb.NodeID + for k := range nodesWithPlan { + missing = append(missing, k) + } + sort.Slice(missing, func(i, j int) bool { + return missing[i] < missing[j] + }) + for _, id := range missing { + result.appendError(fmt.Sprintf(" failed to find node n%d where plan must be staged", id)) + } + } else { + // Invoked without a plan, just dump collected information without making + // any conclusions. + for _, status := range nodes { + if status.PendingPlanID != nil { + result.append(fmt.Sprintf(" node n%d staged plan: %s", status.NodeID, + *status.PendingPlanID)) + } + switch { + case status.Error != "" && status.AppliedPlanID != nil: + result.append(fmt.Sprintf(" node n%d failed to apply plan %s: %s", status.NodeID, + *status.AppliedPlanID, status.Error)) + case status.Error != "": + result.append(fmt.Sprintf(" node n%d failed to apply plan: %s", status.NodeID, + status.Error)) + case status.AppliedPlanID != nil: + result.append(fmt.Sprintf(" node n%d applied plan: %s at %s", status.NodeID, + *status.AppliedPlanID, status.ApplyTimestamp)) + } + } + } + return result +} + func formatNodeStores(locations []loqrecovery.NodeStores, indent string) string { hasMultiStore := false for _, v := range locations { diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 3e32fe099fde..a96fe45bbfb2 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -15,6 +15,7 @@ import ( "fmt" "os" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -31,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -379,3 +381,188 @@ func TestJsonSerialization(t *testing.T) { require.NoError(t, jsonpb.Unmarshal(data, &crFromJSON)) require.Equal(t, nr, crFromJSON, "objects before and after serialization") } + +func TestUpdatePlanVsClusterDiff(t *testing.T) { + defer leaktest.AfterTest(t)() + + var empty uuid.UUID + planID, _ := uuid.FromString("123e4567-e89b-12d3-a456-426614174000") + otherPlanID, _ := uuid.FromString("123e4567-e89b-12d3-a456-426614174001") + applyTime, _ := time.Parse(time.RFC3339, "2023-01-24T10:30:00Z") + + status := func(id roachpb.NodeID, pending, applied uuid.UUID, err string) loqrecoverypb.NodeRecoveryStatus { + s := loqrecoverypb.NodeRecoveryStatus{ + NodeID: id, + } + if !pending.Equal(empty) { + s.PendingPlanID = &pending + } + if !applied.Equal(empty) { + s.AppliedPlanID = &applied + s.ApplyTimestamp = &applyTime + } + s.Error = err + return s + } + + for _, d := range []struct { + name string + nodes []int + status []loqrecoverypb.NodeRecoveryStatus + pending int + errors int + report []string + }{ + { + name: "after staging", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, planID, empty, ""), + status(3, planID, empty, ""), + }, + pending: 3, + report: []string{ + " plan application pending on node n1", + " plan application pending on node n2", + " plan application pending on node n3", + }, + }, + { + name: "partially applied", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, empty, planID, ""), + status(3, planID, empty, ""), + }, + pending: 2, + report: []string{ + " plan application pending on node n1", + " plan applied successfully on node n2", + " plan application pending on node n3", + }, + }, + { + name: "fully applied", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, empty, planID, ""), + status(2, empty, planID, ""), + status(3, empty, planID, ""), + }, + report: []string{ + " plan applied successfully on node n1", + " plan applied successfully on node n2", + " plan applied successfully on node n3", + }, + }, + { + name: "staging lost no node", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(3, planID, empty, ""), + }, + pending: 2, + errors: 1, + report: []string{ + " plan application pending on node n1", + " plan application pending on node n3", + " failed to find node n2 where plan must be staged", + }, + }, + { + name: "staging lost no plan", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, planID, empty, ""), + status(3, empty, empty, ""), + }, + pending: 2, + errors: 1, + report: []string{ + " plan application pending on node n1", + " plan application pending on node n2", + " failed to find staged plan on node n3", + }, + }, + { + name: "partial failure", + nodes: []int{1, 2, 3}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, empty, planID, "found stale replica"), + status(3, planID, empty, ""), + }, + pending: 2, + errors: 1, + report: []string{ + " plan application pending on node n1", + " plan application failed on node n2: found stale replica", + " plan application pending on node n3", + }, + }, + { + name: "no plan", + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, empty, planID, "found stale replica"), + status(3, empty, otherPlanID, ""), + }, + report: []string{ + " node n1 staged plan: 123e4567-e89b-12d3-a456-426614174000", + " node n2 failed to apply plan 123e4567-e89b-12d3-a456-426614174000: found stale replica", + " node n3 applied plan: 123e4567-e89b-12d3-a456-426614174001 at 2023-01-24 10:30:00 +0000 UTC", + }, + }, + { + name: "wrong plan", + nodes: []int{1, 2}, + status: []loqrecoverypb.NodeRecoveryStatus{ + status(1, planID, empty, ""), + status(2, otherPlanID, empty, ""), + status(3, otherPlanID, empty, ""), + }, + pending: 1, + errors: 2, + report: []string{ + " plan application pending on node n1", + " unexpected staged plan 123e4567-e89b-12d3-a456-426614174001 on node n2", + " unexpected staged plan 123e4567-e89b-12d3-a456-426614174001 on node n3", + }, + }, + } { + t.Run(d.name, func(t *testing.T) { + plan := loqrecoverypb.ReplicaUpdatePlan{ + PlanID: planID, + } + // Plan will contain single replica update for each requested node. + rangeSeq := 1 + for _, id := range d.nodes { + plan.Updates = append(plan.Updates, loqrecoverypb.ReplicaUpdate{ + RangeID: roachpb.RangeID(rangeSeq), + StartKey: nil, + OldReplicaID: roachpb.ReplicaID(1), + NewReplica: roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(id), + StoreID: roachpb.StoreID(id), + ReplicaID: roachpb.ReplicaID(rangeSeq + 17), + }, + NextReplicaID: roachpb.ReplicaID(rangeSeq + 18), + }) + } + + diff := diffPlanWithNodeStatus(plan, d.status) + require.Equal(t, d.pending, diff.pending, "number of pending changes") + require.Equal(t, d.errors, diff.errors, "number of node errors") + if d.report != nil { + require.Equal(t, len(d.report), len(diff.report), "number of lines in diff") + for i := range d.report { + require.Equal(t, d.report[i], diff.report[i], "wrong line %d of report", i) + } + } + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2eb47402abfa..61a6aea1394c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -542,7 +542,7 @@ func (ds *DistSender) RangeDescriptorCache() *rangecache.RangeCache { // The client has some control over the consistency of the lookup. The // acceptable values for the consistency argument are INCONSISTENT // or READ_UNCOMMITTED. We use INCONSISTENT for an optimistic lookup -// pass. If we don't fine a new enough descriptor, we do a leaseholder +// pass. If we don't find a new enough descriptor, we do a leaseholder // read at READ_UNCOMMITTED in order to read intents as well as committed // values. The reason for this is that it's not clear whether the intent // or the previous value points to the correct location of the Range. It gets diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 88936905672d..d3f9f428155c 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", + "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/stateloader", @@ -33,6 +34,7 @@ go_library( "//pkg/util/contextutil", "//pkg/util/grpcutil", "//pkg/util/hlc", + "//pkg/util/iterutil", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/retry", @@ -42,6 +44,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//vfs", "@io_etcd_go_raft_v3//raftpb", + "@org_golang_google_grpc//:go_default_library", ], ) @@ -53,6 +56,7 @@ go_test( "record_test.go", "recovery_env_test.go", "recovery_test.go", + "server_integration_test.go", "server_test.go", "store_test.go", ], @@ -76,11 +80,13 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/settings/cluster", + "//pkg/spanconfig", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/keysutil", diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto index 6471811e2d87..f9471e4edfcd 100644 --- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto +++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto @@ -185,6 +185,29 @@ message PlanApplicationResult { string error = 3; } +// RangeHealth is a state of a range availability from loss of quorum recovery +// perspective. +enum RangeHealth { + // Healthy indicates that range is available. + Healthy = 0; + // WaitingForMeta means that range is not yet readable according to meta + // descriptor, but has a recovered replica that is waiting to be up-replicated. + WaitingForMeta = 1; + // LossOfQuorum means that range is unavailable because it lost quorum and + // no replicas are fixed to act as a designated survivor (yet). + LossOfQuorum = 2; +} + +// RangeRecoveryStatus contains recovery status of a single range. +message RangeRecoveryStatus { + int64 range_id = 1 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + bytes start_key = 2 [ + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key", + (gogoproto.moretags) = 'yaml:"StartKey"']; + RangeHealth failure_type = 3; +} + // DeferredRecoveryActions contains data for recovery actions that need to be // performed after node restarts if it applied a recovery plan. message DeferredRecoveryActions { diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index b59205106947..cce38211c705 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -21,22 +21,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "google.golang.org/grpc" ) const rangeMetadataScanChunkSize = 100 -var replicaInfoStreamRetryOptions = retry.Options{ +const retrieveNodeStatusTimeout = 30 * time.Second +const retrieveKeyspaceHealthTimeout = time.Minute + +var fanOutConnectionRetryOptions = retry.Options{ MaxRetries: 3, InitialBackoff: time.Second, Multiplier: 1, @@ -48,17 +53,24 @@ func IsRetryableError(err error) bool { return errors.Is(err, errMarkRetry) } -type visitNodesFn func(ctx context.Context, retryOpts retry.Options, +type visitNodeAdminFn func(ctx context.Context, retryOpts retry.Options, + nodeFilter func(nodeID roachpb.NodeID) bool, visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error, ) error +type visitNodeStatusFn func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options, + visitor func(client serverpb.StatusClient) error, +) error + type Server struct { - nodeIDContainer *base.NodeIDContainer - clusterIDContainer *base.ClusterIDContainer - stores *kvserver.Stores - visitNodes visitNodesFn - planStore PlanStore - decommissionFn func(context.Context, roachpb.NodeID) error + nodeIDContainer *base.NodeIDContainer + clusterIDContainer *base.ClusterIDContainer + stores *kvserver.Stores + visitAdminNodes visitNodeAdminFn + visitStatusNode visitNodeStatusFn + planStore PlanStore + decommissionFn func(context.Context, roachpb.NodeID) error + decommissionStatusFn func(context.Context, roachpb.NodeID) (livenesspb.MembershipStatus, bool) metadataQueryTimeout time.Duration forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error @@ -73,6 +85,7 @@ func NewServer( rpcCtx *rpc.Context, knobs base.ModuleTestingKnobs, decommission func(context.Context, roachpb.NodeID) error, + decommissionStatusFn func(context.Context, roachpb.NodeID) (livenesspb.MembershipStatus, bool), ) *Server { // Server side timeouts are necessary in recovery collector since we do best // effort operations where cluster info collection as an operation succeeds @@ -89,9 +102,11 @@ func NewServer( nodeIDContainer: nodeIDContainer, clusterIDContainer: rpcCtx.StorageClusterID, stores: stores, - visitNodes: makeVisitAvailableNodes(g, loc, rpcCtx), + visitAdminNodes: makeVisitAvailableNodes(g, loc, rpcCtx), + visitStatusNode: makeVisitNode(g, loc, rpcCtx), planStore: planStore, decommissionFn: decommission, + decommissionStatusFn: decommissionStatusFn, metadataQueryTimeout: metadataQueryTimeout, forwardReplicaFilter: forwardReplicaFilter, } @@ -128,7 +143,7 @@ func (s Server) ServeClusterReplicas( } }() - err = contextutil.RunWithTimeout(ctx, "scan-range-descriptors", s.metadataQueryTimeout, + err = contextutil.RunWithTimeout(ctx, "scan range descriptors", s.metadataQueryTimeout, func(txnCtx context.Context) error { txn := kvDB.NewTxn(txnCtx, "scan-range-descriptors") if err := txn.SetFixedTimestamp(txnCtx, kvDB.Clock().Now()); err != nil { @@ -167,8 +182,9 @@ func (s Server) ServeClusterReplicas( } // Stream local replica info from all nodes wrapping them in response stream. - return s.visitNodes(ctx, - replicaInfoStreamRetryOptions, + return s.visitAdminNodes(ctx, + fanOutConnectionRetryOptions, + allNodes, func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { log.Infof(ctx, "trying to get info from node n%d", nodeID) nodeReplicas := 0 @@ -240,10 +256,11 @@ func (s Server) StagePlan( if req.AllNodes { // Scan cluster for conflicting recovery plans and for stray nodes that are // planned for forced decommission, but rejoined cluster. - foundNodes := make(map[roachpb.NodeID]struct{}) - err := s.visitNodes( + foundNodes := make(map[roachpb.NodeID]bool) + err := s.visitAdminNodes( ctx, - replicaInfoStreamRetryOptions, + fanOutConnectionRetryOptions, + allNodes, func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) if err != nil { @@ -255,7 +272,7 @@ func (s Server) StagePlan( if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) { return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID) } - foundNodes[nodeID] = struct{}{} + foundNodes[nodeID] = true return nil }) if err != nil { @@ -264,14 +281,14 @@ func (s Server) StagePlan( // Check that no nodes that must be decommissioned are present. for _, dID := range plan.DecommissionedNodeIDs { - if _, ok := foundNodes[dID]; ok { + if foundNodes[dID] { return nil, errors.Newf("node n%d was planned for decommission, but is present in cluster", dID) } } // Check out that all nodes that should save plan are present. for _, u := range plan.Updates { - if _, ok := foundNodes[u.NodeID()]; !ok { + if !foundNodes[u.NodeID()] { return nil, errors.Newf("node n%d has planned changed but is unreachable in the cluster", u.NodeID()) } } @@ -279,9 +296,10 @@ func (s Server) StagePlan( // Distribute plan - this should not use fan out to available, but use // list from previous step. var nodeErrors []string - err = s.visitNodes( + err = s.visitAdminNodes( ctx, - replicaInfoStreamRetryOptions, + fanOutConnectionRetryOptions, + onlyListed(foundNodes), func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { delete(foundNodes, nodeID) res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ @@ -355,62 +373,227 @@ func (s Server) StagePlan( } func (s Server) NodeStatus( - _ context.Context, _ *serverpb.RecoveryNodeStatusRequest, + ctx context.Context, _ *serverpb.RecoveryNodeStatusRequest, ) (*serverpb.RecoveryNodeStatusResponse, error) { - // TODO: report full status. + status := loqrecoverypb.NodeRecoveryStatus{ + NodeID: s.nodeIDContainer.Get(), + } plan, exists, err := s.planStore.LoadPlan() if err != nil { return nil, err } - var planID *uuid.UUID if exists { - planID = &plan.PlanID + status.PendingPlanID = &plan.PlanID + } + err = s.stores.VisitStores(func(s *kvserver.Store) error { + r, ok, err := readNodeRecoveryStatusInfo(ctx, s.Engine()) + if err != nil { + return err + } + if ok { + status.AppliedPlanID = &r.AppliedPlanID + status.ApplyTimestamp = &r.ApplyTimestamp + status.Error = r.Error + return iterutil.StopIteration() + } + return nil + }) + if err = iterutil.Map(err); err != nil { + log.Errorf(ctx, "failed to read loss of quorum recovery application status %s", err) + return nil, err } + return &serverpb.RecoveryNodeStatusResponse{ - Status: loqrecoverypb.NodeRecoveryStatus{ - NodeID: s.nodeIDContainer.Get(), - PendingPlanID: planID, - }, + Status: status, }, nil } func (s Server) Verify( - ctx context.Context, request *serverpb.RecoveryVerifyRequest, + ctx context.Context, + req *serverpb.RecoveryVerifyRequest, + liveNodes livenesspb.IsLiveMap, + db *kv.DB, ) (*serverpb.RecoveryVerifyResponse, error) { var nss []loqrecoverypb.NodeRecoveryStatus - err := s.visitNodes(ctx, replicaInfoStreamRetryOptions, + err := s.visitAdminNodes(ctx, fanOutConnectionRetryOptions, + notListed(req.DecommissionedNodeIDs), func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { - res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) - if err != nil { - return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID), errMarkRetry) - } - nss = append(nss, res.Status) - return nil + return contextutil.RunWithTimeout(ctx, fmt.Sprintf("retrieve status of n%d", nodeID), + retrieveNodeStatusTimeout, + func(ctx context.Context) error { + res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) + if err != nil { + return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID), + errMarkRetry) + } + nss = append(nss, res.Status) + return nil + }) }) if err != nil { return nil, err } - // TODO: retrieve status of requested nodes (for decommission check) - // TODO: retrieve unavailable ranges report + decomStatus := make(map[roachpb.NodeID]livenesspb.MembershipStatus) + decomNodes := make(map[roachpb.NodeID]interface{}) + for _, plannedID := range req.DecommissionedNodeIDs { + decomNodes[plannedID] = struct{}{} + if ns, ok := s.decommissionStatusFn(ctx, plannedID); ok { + decomStatus[plannedID] = ns + } + } + + isNodeLive := func(rd roachpb.ReplicaDescriptor) bool { + // Preemptively remove dead nodes as they would return Forbidden error if + // liveness is not stale enough. + if _, removed := decomNodes[rd.NodeID]; removed { + return false + } + l, ok := liveNodes[rd.NodeID] + return ok && l.IsLive + } + + getRangeInfo := func( + ctx context.Context, rID roachpb.RangeID, nID roachpb.NodeID, + ) (serverpb.RangeInfo, error) { + var info serverpb.RangeInfo + err := s.visitStatusNode(ctx, nID, fanOutConnectionRetryOptions, + func(c serverpb.StatusClient) error { + resp, err := c.Range(ctx, &serverpb.RangeRequest{RangeId: int64(rID)}) + if err != nil { + return err + } + res := resp.ResponsesByNodeID[nID] + if len(res.Infos) > 0 { + info = res.Infos[0] + return nil + } + return errors.Newf("range r%d not found on node n%d", rID, nID) + }) + if err != nil { + return serverpb.RangeInfo{}, err + } + return info, nil + } + + // Note that rangeCheckErr is a partial error, so we may have subset of ranges + // and an error, both of them will go to response. + unavailable, rangeCheckErr := func() ([]loqrecoverypb.RangeRecoveryStatus, error) { + var unavailable []loqrecoverypb.RangeRecoveryStatus + if req.MaxReportedRanges == 0 { + return nil, nil + } + err := contextutil.RunWithTimeout(ctx, "retrieve ranges health", retrieveKeyspaceHealthTimeout, + func(ctx context.Context) error { + start := keys.Meta2Prefix + for { + kvs, err := db.Scan(ctx, start, keys.MetaMax, rangeMetadataScanChunkSize) + if err != nil { + return err + } + if len(kvs) == 0 { + break + } + var endKey roachpb.Key + for _, rangeDescKV := range kvs { + endKey = rangeDescKV.Key + var d roachpb.RangeDescriptor + if err := rangeDescKV.ValueProto(&d); err != nil { + continue + } + h := checkRangeHealth(ctx, d, isNodeLive, getRangeInfo) + if h != loqrecoverypb.RangeHealth_Healthy { + if len(unavailable) >= int(req.MaxReportedRanges) { + return errors.Newf("found more failed ranges than limit %d", + req.MaxReportedRanges) + } + unavailable = append(unavailable, loqrecoverypb.RangeRecoveryStatus{ + RangeID: d.RangeID, + StartKey: d.StartKey.AsRawKey(), + FailureType: h, + }) + } + } + start = endKey.Next() + } + return nil + }) + // Note: we are returning partial results and an error in case we time out + // or hit unavailability or scan limit + return unavailable, err + }() + rangeHealth := serverpb.RecoveryVerifyResponse_UnavailableRanges{ + Ranges: unavailable, + } + if rangeCheckErr != nil { + rangeHealth.Error = rangeCheckErr.Error() + } + return &serverpb.RecoveryVerifyResponse{ - Statuses: nss, + Statuses: nss, + DecommissionedNodeStatuses: decomStatus, + UnavailableRanges: rangeHealth, }, nil } +func checkRangeHealth( + ctx context.Context, + d roachpb.RangeDescriptor, + liveFunc func(rd roachpb.ReplicaDescriptor) bool, + rangeInfo func(ctx context.Context, id roachpb.RangeID, nID roachpb.NodeID) (serverpb.RangeInfo, error), +) loqrecoverypb.RangeHealth { + if d.Replicas().CanMakeProgress(liveFunc) { + return loqrecoverypb.RangeHealth_Healthy + } + log.Infof(ctx, "probing range %s as it can't make progress", d) + stuckReplica := false + healthyReplica := false + for _, r := range d.Replicas().Descriptors() { + // Check if node is in deleted nodes first. + log.Infof(ctx, "checking replica %s", r) + if liveFunc(r) { + info, err := rangeInfo(ctx, d.RangeID, r.NodeID) + if err != nil { + log.Infof(ctx, "failed to get range status for replica") + // We can't reach node which is reported as live, skip this replica + // for now and check if remaining nodes could serve the range. + continue + } + canMakeProgress := info.State.Desc.Replicas().CanMakeProgress(liveFunc) + + healthyReplica = healthyReplica || canMakeProgress + stuckReplica = stuckReplica || !canMakeProgress + log.Infof(ctx, "can make progress %t", canMakeProgress) + } + } + // If we have a leaseholder that can't make progress it could block all + // operations on the range. Upreplication of healthy replica will update + // meta and resolve the issue. + if stuckReplica && healthyReplica { + return loqrecoverypb.RangeHealth_WaitingForMeta + } + // If we have healthy replica and no stuck replicas then this replica + // will respond. + if healthyReplica && !stuckReplica { + return loqrecoverypb.RangeHealth_Healthy + } + return loqrecoverypb.RangeHealth_LossOfQuorum +} + func makeVisitAvailableNodes( g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context, -) visitNodesFn { +) visitNodeAdminFn { return func(ctx context.Context, retryOpts retry.Options, + nodeFilter func(nodeID roachpb.NodeID) bool, visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error, ) error { - collectNodeWithRetry := func(node roachpb.NodeDescriptor) error { + visitWithRetry := func(node roachpb.NodeDescriptor) error { var err error for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt()) addr := node.AddressForLocality(loc) - conn, err := rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx) - client := serverpb.NewAdminClient(conn) + var conn *grpc.ClientConn + conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx) // Nodes would contain dead nodes that we don't need to visit. We can skip // them and let caller handle incomplete info. if err != nil { @@ -421,13 +604,13 @@ func makeVisitAvailableNodes( // live. continue } + client := serverpb.NewAdminClient(conn) err = visitor(node.NodeID, client) if err == nil { return nil } log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err) if !IsRetryableError(err) { - // For non retryable errors abort immediately. return err } } @@ -448,7 +631,7 @@ func makeVisitAvailableNodes( // Don't use node descriptors with NodeID 0, because that's meant to // indicate that the node has been removed from the cluster. - if d.NodeID != 0 { + if d.NodeID != 0 && nodeFilter(d.NodeID) { nodes = append(nodes, d) } @@ -458,10 +641,64 @@ func makeVisitAvailableNodes( } for _, node := range nodes { - if err := collectNodeWithRetry(node); err != nil { + if err := visitWithRetry(node); err != nil { return err } } return nil } } + +func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context) visitNodeStatusFn { + return func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options, + visitor func(client serverpb.StatusClient) error, + ) error { + node, err := g.GetNodeDescriptor(nodeID) + if err != nil { + return err + } + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt()) + addr := node.AddressForLocality(loc) + var conn *grpc.ClientConn + conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx) + if err != nil { + if grpcutil.IsClosedConnection(err) { + return err + } + // Retry any other transient connection flakes. + continue + } + client := serverpb.NewStatusClient(conn) + err = visitor(client) + if err == nil { + return nil + } + log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err) + if !IsRetryableError(err) { + return err + } + } + return err + } +} + +func allNodes(roachpb.NodeID) bool { + return true +} + +func onlyListed(nodes map[roachpb.NodeID]bool) func(id roachpb.NodeID) bool { + return func(id roachpb.NodeID) bool { + return nodes[id] + } +} + +func notListed(ids []roachpb.NodeID) func(id roachpb.NodeID) bool { + ignored := make(map[roachpb.NodeID]bool) + for _, id := range ids { + ignored[id] = true + } + return func(id roachpb.NodeID) bool { + return !ignored[id] + } +} diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go new file mode 100644 index 000000000000..caaf0995355c --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -0,0 +1,703 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery_test + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +type clusterInfoCounters struct { + nodes, stores, replicas, descriptors int +} + +func TestReplicaCollection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{{InMemory: true}}, + Insecure: true, + Knobs: base.TestingKnobs{ + LOQRecovery: &loqrecovery.TestingKnobs{ + MetadataScanTimeout: 15 * time.Second, + }, + }, + }, + }) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.WaitForFullReplication()) + tc.ToggleReplicateQueues(false) + + r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases") + var totalRanges int + require.NoError(t, r.Scan(&totalRanges), "failed to query range count") + adm, err := tc.GetAdminClient(ctx, t, 2) + require.NoError(t, err, "failed to get admin client") + + // Collect and assert replica metadata. For expectMeta case we sometimes have + // meta and sometimes doesn't depending on which node holds the lease. + // We just ignore descriptor counts if we are not expecting meta. + assertReplicas := func(liveNodes int, expectMeta bool) { + var replicas loqrecoverypb.ClusterReplicaInfo + var stats loqrecovery.CollectionStats + + replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + require.NoError(t, err, "failed to retrieve replica info") + + // Check counters on retrieved replica info. + cnt := getInfoCounters(replicas) + require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores") + require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes") + if expectMeta { + require.Equal(t, totalRanges, cnt.descriptors, + "number of collected descriptors from metadata") + } + require.Equal(t, totalRanges*liveNodes, cnt.replicas, "number of collected replicas") + // Check stats counters as well. + require.Equal(t, liveNodes, stats.Nodes, "node counter stats") + require.Equal(t, liveNodes, stats.Stores, "store counter stats") + if expectMeta { + require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats") + } + require.NotEqual(t, replicas.ClusterID, uuid.UUID{}.String(), "cluster UUID must not be empty") + } + + tc.StopServer(0) + assertReplicas(2, true) + tc.StopServer(1) + assertReplicas(1, false) +} + +// TestStreamRestart verifies that if connection is dropped mid way through +// replica stream, it would be handled correctly with a stream restart that +// allows caller to rewind back partial replica data and receive consistent +// stream of replcia infos. +func TestStreamRestart(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + var failCount atomic.Int64 + tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{{InMemory: true}}, + Insecure: true, + Knobs: base.TestingKnobs{ + LOQRecovery: &loqrecovery.TestingKnobs{ + MetadataScanTimeout: 15 * time.Second, + ForwardReplicaFilter: func(response *serverpb.RecoveryCollectLocalReplicaInfoResponse) error { + if response.ReplicaInfo.NodeID == 2 && response.ReplicaInfo.Desc.RangeID == 14 && failCount.Add(1) < 3 { + return errors.New("rpc stream stopped") + } + return nil + }, + }, + }, + }, + }) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.WaitForFullReplication()) + tc.ToggleReplicateQueues(false) + + r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases") + var totalRanges int + require.NoError(t, r.Scan(&totalRanges), "failed to query range count") + adm, err := tc.GetAdminClient(ctx, t, 2) + require.NoError(t, err, "failed to get admin client") + + assertReplicas := func(liveNodes int) { + var replicas loqrecoverypb.ClusterReplicaInfo + var stats loqrecovery.CollectionStats + + replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + require.NoError(t, err, "failed to retrieve replica info") + + // Check counters on retrieved replica info. + cnt := getInfoCounters(replicas) + require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores") + require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes") + require.Equal(t, totalRanges, cnt.descriptors, + "number of collected descriptors from metadata") + require.Equal(t, totalRanges*liveNodes, cnt.replicas, + "number of collected replicas") + // Check stats counters as well. + require.Equal(t, liveNodes, stats.Nodes, "node counter stats") + require.Equal(t, liveNodes, stats.Stores, "store counter stats") + require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats") + } + + assertReplicas(3) +} + +func getInfoCounters(info loqrecoverypb.ClusterReplicaInfo) clusterInfoCounters { + stores := map[roachpb.StoreID]interface{}{} + nodes := map[roachpb.NodeID]interface{}{} + totalReplicas := 0 + for _, nr := range info.LocalInfo { + for _, r := range nr.Replicas { + stores[r.StoreID] = struct{}{} + nodes[r.NodeID] = struct{}{} + } + totalReplicas += len(nr.Replicas) + } + return clusterInfoCounters{ + nodes: len(nodes), + stores: len(stores), + replicas: totalReplicas, + descriptors: len(info.Descriptors), + } +} + +func TestGetPlanStagingState(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, planStores, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + // Injecting plan into 2 nodes out of 3. + plan := makeTestRecoveryPlan(ctx, t, adm) + for i := 0; i < 2; i++ { + require.NoError(t, planStores[i].SavePlan(plan), "failed to save plan on node n%d", i) + } + + // First we test that plans are successfully picked up by status call. + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + statuses := aggregateStatusByNode(resp) + require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 1") + require.Equal(t, &plan.PlanID, statuses[2].PendingPlanID, "incorrect plan id on node 2") + require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") + + // Check we can collect partial results. + tc.StopServer(1) + + testutils.SucceedsSoon(t, func() error { + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + if err != nil { + return err + } + if len(resp.Statuses) > 2 { + return errors.New("too many statuses in response") + } + return nil + }) + + statuses = aggregateStatusByNode(resp) + require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id") + require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id") +} + +func aggregateStatusByNode( + resp *serverpb.RecoveryVerifyResponse, +) map[roachpb.NodeID]loqrecoverypb.NodeRecoveryStatus { + statuses := make(map[roachpb.NodeID]loqrecoverypb.NodeRecoveryStatus) + for _, s := range resp.Statuses { + statuses[s.NodeID] = s + } + return statuses +} + +func TestStageRecoveryPlans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage plan with update for node 3 using node 0 and check which nodes + // saved plan. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, res.Errors, "unexpected errors in stage response") + + // First we test that plans are successfully picked up by status call. + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + statuses := aggregateStatusByNode(resp) + require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") + require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") + require.Equal(t, &plan.PlanID, statuses[3].PendingPlanID, "incorrect plan id on node 3") +} + +func TestStageConflictingPlans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage first plan. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, res.Errors, "unexpected errors in stage response") + + plan2 := makeTestRecoveryPlan(ctx, t, adm) + plan2.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 2), + } + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan2, AllNodes: true}) + require.ErrorContains(t, err, + fmt.Sprintf("plan %s is already staged on node n3", plan.PlanID.String()), + "conflicting plans must not be allowed") +} + +func TestForcePlanUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resV, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resV.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + // Stage first plan. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + resS, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.NoError(t, err, "failed to stage plan") + require.Empty(t, resS.Errors, "unexpected errors in stage response") + + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) + require.NoError(t, err, "force plan should reset previous plans") + + // Verify that plan was successfully replaced by an empty one. + resV, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + statuses := aggregateStatusByNode(resV) + require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") + require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") + require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") +} + +func TestNodeDecommissioned(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + tc.StopServer(2) + + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} + testutils.SucceedsSoon(t, func() error { + res, err := adm.RecoveryStagePlan(ctx, + &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + if err != nil { + return err + } + if len(res.Errors) > 0 { + return errors.Newf("failed to stage plan: %s", strings.Join(res.Errors, "; ")) + } + return nil + }) + + require.ErrorContains(t, tc.Server(0).RPCContext().OnOutgoingPing(ctx, &rpc.PingRequest{TargetNodeID: 3}), + "permanently removed from the cluster", "ping of decommissioned node should fail") +} + +func TestRejectDecommissionReachableNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} + _, err = adm.RecoveryStagePlan(ctx, + &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.ErrorContains(t, err, "was planned for decommission, but is present in cluster", + "staging plan decommissioning live nodes must not be allowed") +} + +func TestStageRecoveryPlansToWrongCluster(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err) + for _, s := range resp.Statuses { + require.Nil(t, s.PendingPlanID, "no pending plan") + } + + sk := tc.ScratchRange(t) + + fakeClusterID, _ := uuid.NewV4() + // Stage plan with id of different cluster and see if error is raised. + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.ClusterID = fakeClusterID.String() + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 3), + } + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.ErrorContains(t, err, "attempting to stage plan from cluster", "failed to stage plan") +} + +func TestRetrieveRangeStatus(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 5) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + // Use scratch range to ensure we have a range that loses quorum. + sk := tc.ScratchRange(t) + require.NoError(t, tc.WaitForFullReplication(), "failed to wait for full replication") + tc.ToggleReplicateQueues(false) + tc.SplitRangeOrFatal(t, testutils.MakeKey(sk, []byte{255})) + + d := tc.LookupRangeOrFatal(t, sk) + + rs := d.Replicas().Voters().Descriptors() + require.Equal(t, 3, len(rs), "Number of scratch replicas") + + // Kill 2 of 3 scratch replicas. + tc.StopServer(int(rs[0].NodeID - 1)) + tc.StopServer(int(rs[1].NodeID - 1)) + + admServer := int(rs[2].NodeID - 1) + adm, err := tc.GetAdminClient(ctx, t, admServer) + require.NoError(t, err, "failed to get admin client") + + r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ + DecommissionedNodeIDs: []roachpb.NodeID{rs[0].NodeID, rs[1].NodeID}, + MaxReportedRanges: 999, + }) + require.NoError(t, err, "failed to get range status") + + // We verify that at least scratch range lost its quorum at this point. + func() { + for _, status := range r.UnavailableRanges.Ranges { + if status.RangeID == d.RangeID { + require.Equal(t, loqrecoverypb.RangeHealth_LossOfQuorum.String(), status.FailureType.String()) + return + } + } + t.Fatal("failed to find scratch range in unavailable ranges") + }() + require.Empty(t, r.UnavailableRanges.Error, "should have no error") + + // Try to check if limiting number of ranges will produce error on too many ranges. + r, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ + DecommissionedNodeIDs: []roachpb.NodeID{rs[0].NodeID, rs[1].NodeID}, + MaxReportedRanges: 1, + }) + require.NoError(t, err, "failed to get range status") + require.Equal(t, r.UnavailableRanges.Error, "found more failed ranges than limit 1") +} + +func TestRetrieveApplyStatus(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 5) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + // Use scratch range to ensure we have a range that loses quorum. + sk := tc.ScratchRange(t) + require.NoError(t, tc.WaitForFullReplication(), "failed to wait for full replication") + tc.ToggleReplicateQueues(false) + d := tc.LookupRangeOrFatal(t, sk) + + rs := d.Replicas().Voters().Descriptors() + require.Equal(t, 3, len(rs), "Number of scratch replicas") + + admServer := int(rs[2].NodeID - 1) + // Move liveness lease to a node that is not killed, otherwise test takes + // very long time to finish. + ld := tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix) + tc.TransferRangeLeaseOrFatal(t, ld, tc.Target(admServer)) + + tc.StopServer(int(rs[0].NodeID - 1)) + tc.StopServer(int(rs[1].NodeID - 1)) + + adm, err := tc.GetAdminClient(ctx, t, admServer) + require.NoError(t, err, "failed to get admin client") + + var replicas loqrecoverypb.ClusterReplicaInfo + testutils.SucceedsSoon(t, func() error { + var err error + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + return err + }) + plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) + require.NoError(t, err, "failed to create a plan") + testutils.SucceedsSoon(t, func() error { + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + if err != nil { + return err + } + if errMsg := strings.Join(res.Errors, ", "); len(errMsg) > 0 { + return errors.Newf("%s", errMsg) + } + return nil + }) + + r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ + DecommissionedNodeIDs: plan.DecommissionedNodeIDs, + }) + + require.NoError(t, err, "failed to run recovery verify") + updates := make(map[roachpb.NodeID]interface{}) + for _, n := range planDetails.UpdatedNodes { + updates[n.NodeID] = struct{}{} + } + staged := 0 + for _, s := range r.Statuses { + if s.PendingPlanID != nil { + require.Equal(t, plan.PlanID, *s.PendingPlanID, "wrong plan staged") + require.Contains(t, updates, s.NodeID, + "plan should be staged on nodes where changes are planned") + staged++ + } + } + require.Equal(t, len(planDetails.UpdatedNodes), staged, "number of staged plans") + + for _, id := range planDetails.UpdatedNodes { + tc.StopServer(int(id.NodeID - 1)) + require.NoError(t, lReg.Reopen(int(id.NodeID-1)), "failed to reopen listen socket") + require.NoError(t, tc.RestartServer(int(id.NodeID-1)), "failed to restart node") + } + + // Need to get new client connection as we one we used belonged to killed + // server. + adm, err = tc.GetAdminClient(ctx, t, admServer) + require.NoError(t, err, "failed to get admin client") + r, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ + PendingPlanID: &plan.PlanID, + DecommissionedNodeIDs: plan.DecommissionedNodeIDs, + }) + require.NoError(t, err, "failed to run recovery verify") + applied := 0 + for _, s := range r.Statuses { + if s.AppliedPlanID != nil { + require.Equal(t, plan.PlanID, *s.AppliedPlanID, "wrong plan staged") + require.Contains(t, updates, s.NodeID, + "plan should be staged on nodes where changes are planned") + applied++ + } + } + require.Equal(t, len(planDetails.UpdatedNodes), applied, "number of applied plans") +} + +func prepTestCluster( + t *testing.T, nodes int, +) ( + *testcluster.TestCluster, + server.StickyInMemEnginesRegistry, + map[int]loqrecovery.PlanStore, + testutils.ListenerRegistry, +) { + skip.UnderStressRace(t, "cluster frequently fails to start under stress race") + + reg := server.NewStickyInMemEnginesRegistry() + + lReg := testutils.NewListenerRegistry() + + args := base.TestClusterArgs{ + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListeners: true, + } + for i := 0; i < nodes; i++ { + args.ServerArgsPerNode[i] = base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: reg, + }, + SpanConfig: &spanconfig.TestingKnobs{ + ConfigureScratchRange: true, + }, + }, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Listener: lReg.Get(t, i), + } + } + tc := testcluster.NewTestCluster(t, nodes, args) + tc.Start(t) + return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode), lReg +} + +func prepInMemPlanStores( + t *testing.T, serverArgs map[int]base.TestServerArgs, +) map[int]loqrecovery.PlanStore { + pss := make(map[int]loqrecovery.PlanStore) + for id, args := range serverArgs { + reg := args.Knobs.Server.(*server.TestingKnobs).StickyEngineRegistry + store, err := reg.GetUnderlyingFS(args.StoreSpecs[0]) + require.NoError(t, err, "can't create loq recovery plan store") + pss[id] = loqrecovery.NewPlanStore(".", store) + } + return pss +} + +func createRecoveryForRange( + t *testing.T, tc *testcluster.TestCluster, key roachpb.Key, storeID int, +) loqrecoverypb.ReplicaUpdate { + rngD, err := tc.LookupRange(key) + require.NoError(t, err, "can't find range for key %s", key) + replD, ok := rngD.GetReplicaDescriptor(roachpb.StoreID(storeID)) + require.True(t, ok, "expecting scratch replica on node 3") + replD.ReplicaID += 10 + return loqrecoverypb.ReplicaUpdate{ + RangeID: rngD.RangeID, + StartKey: loqrecoverypb.RecoveryKey(rngD.StartKey), + OldReplicaID: replD.ReplicaID, + NewReplica: replD, + NextReplicaID: replD.ReplicaID + 1, + } +} + +func makeTestRecoveryPlan( + ctx context.Context, t *testing.T, ac serverpb.AdminClient, +) loqrecoverypb.ReplicaUpdatePlan { + t.Helper() + cr, err := ac.Cluster(ctx, &serverpb.ClusterRequest{}) + require.NoError(t, err, "failed to read cluster it") + return loqrecoverypb.ReplicaUpdatePlan{ + PlanID: uuid.MakeV4(), + ClusterID: cr.ClusterID, + } +} diff --git a/pkg/kv/kvserver/loqrecovery/server_test.go b/pkg/kv/kvserver/loqrecovery/server_test.go index 004e8393e87e..f204f11beb34 100644 --- a/pkg/kv/kvserver/loqrecovery/server_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,510 +8,134 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package loqrecovery_test +package loqrecovery import ( "context" - "fmt" - "strconv" - "sync/atomic" "testing" - "time" - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -type clusterInfoCounters struct { - nodes, stores, replicas, descriptors int -} - -func TestReplicaCollection(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - +func TestProbeRangeHealth(t *testing.T) { ctx := context.Background() - - tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - StoreSpecs: []base.StoreSpec{{InMemory: true}}, - Insecure: true, - Knobs: base.TestingKnobs{ - LOQRecovery: &loqrecovery.TestingKnobs{ - MetadataScanTimeout: 15 * time.Second, - }, + for _, d := range []struct { + name string + metaNodes []roachpb.NodeID + replicaNodes map[roachpb.NodeID][]roachpb.NodeID + deadNodes map[roachpb.NodeID]interface{} + result loqrecoverypb.RangeHealth + }{ + { + name: "meta healthy", + metaNodes: nodes(1, 2, 3), + result: loqrecoverypb.RangeHealth_Healthy, + }, + { + name: "no live replicas", + metaNodes: nodes(1, 2, 3), + deadNodes: nodeSet(1, 2, 3), + result: loqrecoverypb.RangeHealth_LossOfQuorum, + }, + { + name: "no replicas can make progress", + metaNodes: nodes(1, 2, 3, 4, 5), + replicaNodes: map[roachpb.NodeID][]roachpb.NodeID{ + 1: nodes(1, 2, 3, 4, 5), + 2: nodes(1, 2, 3, 4, 5), }, + deadNodes: nodeSet(3, 4, 5), + result: loqrecoverypb.RangeHealth_LossOfQuorum, }, - }) - tc.Start(t) - defer tc.Stopper().Stop(ctx) - require.NoError(t, tc.WaitForFullReplication()) - tc.ToggleReplicateQueues(false) - - r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases") - var totalRanges int - require.NoError(t, r.Scan(&totalRanges), "failed to query range count") - adm, err := tc.GetAdminClient(ctx, t, 2) - require.NoError(t, err, "failed to get admin client") - - // Collect and assert replica metadata. For expectMeta case we sometimes have - // meta and sometimes doesn't depending on which node holds the lease. - // We just ignore descriptor counts if we are not expecting meta. - assertReplicas := func(liveNodes int, expectMeta bool) { - var replicas loqrecoverypb.ClusterReplicaInfo - var stats loqrecovery.CollectionStats - - replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) - require.NoError(t, err, "failed to retrieve replica info") - - // Check counters on retrieved replica info. - cnt := getInfoCounters(replicas) - require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores") - require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes") - if expectMeta { - require.Equal(t, totalRanges, cnt.descriptors, - "number of collected descriptors from metadata") - } - require.Equal(t, totalRanges*liveNodes, cnt.replicas, "number of collected replicas") - // Check stats counters as well. - require.Equal(t, liveNodes, stats.Nodes, "node counter stats") - require.Equal(t, liveNodes, stats.Stores, "store counter stats") - if expectMeta { - require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats") - } - require.NotEqual(t, replicas.ClusterID, uuid.UUID{}.String(), "cluster UUID must not be empty") - } - - tc.StopServer(0) - assertReplicas(2, true) - tc.StopServer(1) - assertReplicas(1, false) -} - -// TestStreamRestart verifies that if connection is dropped mid way through -// replica stream, it would be handled correctly with a stream restart that -// allows caller to rewind back partial replica data and receive consistent -// stream of replcia infos. -func TestStreamRestart(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - var failCount atomic.Int64 - tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - StoreSpecs: []base.StoreSpec{{InMemory: true}}, - Insecure: true, - Knobs: base.TestingKnobs{ - LOQRecovery: &loqrecovery.TestingKnobs{ - MetadataScanTimeout: 15 * time.Second, - ForwardReplicaFilter: func(response *serverpb.RecoveryCollectLocalReplicaInfoResponse) error { - if response.ReplicaInfo.NodeID == 2 && response.ReplicaInfo.Desc.RangeID == 14 && failCount.Add(1) < 3 { - return errors.New("rpc stream stopped") - } - return nil - }, - }, + { + name: "all live replicas can make progress", + metaNodes: nodes(1, 2, 3, 4, 5), + replicaNodes: map[roachpb.NodeID][]roachpb.NodeID{ + 2: nodes(2), }, + deadNodes: nodeSet(1, 3, 4, 5), + result: loqrecoverypb.RangeHealth_Healthy, }, - }) - tc.Start(t) - defer tc.Stopper().Stop(ctx) - require.NoError(t, tc.WaitForFullReplication()) - tc.ToggleReplicateQueues(false) - - r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases") - var totalRanges int - require.NoError(t, r.Scan(&totalRanges), "failed to query range count") - adm, err := tc.GetAdminClient(ctx, t, 2) - require.NoError(t, err, "failed to get admin client") - - assertReplicas := func(liveNodes int) { - var replicas loqrecoverypb.ClusterReplicaInfo - var stats loqrecovery.CollectionStats - - replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) - require.NoError(t, err, "failed to retrieve replica info") - - // Check counters on retrieved replica info. - cnt := getInfoCounters(replicas) - require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores") - require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes") - require.Equal(t, totalRanges, cnt.descriptors, - "number of collected descriptors from metadata") - require.Equal(t, totalRanges*liveNodes, cnt.replicas, - "number of collected replicas") - // Check stats counters as well. - require.Equal(t, liveNodes, stats.Nodes, "node counter stats") - require.Equal(t, liveNodes, stats.Stores, "store counter stats") - require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats") - } - - assertReplicas(3) -} - -func getInfoCounters(info loqrecoverypb.ClusterReplicaInfo) clusterInfoCounters { - stores := map[roachpb.StoreID]interface{}{} - nodes := map[roachpb.NodeID]interface{}{} - totalReplicas := 0 - for _, nr := range info.LocalInfo { - for _, r := range nr.Replicas { - stores[r.StoreID] = struct{}{} - nodes[r.NodeID] = struct{}{} - } - totalReplicas += len(nr.Replicas) - } - return clusterInfoCounters{ - nodes: len(nodes), - stores: len(stores), - replicas: totalReplicas, - descriptors: len(info.Descriptors), - } -} - -func TestGetRecoveryState(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, planStores := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - for _, s := range resp.Statuses { - require.Nil(t, s.PendingPlanID, "no pending plan") - } - - // Injecting plan into 2 nodes out of 3. - plan := makeTestRecoveryPlan(ctx, t, adm) - for i := 0; i < 2; i++ { - require.NoError(t, planStores[i].SavePlan(plan), "failed to save plan on node n%d", i) - } - - // First we test that plans are successfully picked up by status call. - resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - statuses := aggregateStatusByNode(resp) - require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 1") - require.Equal(t, &plan.PlanID, statuses[2].PendingPlanID, "incorrect plan id on node 2") - require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") - - // Check we can collect partial results. - tc.StopServer(1) - - testutils.SucceedsSoon(t, func() error { - resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - if err != nil { - return err - } - if len(resp.Statuses) > 2 { - return errors.New("too many statuses in response") - } - return nil - }) - - statuses = aggregateStatusByNode(resp) - require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id") - require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id") -} - -func aggregateStatusByNode( - resp *serverpb.RecoveryVerifyResponse, -) map[roachpb.NodeID]loqrecoverypb.NodeRecoveryStatus { - statuses := make(map[roachpb.NodeID]loqrecoverypb.NodeRecoveryStatus) - for _, s := range resp.Statuses { - statuses[s.NodeID] = s - } - return statuses -} - -func TestStageRecoveryPlans(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - for _, s := range resp.Statuses { - require.Nil(t, s.PendingPlanID, "no pending plan") - } - - sk := tc.ScratchRange(t) - - // Stage plan with update for node 3 using node 0 and check which nodes - // saved plan. - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.Updates = []loqrecoverypb.ReplicaUpdate{ - createRecoveryForRange(t, tc, sk, 3), - } - res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.NoError(t, err, "failed to stage plan") - require.Empty(t, res.Errors, "unexpected errors in stage response") - - // First we test that plans are successfully picked up by status call. - resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - statuses := aggregateStatusByNode(resp) - require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") - require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") - require.Equal(t, &plan.PlanID, statuses[3].PendingPlanID, "incorrect plan id on node 3") -} - -func TestStageConflictingPlans(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - for _, s := range resp.Statuses { - require.Nil(t, s.PendingPlanID, "no pending plan") - } - - sk := tc.ScratchRange(t) - - // Stage first plan. - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.Updates = []loqrecoverypb.ReplicaUpdate{ - createRecoveryForRange(t, tc, sk, 3), - } - res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.NoError(t, err, "failed to stage plan") - require.Empty(t, res.Errors, "unexpected errors in stage response") - - plan2 := makeTestRecoveryPlan(ctx, t, adm) - plan2.Updates = []loqrecoverypb.ReplicaUpdate{ - createRecoveryForRange(t, tc, sk, 2), - } - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan2, AllNodes: true}) - require.ErrorContains(t, err, - fmt.Sprintf("plan %s is already staged on node n3", plan.PlanID.String()), - "conflicting plans must not be allowed") -} - -func TestForcePlanUpdate(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - resV, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - for _, s := range resV.Statuses { - require.Nil(t, s.PendingPlanID, "no pending plan") - } - - sk := tc.ScratchRange(t) - - // Stage first plan. - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.Updates = []loqrecoverypb.ReplicaUpdate{ - createRecoveryForRange(t, tc, sk, 3), - } - resS, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.NoError(t, err, "failed to stage plan") - require.Empty(t, resS.Errors, "unexpected errors in stage response") - - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) - require.NoError(t, err, "force plan should reset previous plans") - - // Verify that plan was successfully replaced by an empty one. - resV, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - statuses := aggregateStatusByNode(resV) - require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") - require.Nil(t, statuses[2].PendingPlanID, "unexpected plan id on node 2") - require.Nil(t, statuses[3].PendingPlanID, "unexpected plan id on node 3") -} - -func TestNodeDecommissioned(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - tc.StopServer(2) - - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} - res, err := adm.RecoveryStagePlan(ctx, - &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.NoError(t, err, "failed to stage plan") - require.Empty(t, res.Errors, "unexpected errors in stage response") - - require.ErrorContains(t, tc.Server(0).RPCContext().OnOutgoingPing(ctx, &rpc.PingRequest{TargetNodeID: 3}), - "permanently removed from the cluster", "ping of decommissioned node should fail") -} - -func TestRejectDecommissionReachableNode(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} - _, err = adm.RecoveryStagePlan(ctx, - &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.ErrorContains(t, err, "was planned for decommission, but is present in cluster", - "staging plan decommissioning live nodes must not be allowed") -} - -func TestStageRecoveryPlansToWrongCluster(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - - tc, reg, _ := prepTestCluster(t) - defer reg.CloseAllStickyInMemEngines() - defer tc.Stopper().Stop(ctx) - - adm, err := tc.GetAdminClient(ctx, t, 0) - require.NoError(t, err, "failed to get admin client") - - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) - require.NoError(t, err) - for _, s := range resp.Statuses { - require.Nil(t, s.PendingPlanID, "no pending plan") - } - - sk := tc.ScratchRange(t) - - fakeClusterID, _ := uuid.NewV4() - // Stage plan with id of different cluster and see if error is raised. - plan := makeTestRecoveryPlan(ctx, t, adm) - plan.ClusterID = fakeClusterID.String() - plan.Updates = []loqrecoverypb.ReplicaUpdate{ - createRecoveryForRange(t, tc, sk, 3), - } - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) - require.ErrorContains(t, err, "attempting to stage plan from cluster", "failed to stage plan") -} - -func prepTestCluster( - t *testing.T, -) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { - reg := server.NewStickyInMemEnginesRegistry() - - const nodes = 3 - args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - } - for i := 0; i < nodes; i++ { - args.ServerArgsPerNode[i] = base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, - }, + { + name: "mix of replicas", + metaNodes: nodes(1, 2, 3, 4, 5), + replicaNodes: map[roachpb.NodeID][]roachpb.NodeID{ + 1: nodes(1, 2, 3, 4, 5), + 2: nodes(2), }, - StoreSpecs: []base.StoreSpec{ - { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), - }, + deadNodes: nodeSet(3, 4, 5), + result: loqrecoverypb.RangeHealth_WaitingForMeta, + }, + { + name: "replica check error", + metaNodes: nodes(1, 2, 3, 4, 5), + replicaNodes: map[roachpb.NodeID][]roachpb.NodeID{ + 1: nodes(1, 2, 3, 4, 5), }, - } - } - tc := testcluster.NewTestCluster(t, nodes, args) - tc.Start(t) - return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode) -} - -func prepInMemPlanStores( - t *testing.T, serverArgs map[int]base.TestServerArgs, -) map[int]loqrecovery.PlanStore { - pss := make(map[int]loqrecovery.PlanStore) - for id, args := range serverArgs { - reg := args.Knobs.Server.(*server.TestingKnobs).StickyEngineRegistry - store, err := reg.GetUnderlyingFS(args.StoreSpecs[0]) - require.NoError(t, err, "can't create loq recovery plan store") - pss[id] = loqrecovery.NewPlanStore(".", store) - } - return pss -} - -func createRecoveryForRange( - t *testing.T, tc *testcluster.TestCluster, key roachpb.Key, storeID int, -) loqrecoverypb.ReplicaUpdate { - rngD, err := tc.LookupRange(key) - require.NoError(t, err, "can't find range for key %s", key) - replD, ok := rngD.GetReplicaDescriptor(roachpb.StoreID(storeID)) - require.True(t, ok, "expecting scratch replica on node 3") - replD.ReplicaID += 10 - return loqrecoverypb.ReplicaUpdate{ - RangeID: rngD.RangeID, - StartKey: loqrecoverypb.RecoveryKey(rngD.StartKey), - OldReplicaID: replD.ReplicaID, - NewReplica: replD, - NextReplicaID: replD.ReplicaID + 1, - } -} - -func makeTestRecoveryPlan( - ctx context.Context, t *testing.T, ac serverpb.AdminClient, -) loqrecoverypb.ReplicaUpdatePlan { - t.Helper() - cr, err := ac.Cluster(ctx, &serverpb.ClusterRequest{}) - require.NoError(t, err, "failed to read cluster it") - return loqrecoverypb.ReplicaUpdatePlan{ - PlanID: uuid.MakeV4(), - ClusterID: cr.ClusterID, - } + deadNodes: nodeSet(3, 4, 5), + result: loqrecoverypb.RangeHealth_LossOfQuorum, + }, + } { + t.Run(d.name, func(t *testing.T) { + isLive := func(rd roachpb.ReplicaDescriptor) bool { + _, ok := d.deadNodes[rd.NodeID] + return !ok + } + rangeInfo := func(ctx context.Context, rID roachpb.RangeID, nID roachpb.NodeID, + ) (serverpb.RangeInfo, error) { + ids, ok := d.replicaNodes[nID] + if ok { + desc := buildDescForNodes(ids) + return serverpb.RangeInfo{ + State: kvserverpb.RangeInfo{ + ReplicaState: kvserverpb.ReplicaState{ + Desc: &desc, + }, + }, + }, nil + } + return serverpb.RangeInfo{}, errors.Newf("no replica for r%d on node n%d", rID, nID) + } + metaDesc := buildDescForNodes(d.metaNodes) + require.Equal(t, d.result.String(), checkRangeHealth(ctx, metaDesc, + isLive, rangeInfo).String(), "incorrect health computed") + }) + } +} + +func nodes(ns ...roachpb.NodeID) []roachpb.NodeID { + return append([]roachpb.NodeID(nil), ns...) +} + +func nodeSet(nodes ...roachpb.NodeID) map[roachpb.NodeID]interface{} { + r := make(map[roachpb.NodeID]interface{}) + for _, id := range nodes { + r[id] = struct{}{} + } + return r +} + +func buildDescForNodes(nodes []roachpb.NodeID) roachpb.RangeDescriptor { + desc := roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: keys.MustAddr(keys.MetaMin), + EndKey: keys.MustAddr(keys.Meta2KeyMax), + } + var rs roachpb.ReplicaSet + for i, id := range nodes { + rs.AddReplica(roachpb.ReplicaDescriptor{ + NodeID: id, + StoreID: roachpb.StoreID(id), + ReplicaID: roachpb.ReplicaID(i + 1), + Type: roachpb.VOTER_FULL, + }) + } + desc.SetReplicas(rs) + return desc } diff --git a/pkg/kv/kvserver/loqrecovery/utils.go b/pkg/kv/kvserver/loqrecovery/utils.go index a4cab771ac44..3fd2b9ec58ef 100644 --- a/pkg/kv/kvserver/loqrecovery/utils.go +++ b/pkg/kv/kvserver/loqrecovery/utils.go @@ -16,6 +16,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/strutil" ) type storeIDSet map[roachpb.StoreID]struct{} @@ -42,11 +43,7 @@ func (s storeIDSet) storeSliceFromSet() []roachpb.StoreID { // Make a string of stores 'set' in ascending order. func (s storeIDSet) joinStoreIDs() string { - storeNames := make([]string, 0, len(s)) - for _, id := range s.storeSliceFromSet() { - storeNames = append(storeNames, fmt.Sprintf("s%d", id)) - } - return strings.Join(storeNames, ", ") + return strutil.JoinIDs("s", s.storeSliceFromSet()) } func (s storeIDSet) intersect(other storeIDSet) storeIDSet { diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 44736c039a43..80c7e64e7e45 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -3375,7 +3375,7 @@ func (s *systemAdminServer) RecoveryVerify( return nil, err } - return s.server.recoveryServer.Verify(ctx, request) + return s.server.recoveryServer.Verify(ctx, request, s.nodeLiveness.GetIsLiveMap(), s.db) } // sqlQuery allows you to incrementally build a SQL query that uses diff --git a/pkg/server/loss_of_quorum.go b/pkg/server/loss_of_quorum.go index aa910d324f7a..624c9d5fbc8f 100644 --- a/pkg/server/loss_of_quorum.go +++ b/pkg/server/loss_of_quorum.go @@ -96,7 +96,8 @@ func maybeRunLossOfQuorumRecoveryCleanup( server *Server, stopper *stop.Stopper, ) { - _ = stopper.RunAsyncTask(ctx, "publish-loss-of-quorum-events", func(ctx context.Context) { + taskCtx, _ := stopper.WithCancelOnQuiesce(ctx) + _ = stopper.RunAsyncTask(taskCtx, "publish-loss-of-quorum-events", func(ctx context.Context) { if err := stores.VisitStores(func(s *kvserver.Store) error { _, err := loqrecovery.RegisterOfflineRecoveryEvents( ctx, @@ -149,7 +150,7 @@ func maybeRunLossOfQuorumRecoveryCleanup( if len(cleanup.DecommissionedNodeIDs) == 0 { return } - _ = stopper.RunAsyncTask(ctx, "maybe-mark-nodes-as-decommissioned", func(ctx context.Context) { + _ = stopper.RunAsyncTask(taskCtx, "maybe-mark-nodes-as-decommissioned", func(ctx context.Context) { log.Infof(ctx, "loss of quorum recovery decommissioning removed nodes %s", strutil.JoinIDs("n", cleanup.DecommissionedNodeIDs)) retryOpts := retry.Options{ diff --git a/pkg/server/server.go b/pkg/server/server.go index 8db279869848..344e98dc4e7e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1076,6 +1076,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { func(ctx context.Context, id roachpb.NodeID) error { return nodeTombStorage.SetDecommissioned(ctx, id, timeutil.Now()) }, + func(ctx context.Context, id roachpb.NodeID) (livenesspb.MembershipStatus, bool) { + r, ok := nodeLiveness.GetLiveness(id) + return r.Membership, ok + }, ) *lateBoundServer = Server{ diff --git a/pkg/server/serverpb/admin.go b/pkg/server/serverpb/admin.go index 0370b4c915dc..1de26acbf239 100644 --- a/pkg/server/serverpb/admin.go +++ b/pkg/server/serverpb/admin.go @@ -42,3 +42,9 @@ func (ts *TableStatsResponse) Add(ots *TableStatsResponse) { type TenantAdminServer interface { Liveness(context.Context, *LivenessRequest) (*LivenessResponse, error) } + +// Empty is true if there are no unavailable ranges and no error performing +// healthcheck. +func (r *RecoveryVerifyResponse_UnavailableRanges) Empty() bool { + return len(r.Ranges) == 0 && len(r.Error) == 0 +} diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto index d1c277809035..f6c7c0eda8e5 100644 --- a/pkg/server/serverpb/admin.proto +++ b/pkg/server/serverpb/admin.proto @@ -965,23 +965,34 @@ message RecoveryVerifyRequest { // the cluster when loss of quorum recovery successfully applies. repeated int32 decommissioned_node_ids = 2 [(gogoproto.customname) = "DecommissionedNodeIDs", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // MaxReportedRanges is the maximum number of failed ranges to report. + // If more unhealthy ranges are found, error will be returned alongside range + // to indicate that ranges were cut short. + int32 max_reported_ranges = 3; } message RecoveryVerifyResponse { + message UnavailableRanges { + // Ranges contains descriptors of ranges that failed health check. + // If there are too many ranges to report, error would contain relevant + // message. + repeated cockroach.kv.kvserver.loqrecovery.loqrecoverypb.RangeRecoveryStatus ranges = 1 [ + (gogoproto.nullable) = false]; + // Error contains an optional error if ranges validation can't complete. + string error = 2; + } + // Statuses contain a list of recovery statuses of nodes updated during recovery. It // also contains nodes that were expected to be live (not decommissioned by recovery) // but failed to return status response. repeated cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus statuses = 1 [ (gogoproto.nullable) = false]; - // UnavailableRanges contains descriptors of ranges that failed health checks. - repeated roachpb.RangeDescriptor unavailable_ranges = 2 [ - (gogoproto.nullable) = false]; - // DecommissionedNodeIDs contains list of decommissioned node id's. Only nodes that - // were decommissioned by the plan would be listed here, not all historically - // decommissioned ones. - repeated int32 decommissioned_node_ids = 3 [ - (gogoproto.customname) = "DecommissionedNodeIDs", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // UnavailableRanges contains information about ranges that failed health check. + UnavailableRanges unavailable_ranges = 2 [(gogoproto.nullable) = false]; + // DecommissionedNodeStatuses contains a map of requested IDs with their + // corresponding liveness statuses. + map decommissioned_node_statuses = 3 [ + (gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; } // Admin is the gRPC API for the admin UI. Through grpc-gateway, we offer diff --git a/pkg/testutils/BUILD.bazel b/pkg/testutils/BUILD.bazel index 6eb2cb3ff8c7..c0ef2503a527 100644 --- a/pkg/testutils/BUILD.bazel +++ b/pkg/testutils/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "files.go", "hook.go", "keys.go", + "listener.go", "net.go", "pprof.go", "soon.go", @@ -34,6 +35,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/testutils/listener.go b/pkg/testutils/listener.go new file mode 100644 index 000000000000..296cff259580 --- /dev/null +++ b/pkg/testutils/listener.go @@ -0,0 +1,147 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package testutils + +import ( + "net" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// ListenerRegistry is a registry for listener sockets that allows TestServers +// to reuse listener sockets and keep them on the same ports throughout server +// restarts. +// Tests rely on net.Listen on port 0 to open first available port and uses its +// details to let TestServers connect to each other. Tests can't rely on fixed +// ports because it must be possible to run lots of tests in parallel. When +// TestServer is restarted, it would close its listener and reopen a new one +// on a different port as its own port might be reused by that time. +// This registry provides listener wrappers that could be associated with server +// ids and injected into TestServers normal way. Listeners will not close +// actual network sockets when closed, but will pause accepting connections. +// Test could then specifically resume listeners prior to restarting servers. +type ListenerRegistry struct { + listeners map[int]*reusableListener +} + +func NewListenerRegistry() ListenerRegistry { + return ListenerRegistry{listeners: make(map[int]*reusableListener)} +} + +func (r *ListenerRegistry) Get(t *testing.T, idx int) net.Listener { + t.Helper() + if l, ok := r.listeners[idx]; ok { + return l + } + nl, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to create network listener") + l := &reusableListener{ + id: idx, + wrapped: nl, + acceptC: make(chan net.Conn), + stopC: make(chan interface{}), + } + l.resume() + r.listeners[idx] = l + go l.run() + return l +} + +func (r *ListenerRegistry) Reopen(idx int) error { + l, ok := r.listeners[idx] + if !ok { + return errors.Newf("socket for id %d is not open", idx) + } + l.resume() + return nil +} + +func (r *ListenerRegistry) Close() { + for k, v := range r.listeners { + _ = v.wrapped.Close() + close(v.stopC) + delete(r.listeners, k) + } +} + +type reusableListener struct { + id int + wrapped net.Listener + acceptC chan net.Conn + pauseMu struct { + syncutil.RWMutex + pauseC chan interface{} + } + stopC chan interface{} +} + +func (l *reusableListener) run() { + defer func() { + close(l.acceptC) + }() + for { + c, err := l.wrapped.Accept() + if err != nil { + return + } + select { + case l.acceptC <- c: + case <-l.pauseC(): + _ = c.Close() + case <-l.stopC: + _ = c.Close() + return + } + } +} + +func (l *reusableListener) pauseC() <-chan interface{} { + l.pauseMu.RLock() + defer l.pauseMu.RUnlock() + return l.pauseMu.pauseC +} + +func (l *reusableListener) resume() { + l.pauseMu.Lock() + defer l.pauseMu.Unlock() + l.pauseMu.pauseC = make(chan interface{}) +} + +func (l *reusableListener) Accept() (net.Conn, error) { + select { + case c, ok := <-l.acceptC: + if !ok { + return nil, net.ErrClosed + } + return c, nil + case <-l.pauseC(): + return nil, net.ErrClosed + } +} + +func (l *reusableListener) Close() error { + l.pauseMu.Lock() + defer l.pauseMu.Unlock() + select { + case <-l.pauseMu.pauseC: + // Already paused, nothing to do. + default: + close(l.pauseMu.pauseC) + } + return nil +} + +func (l *reusableListener) Addr() net.Addr { + return l.wrapped.Addr() +} diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 578771e85e18..276cc75d4d90 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1546,21 +1546,23 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } serverArgs := tc.serverArgs[idx] - if idx == 0 { - // If it's the first server, then we need to restart the RPC listener by hand. - // Look at NewTestCluster for more details. - listener, err := net.Listen("tcp", serverArgs.Listener.Addr().String()) - if err != nil { - return err - } - serverArgs.Listener = listener - serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener - } else { - serverArgs.Addr = "" - // Try and point the server to a live server in the cluster to join. - for i := range tc.Servers { - if !tc.ServerStopped(i) { - serverArgs.JoinAddr = tc.Servers[i].ServingRPCAddr() + if !tc.clusterArgs.ReusableListeners { + if idx == 0 { + // If it's the first server, then we need to restart the RPC listener by hand. + // Look at NewTestCluster for more details. + listener, err := net.Listen("tcp", serverArgs.Listener.Addr().String()) + if err != nil { + return err + } + serverArgs.Listener = listener + serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener + } else { + serverArgs.Addr = "" + // Try and point the server to a live server in the cluster to join. + for i := range tc.Servers { + if !tc.ServerStopped(i) { + serverArgs.JoinAddr = tc.Servers[i].ServingRPCAddr() + } } } } diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go index 13d7a4bb8750..31086b10fef1 100644 --- a/pkg/util/grpcutil/grpc_util.go +++ b/pkg/util/grpcutil/grpc_util.go @@ -66,7 +66,7 @@ func IsTimeout(err error) bool { // set when we are not able to establish connection to remote node. func IsConnectionUnavailable(err error) bool { if s, ok := status.FromError(errors.UnwrapAll(err)); ok { - return s.Code() == codes.Unavailable + return s.Code() == codes.Unavailable || s.Code() == codes.FailedPrecondition } return false }