Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
95405: loqrecovery,admin,cli: check staged plans, stage recovery plan on cluster r=erikgrinaker a=aliher1911

loqrecovery,admin,cli: stage recovery plan on cluster

This commit adds loss of quorum recovery plan staging on nodes.
RecoveryStagePlan admin call is distributing recovery plan to
relevant nodes of the cluster. To do so, it first verifies that
cluster state is unchanged from the state where plan was created
and there are no previously staged plans.
Then it distributes plan to all cluster nodes using fan-out mechanism.
Each node in turn markes dead nodes as decommissioned and if there
are planned changes for the node it saves plan in the local store.
Admin call is backed by debug recover apply-plan command when using
--host flag to work in half-online mode.

Release note: None

----

loqrecovery,admin: implement endpoint to check staged plans

This commit adds loss of quorum recovery verify call to admin
interface. Call allows querying loss of quorum recovery status
from all nodes of the cluster. It provides info about loss of
quorum recovery plans staged on each node.

Release note: None

----

State checkpoint is included in the PR as it only provide partial functionality of state needed for stage phase to work.

Fixes #93044
Touches #74135
Touches #93043

When doing staging, cli would present following reports in happy case:
```
$ cockroach debug recover apply-plan  --host=127.0.0.1:26257 --insecure=true recover-plan.json
Proposed changes in plan 66200d2c-e0e1-4af4-b890-ef5bb6e9ccc4:
  range r93:/Table/106/1/"boston"/"333333D\x00\x80\x00\x00\x00\x00\x00\x00\n" updating replica 2 to 16.
  range r92:/Table/106/1/"los angeles"/"\x99\x99\x99\x99\x99\x99H\x00\x80\x00\x00\x00\x00\x00\x00\x1e" updating replica 2 to 16.
  range r91:/Table/106/1/"seattle"/"ffffffH\x00\x80\x00\x00\x00\x00\x00\x00\x14" updating replica 2 to 16.
  range r115:/Table/106/1/"washington dc"/"L\xcc\xcc\xcc\xcc\xccL\x00\x80\x00\x00\x00\x00\x00\x00\x0f" updating replica 2 to 15.
  range r80:/Table/107 updating replica 1 to 15.
  range r96:/Table/107/1/"san francisco"/"\x88\x88\x88\x88\x88\x88H\x00\x80\x00\x00\x00\x00\x00\x00\b" updating replica 1 to 15.
  range r102:/Table/107/1/"seattle"/"UUUUUUD\x00\x80\x00\x00\x00\x00\x00\x00\x05" updating replica 4 to 16.
  range r89:/Table/107/2 updating replica 1 to 15.
  range r126:/Table/108/1/"amsterdam"/"\xc5\x1e\xb8Q\xeb\x85@\x00\x80\x00\x00\x00\x00\x00\x01\x81" updating replica 2 to 16.
  range r104:/Table/108/1/"los angeles"/"\xa8\xf5\u008f\\(H\x00\x80\x00\x00\x00\x00\x00\x01J" updating replica 3 to 16.
  range r119:/Table/108/1/"san francisco"/"\x8c\xcc\xcc\xcc\xcc\xcc@\x00\x80\x00\x00\x00\x00\x00\x01\x13" updating replica 6 to 18.
  range r117:/Table/108/1/"seattle"/"p\xa3\xd7\n=pD\x00\x80\x00\x00\x00\x00\x00\x00\xdc" updating replica 4 to 17.
  range r155:/Table/108/1/"washington dc"/"Tz\xe1G\xae\x14L\x00\x80\x00\x00\x00\x00\x00\x00\xa5" updating replica 3 to 15.
  range r82:/Table/108/3 updating replica 1 to 15.

Nodes n4, n5 will be marked as decommissioned.


Proceed with staging plan [y/N] y

Plan staged. To complete recovery restart nodes n1, n2, n3.

To verify recovery status invoke

'cockroach debug recover verify  --host=127.0.0.1:26257 --insecure=true recover-plan.json'

```

