loqrecovery,admin,cli: stage recovery plan on cluster
This commit adds loss of quorum recovery plan staging on nodes.
The RecoveryStagePlan admin call distributes the recovery plan to the
relevant nodes of the cluster. To do so, it first verifies that the
cluster state is unchanged from the state in which the plan was created
and that there are no previously staged plans.
It then distributes the plan to all cluster nodes using a fan-out
mechanism. Each node in turn marks dead nodes as decommissioned and,
if there are planned changes for the node, saves the plan in its local
store.
The admin call backs the debug recover apply-plan command when the
--host flag is used to work in half-online mode.

Release note: None
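For reference, a minimal sketch of this staging flow against the admin RPCs used in the diff below. This is a sketch only: it assumes an established serverpb.AdminClient (`c`) and a plan already decoded from the plan file, and it omits the CLI's confirmation prompt and the per-node errors reported in the responses.

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// stagePlanSketch mirrors the flow described above: verify there are no
// conflicting staged plans, force-remove them if there are, then fan the
// plan out to all nodes.
func stagePlanSketch(
	ctx context.Context, c serverpb.AdminClient, plan loqrecoverypb.ReplicaUpdatePlan,
) error {
	// Verify cluster recovery state and look for previously staged plans.
	vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
	if err != nil {
		return err
	}
	conflict := false
	for _, ns := range vr.Statuses {
		if ns.PendingPlanID != nil && !ns.PendingPlanID.Equal(plan.PlanID) {
			conflict = true
			break
		}
	}
	if conflict {
		// Remove conflicting plans in a separate call so the cluster never
		// holds two different plans at once.
		if _, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
			AllNodes: true, ForcePlan: true,
		}); err != nil {
			return err
		}
	}
	// Fan the plan out to all nodes; each node marks dead nodes as
	// decommissioned and, if it has planned changes, persists the plan in
	// its local store.
	_, err = c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
		Plan: &plan, AllNodes: true,
	})
	return err
}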
aliher1911 committed Jan 20, 2023
1 parent 5209cea commit b05fbcc
Showing 7 changed files with 571 additions and 53 deletions.
2 changes: 1 addition & 1 deletion docs/generated/http/full.md
@@ -7437,7 +7437,7 @@ Support status: [reserved](#support-status)
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| statuses | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus) | repeated | Statuses contain a list of recovery statuses of nodes updated during recovery. It also contains nodes that were expected to be live (not decommissioned by recovery) but failed to return status response. | [reserved](#support-status) |
| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | Unavailable ranges contains descriptors of ranges that failed health checks. | [reserved](#support-status) |
| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | UnavailableRanges contains descriptors of ranges that failed health checks. | [reserved](#support-status) |
| decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyResponse-int32) | repeated | DecommissionedNodeIDs contains the list of decommissioned node IDs. Only nodes that were decommissioned by the plan are listed here, not all historically decommissioned ones. | [reserved](#support-status) |
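As an illustration, a short sketch of consuming these fields from the Go client. This is a sketch only; the Go field names Statuses, UnavailableRanges, and DecommissionedNodeIDs are assumed from the generated bindings for the columns above.

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// summarizeVerify prints a summary of a RecoveryVerifyResponse.
func summarizeVerify(ctx context.Context, c serverpb.AdminClient) error {
	vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
	if err != nil {
		return err
	}
	for _, ns := range vr.Statuses {
		// Per-node recovery status, including nodes that were expected to be
		// live but failed to respond.
		fmt.Printf("n%d: pending plan %v\n", ns.NodeID, ns.PendingPlanID)
	}
	for _, rd := range vr.UnavailableRanges {
		fmt.Printf("r%d failed its health check\n", rd.RangeID)
	}
	fmt.Printf("decommissioned by recovery: %v\n", vr.DecommissionedNodeIDs)
	return nil
}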


158 changes: 145 additions & 13 deletions pkg/cli/debug_recover_loss_of_quorum.go
@@ -17,13 +17,15 @@ import (
"io"
"os"
"path"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -170,7 +172,7 @@ Now the cluster could be started again.
var recoverCommands = []*cobra.Command{
debugRecoverCollectInfoCmd,
debugRecoverPlanCmd,
//debugRecoverStagePlan,
debugRecoverExecuteCmd,
//debugRecoverVerify,
}

@@ -503,7 +505,7 @@ To stage recovery application in half-online mode invoke:
Alternatively distribute plan to below nodes and invoke 'debug recover apply-plan --store=<store-dir> %s' on:
`, remoteArgs, planFile, planFile)
for _, node := range report.UpdatedNodes {
_, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinStoreIDs(node.StoreIDs))
_, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinIDs("s", node.StoreIDs))
}

return nil
@@ -559,8 +561,10 @@ var debugRecoverExecuteOpts struct {
// --confirm flag.
// If action is confirmed, then all changes are committed to the storage.
func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(cmd.Context())
// We need a cancellable context here to obtain the grpc client connection;
// getAdminClient will refuse otherwise.
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

planFile := args[0]
data, err := os.ReadFile(planFile)
@@ -574,6 +578,134 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile)
}

if len(debugRecoverExecuteOpts.Stores.Specs) == 0 {
return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates)
}
return applyRecoveryToLocalStore(ctx, nodeUpdates)
}

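// stageRecoveryOntoCluster stages the recovery plan on the cluster through
// the admin RecoveryStagePlan call: it checks for conflicting staged plans,
// asks for confirmation, force-removes any conflicts, and then distributes
// the plan to all nodes.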
func stageRecoveryOntoCluster(
ctx context.Context, cmd *cobra.Command, planFile string, plan loqrecoverypb.ReplicaUpdatePlan,
) error {
c, finish, err := getAdminClient(ctx, serverCfg)
if err != nil {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()

// Check existing plan on nodes
type planConflict struct {
nodeID roachpb.NodeID
planID string
}
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
if err != nil {
return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster")
}
var conflicts []planConflict
for _, ns := range vr.Statuses {
if ns.PendingPlanID != nil && !ns.PendingPlanID.Equal(plan.PlanID) {
conflicts = append(conflicts, planConflict{nodeID: ns.NodeID, planID: ns.PendingPlanID.String()})
}
}

// Proposed report
_, _ = fmt.Fprintf(stderr, "Proposed changes in plan %s:\n", plan.PlanID)
for _, u := range plan.Updates {
_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s and discarding all others.\n",
u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID)
}
_, _ = fmt.Fprintf(stderr, "\nNodes %s will be marked as decommissioned.\n", joinIDs("n", plan.DecommissionedNodeIDs))

if len(conflicts) > 0 {
_, _ = fmt.Fprintf(stderr, "\nConflicting staged plans will be replaced:\n")
for _, cp := range conflicts {
_, _ = fmt.Fprintf(stderr, " plan %s is staged on node n%d.\n", cp.planID, cp.nodeID)
}
}
_, _ = fmt.Fprintln(stderr)

// Confirm actions
switch debugRecoverExecuteOpts.confirmAction {
case prompt:
_, _ = fmt.Fprintf(stderr, "\nProceed with staging plan [y/N] ")
reader := bufio.NewReader(os.Stdin)
line, err := reader.ReadString('\n')
if err != nil {
return errors.Wrap(err, "failed to read user input")
}
_, _ = fmt.Fprintf(stderr, "\n")
if len(line) < 1 || (line[0] != 'y' && line[0] != 'Y') {
_, _ = fmt.Fprint(stderr, "Aborted at user request\n")
return nil
}
case allYes:
// All actions enabled by default.
default:
return errors.New("Aborted by --confirm option")
}

maybeWrapStagingError := func(msg string, res *serverpb.RecoveryStagePlanResponse, err error) error {
if err != nil {
return errors.Wrapf(err, "%s", msg)
}
if len(res.Errors) > 0 {
return errors.Newf("%s:\n%s", msg, strings.Join(res.Errors, "\n"))
}
return nil
}

if len(conflicts) > 0 {
// We don't want to combine removing the old plan and adding the new one,
// since that could leave the cluster with multiple plans at the same time,
// which could make the situation worse.
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true})
if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil {
return err
}
}
sr, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true})
if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster",
sr, err); err != nil {
return err
}

remoteArgs := getCLIClusterFlags(true, cmd, func(flag string) bool {
_, filter := planSpecificFlags[flag]
return filter
})

nodeSet := make(map[roachpb.NodeID]interface{})
for _, r := range plan.Updates {
nodeSet[r.NodeID()] = struct{}{}
}

_, _ = fmt.Fprintf(stderr, `Plan staged. To complete recovery restart nodes %s.
To verify recovery status invoke:
'cockroach debug recover verify %s %s'
`, joinIDs("n", sortedKeys(nodeSet)), remoteArgs, planFile)
return nil
}

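// sortedKeys returns the keys of set in ascending order.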
func sortedKeys[T ~int32](set map[T]any) []T {
var sorted []T
for k := range set {
sorted = append(sorted, k)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] < sorted[j]
})
return sorted
}

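// applyRecoveryToLocalStore applies the recovery plan directly to the local
// stores provided via --store flags, preparing replica updates in per-store
// batches and committing them.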
func applyRecoveryToLocalStore(
ctx context.Context, nodeUpdates loqrecoverypb.ReplicaUpdatePlan,
) error {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var localNodeID roachpb.NodeID
batches := make(map[roachpb.StoreID]storage.Batch)
for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs {
@@ -586,7 +718,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
defer store.Close()
defer batch.Close()

storeIdent, err := kvstorage.ReadStoreIdent(cmd.Context(), store)
storeIdent, err := kvstorage.ReadStoreIdent(ctx, store)
if err != nil {
return err
}
@@ -602,7 +734,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

updateTime := timeutil.Now()
prepReport, err := loqrecovery.PrepareUpdateReplicas(
cmd.Context(), nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches)
ctx, nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches)
if err != nil {
return err
}
@@ -615,7 +747,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
if len(prepReport.UpdatedReplicas) == 0 {
if len(prepReport.MissingStores) > 0 {
return errors.Newf("stores %s expected on the node but no paths were provided",
joinStoreIDs(prepReport.MissingStores))
joinIDs("s", prepReport.MissingStores))
}
_, _ = fmt.Fprintf(stderr, "No updates planned on this node.\n")
return nil
@@ -653,14 +785,14 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

// Apply batches to the stores.
applyReport, err := loqrecovery.CommitReplicaChanges(batches)
_, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinStoreIDs(applyReport.UpdatedStores))
_, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinIDs("s", applyReport.UpdatedStores))
return err
}

func joinStoreIDs(storeIDs []roachpb.StoreID) string {
storeNames := make([]string, 0, len(storeIDs))
for _, id := range storeIDs {
storeNames = append(storeNames, fmt.Sprintf("s%d", id))
func joinIDs[T ~int32](prefix string, ids []T) string {
storeNames := make([]string, 0, len(ids))
for _, id := range ids {
storeNames = append(storeNames, fmt.Sprintf("%s%d", prefix, id))
}
return strings.Join(storeNames, ", ")
}
@@ -681,7 +813,7 @@ func formatNodeStores(locations []loqrecovery.NodeStores, indent string) string
nodeDetails := make([]string, 0, len(locations))
for _, node := range locations {
nodeDetails = append(nodeDetails,
indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinStoreIDs(node.StoreIDs)))
indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinIDs("s", node.StoreIDs)))
}
return strings.Join(nodeDetails, "\n")
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
@@ -69,6 +69,7 @@ go_test(
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