And allow overwriting of currently stages plans if need be:

```
$ cockroach debug recover apply-plan  --host=127.0.0.1:26257 --insecure=true recover-plan-2.json
Proposed changes in plan 576f3d2e-518c-4dbc-9af4-b416629bbf1a:
  range r93:/Table/106/1/"boston"/"333333D\x00\x80\x00\x00\x00\x00\x00\x00\n" updating replica 2 to 16.
  range r92:/Table/106/1/"los angeles"/"\x99\x99\x99\x99\x99\x99H\x00\x80\x00\x00\x00\x00\x00\x00\x1e" updating replica 2 to 16.
  range r91:/Table/106/1/"seattle"/"ffffffH\x00\x80\x00\x00\x00\x00\x00\x00\x14" updating replica 2 to 16.
  range r115:/Table/106/1/"washington dc"/"L\xcc\xcc\xcc\xcc\xccL\x00\x80\x00\x00\x00\x00\x00\x00\x0f" updating replica 2 to 15.
  range r80:/Table/107 updating replica 1 to 15.
  range r96:/Table/107/1/"san francisco"/"\x88\x88\x88\x88\x88\x88H\x00\x80\x00\x00\x00\x00\x00\x00\b" updating replica 1 to 15.
  range r102:/Table/107/1/"seattle"/"UUUUUUD\x00\x80\x00\x00\x00\x00\x00\x00\x05" updating replica 4 to 16.
  range r89:/Table/107/2 updating replica 1 to 15.
  range r126:/Table/108/1/"amsterdam"/"\xc5\x1e\xb8Q\xeb\x85@\x00\x80\x00\x00\x00\x00\x00\x01\x81" updating replica 2 to 16.
  range r104:/Table/108/1/"los angeles"/"\xa8\xf5\u008f\\(H\x00\x80\x00\x00\x00\x00\x00\x01J" updating replica 3 to 16.
  range r119:/Table/108/1/"san francisco"/"\x8c\xcc\xcc\xcc\xcc\xcc@\x00\x80\x00\x00\x00\x00\x00\x01\x13" updating replica 6 to 18.
  range r117:/Table/108/1/"seattle"/"p\xa3\xd7\n=pD\x00\x80\x00\x00\x00\x00\x00\x00\xdc" updating replica 4 to 17.
  range r155:/Table/108/1/"washington dc"/"Tz\xe1G\xae\x14L\x00\x80\x00\x00\x00\x00\x00\x00\xa5" updating replica 3 to 15.
  range r82:/Table/108/3 updating replica 1 to 15.

Nodes n4, n5 will be marked as decommissioned.

Conflicting staged plans will be replaced:
  plan 66200d2c-e0e1-4af4-b890-ef5bb6e9ccc4 is staged on node n1.
  plan 66200d2c-e0e1-4af4-b890-ef5bb6e9ccc4 is staged on node n3.
  plan 66200d2c-e0e1-4af4-b890-ef5bb6e9ccc4 is staged on node n2.


Proceed with staging plan [y/N] y

Plan staged. To complete recovery restart nodes n1, n2, n3.

To verify recovery status invoke

'cockroach debug recover verify  --host=127.0.0.1:26257 --insecure=true recover-plan-2.json'

```


95545: sql: cache result of nullary UDFs and uncorrelated subqueries r=mgartner a=mgartner

#### sql: add tests showing duplicate execution of uncorrelated subqueries

Release note: None

#### logictest: fix bug in let variable replacement

This commit fixes a bug in the implementation of the `let` command that
incorrectly matched and replaced parts of queries with dollar-quotes,
like `CREATE FUNCTION ... AS $$SELECT 1$$`.

Release note: None

#### sql: cache result of nullary UDFs and uncorrelated subqueries

Nullary (zero-argument), non-volatile UDFs and lazily-evaluated,
uncorrelated subqueries now cache the result of their first invocation.
On subsequent invocations, the cached result is returned rather than
evaluating the full UDF or subquery. This eliminates duplicate work, and
makes the behavior of lazily-evaluated, uncorrelated subqueries match
that of eagerly-evaluated, uncorrelated subqueries which are evaluated
once before the main query is evaluated.

Epic: CRDB-20370

Release note: None


95599: opt: do not hoist correlated, non-leakproof subqueries in COALESCE r=mgartner a=mgartner

Previously, correlated, non-leakproof subqueries within a `COALESCE`
expression would be hoisted into an apply-join. This caused those
arguments to be evaluated when they shouldn't have been, i.e., when
previous arguments always evaluated to non-NULL values. This could cause
errors to propagate from arguments that should have never been
evaluated, and it was inconsistent with Postgres:

> Like a CASE expression, COALESCE only evaluates the arguments that are
needed to determine the result; that is, arguments to the right of the
first non-null argument are not evaluated[^1].

Now, non-leakproof subqueries are only hoisted from within a `COALESCE`
expression if they are not in the first argument. It is safe to hoist a
non-leakproof subquery in the first argument because it is always
evaluated.

Fixes #95560

Release note (bug fix): A bug has been fixed that could arguments of a
`COALESCE` to be evaluated when previous arguments always evaluated to
non-NULL values. This bug could cause query errors to originate from
arguments of a `COALESCE` that should have never been evaluated.

[^1]: https://www.postgresql.org/docs/15/functions-conditional.html#FUNCTIONS-COALESCE-NVL-IFNULL


Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Jan 24, 2023
4 parents 6ce9710 + f9957a2 + 19083d4 + 5bbb34a commit c8d92f3
Show file tree
Hide file tree
Showing 20 changed files with 1,146 additions and 106 deletions.
2 changes: 1 addition & 1 deletion docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -7535,7 +7535,7 @@ Support status: [reserved](#support-status)
| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| statuses | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.NodeRecoveryStatus) | repeated | Statuses contain a list of recovery statuses of nodes updated during recovery. It also contains nodes that were expected to be live (not decommissioned by recovery) but failed to return status response. | [reserved](#support-status) |
| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | Unavailable ranges contains descriptors of ranges that failed health checks. | [reserved](#support-status) |
| unavailable_ranges | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryVerifyResponse-cockroach.roachpb.RangeDescriptor) | repeated | UnavailableRanges contains descriptors of ranges that failed health checks. | [reserved](#support-status) |
| decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyResponse-int32) | repeated | DecommissionedNodeIDs contains list of decommissioned node id's. Only nodes that were decommissioned by the plan would be listed here, not all historically decommissioned ones. | [reserved](#support-status) |


Expand Down
158 changes: 145 additions & 13 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"io"
"os"
"path"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -170,7 +172,7 @@ Now the cluster could be started again.
var recoverCommands = []*cobra.Command{
debugRecoverCollectInfoCmd,
debugRecoverPlanCmd,
//debugRecoverStagePlan,
debugRecoverExecuteCmd,
//debugRecoverVerify,
}

Expand Down Expand Up @@ -503,7 +505,7 @@ To stage recovery application in half-online mode invoke:
Alternatively distribute plan to below nodes and invoke 'debug recover apply-plan --store=<store-dir> %s' on:
`, remoteArgs, planFile, planFile)
for _, node := range report.UpdatedNodes {
_, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinStoreIDs(node.StoreIDs))
_, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, joinIDs("s", node.StoreIDs))
}

return nil
Expand Down Expand Up @@ -559,8 +561,10 @@ var debugRecoverExecuteOpts struct {
// --confirm flag.
// If action is confirmed, then all changes are committed to the storage.
func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(cmd.Context())
// We need cancellable context here to obtain grpc client connection.
// getAdminClient will refuse otherwise.
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

planFile := args[0]
data, err := os.ReadFile(planFile)
Expand All @@ -574,6 +578,134 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile)
}

if len(debugRecoverExecuteOpts.Stores.Specs) == 0 {
return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates)
}
return applyRecoveryToLocalStore(ctx, nodeUpdates)
}

func stageRecoveryOntoCluster(
ctx context.Context, cmd *cobra.Command, planFile string, plan loqrecoverypb.ReplicaUpdatePlan,
) error {
c, finish, err := getAdminClient(ctx, serverCfg)
if err != nil {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()

// Check existing plan on nodes
type planConflict struct {
nodeID roachpb.NodeID
planID string
}
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
if err != nil {
return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster")
}
var conflicts []planConflict
for _, ns := range vr.Statuses {
if ns.PendingPlanID != nil && !ns.PendingPlanID.Equal(plan.PlanID) {
conflicts = append(conflicts, planConflict{nodeID: ns.NodeID, planID: ns.PendingPlanID.String()})
}
}

// Proposed report
_, _ = fmt.Fprintf(stderr, "Proposed changes in plan %s:\n", plan.PlanID)
for _, u := range plan.Updates {
_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s and discarding all others.\n",
u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID)
}
_, _ = fmt.Fprintf(stderr, "\nNodes %s will be marked as decommissioned.\n", joinIDs("n", plan.DecommissionedNodeIDs))

if len(conflicts) > 0 {
_, _ = fmt.Fprintf(stderr, "\nConflicting staged plans will be replaced:\n")
for _, cp := range conflicts {
_, _ = fmt.Fprintf(stderr, " plan %s is staged on node n%d.\n", cp.planID, cp.nodeID)
}
}
_, _ = fmt.Fprintln(stderr)

// Confirm actions
switch debugRecoverExecuteOpts.confirmAction {
case prompt:
_, _ = fmt.Fprintf(stderr, "\nProceed with staging plan [y/N] ")
reader := bufio.NewReader(os.Stdin)
line, err := reader.ReadString('\n')
if err != nil {
return errors.Wrap(err, "failed to read user input")
}
_, _ = fmt.Fprintf(stderr, "\n")
if len(line) < 1 || (line[0] != 'y' && line[0] != 'Y') {
_, _ = fmt.Fprint(stderr, "Aborted at user request\n")
return nil
}
case allYes:
// All actions enabled by default.
default:
return errors.New("Aborted by --confirm option")
}

maybeWrapStagingError := func(msg string, res *serverpb.RecoveryStagePlanResponse, err error) error {
if err != nil {
return errors.Wrapf(err, "%s", msg)
}
if len(res.Errors) > 0 {
return errors.Newf("%s:\n%s", msg, strings.Join(res.Errors, "\n"))
}
return nil
}

if len(conflicts) > 0 {
// We don't want to combine removing old plan and adding new one since it
// could produce cluster with multiple plans at the same time which could
// make situation worse.
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true})
if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil {
return err
}
}
sr, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true})
if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster",
sr, err); err != nil {
return err
}

remoteArgs := getCLIClusterFlags(true, cmd, func(flag string) bool {
_, filter := planSpecificFlags[flag]
return filter
})

nodeSet := make(map[roachpb.NodeID]interface{})
for _, r := range plan.Updates {
nodeSet[r.NodeID()] = struct{}{}
}

_, _ = fmt.Fprintf(stderr, `Plan staged. To complete recovery restart nodes %s.
To verify recovery status invoke:
'cockroach debug recover verify %s %s'
`, joinIDs("n", sortedKeys(nodeSet)), remoteArgs, planFile)
return nil
}

func sortedKeys[T ~int32](set map[T]any) []T {
var sorted []T
for k := range set {
sorted = append(sorted, k)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] < sorted[j]
})
return sorted
}

func applyRecoveryToLocalStore(
ctx context.Context, nodeUpdates loqrecoverypb.ReplicaUpdatePlan,
) error {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var localNodeID roachpb.NodeID
batches := make(map[roachpb.StoreID]storage.Batch)
for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs {
Expand All @@ -586,7 +718,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
defer store.Close()
defer batch.Close()

storeIdent, err := kvstorage.ReadStoreIdent(cmd.Context(), store)
storeIdent, err := kvstorage.ReadStoreIdent(ctx, store)
if err != nil {
return err
}
Expand All @@ -602,7 +734,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

updateTime := timeutil.Now()
prepReport, err := loqrecovery.PrepareUpdateReplicas(
cmd.Context(), nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches)
ctx, nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches)
if err != nil {
return err
}
Expand All @@ -615,7 +747,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
if len(prepReport.UpdatedReplicas) == 0 {
if len(prepReport.MissingStores) > 0 {
return errors.Newf("stores %s expected on the node but no paths were provided",
joinStoreIDs(prepReport.MissingStores))
joinIDs("s", prepReport.MissingStores))
}
_, _ = fmt.Fprintf(stderr, "No updates planned on this node.\n")
return nil
Expand Down Expand Up @@ -653,14 +785,14 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

// Apply batches to the stores.
applyReport, err := loqrecovery.CommitReplicaChanges(batches)
_, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinStoreIDs(applyReport.UpdatedStores))
_, _ = fmt.Fprintf(stderr, "Updated store(s): %s\n", joinIDs("s", applyReport.UpdatedStores))
return err
}

func joinStoreIDs(storeIDs []roachpb.StoreID) string {
storeNames := make([]string, 0, len(storeIDs))
for _, id := range storeIDs {
storeNames = append(storeNames, fmt.Sprintf("s%d", id))
func joinIDs[T ~int32](prefix string, ids []T) string {
storeNames := make([]string, 0, len(ids))
for _, id := range ids {
storeNames = append(storeNames, fmt.Sprintf("%s%d", prefix, id))
}
return strings.Join(storeNames, ", ")
}
Expand All @@ -681,7 +813,7 @@ func formatNodeStores(locations []loqrecovery.NodeStores, indent string) string
nodeDetails := make([]string, 0, len(locations))
for _, node := range locations {
nodeDetails = append(nodeDetails,
indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinStoreIDs(node.StoreIDs)))
indent+fmt.Sprintf("n%d: store(s): %s", node.NodeID, joinIDs("s", node.StoreIDs)))
}
return strings.Join(nodeDetails, "\n")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_test(
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
Expand All @@ -91,7 +92,6 @@ go_test(
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_google_uuid//:uuid",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
"@io_etcd_go_raft_v3//raftpb",
Expand Down
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,23 @@ message ReplicaRecoveryRecord {
// NodeRecoveryStatus contains information about loss of quorum recovery
// operations of a node.
message NodeRecoveryStatus {
// NodeID contains id of the node that status belongs to.
int32 node_id = 1 [(gogoproto.customname) = "NodeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
// PendingPlanID contains an ID or recovery plan that is staged on the node for
// application on the next restart.
bytes pending_plan_id = 1 [
bytes pending_plan_id = 2 [
(gogoproto.customname) = "PendingPlanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
// AppliedPlanID contains an ID of recovery plan that was processed last.
// If plan application succeeded, then ApplyError will be nil, otherwise it will
// contain an error message.
bytes applied_plan_id = 2 [
bytes applied_plan_id = 3 [
(gogoproto.customname) = "AppliedPlanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
// LastProcessingTime is a node wall clock time when last recovery plan was applied.
google.protobuf.Timestamp apply_timestamp = 3 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp apply_timestamp = 4 [(gogoproto.stdtime) = true];
// If most recent recovery plan application failed, Error will contain
// aggregated error messages containing all encountered errors.
string error = 4;
string error = 5;
}
Loading

0 comments on commit c8d92f3

Please sign in to comment.