From 9c6b9e9f1f4caa9630924225f780f84150ce9fc5 Mon Sep 17 00:00:00 2001
From: Oleg Afanasyev
Date: Fri, 20 Jan 2023 15:19:22 +0000
Subject: [PATCH] loqrecovery,server: apply staged loqrecovery plans on start

This commit adds loss of quorum recovery plan application functionality to
server startup. Plans are staged during the application phase and then applied
when a rolling restart of the affected nodes is performed. Plan application
performs the same actions that are performed by debug recover apply-plan on
offline storage. It also updates the loss of quorum recovery status stored in
a store-local key.

Release note: None
---
 pkg/cli/debug_recover_loss_of_quorum.go       |   9 +-
 pkg/keys/constants.go                         |  24 ++-
 pkg/keys/keys.go                              |  13 ++
 pkg/keys/printer.go                           |   2 +
 pkg/keys/printer_test.go                      |   2 +
 pkg/kv/kvserver/loqrecovery/BUILD.bazel       |   1 +
 pkg/kv/kvserver/loqrecovery/apply.go          | 101 ++++++++-
 .../loqrecovery/loqrecoverypb/recovery.go     |   4 +
 .../loqrecovery/loqrecoverypb/recovery.proto  |  30 ++-
 pkg/kv/kvserver/loqrecovery/plan.go           |   7 +-
 pkg/kv/kvserver/loqrecovery/record.go         |  62 ++++++
 .../kvserver/loqrecovery/recovery_env_test.go | 196 ++++++++++++++++--
 pkg/kv/kvserver/loqrecovery/recovery_test.go  |   8 +-
 .../testdata/half_online_application_plan     |  86 ++++++++
 .../half_online_apply_mismatching_plan        |  61 ++++++
 .../testdata/max_applied_voter_wins           |   6 +-
 .../testdata/replica_changes_recorded         |   7 +-
 pkg/server/BUILD.bazel                        |   1 +
 pkg/server/loss_of_quorum.go                  |  77 ++++++-
 pkg/server/server.go                          |  33 +--
 pkg/util/strutil/util.go                      |   2 +-
 21 files changed, 668 insertions(+), 64 deletions(-)
 create mode 100644 pkg/kv/kvserver/loqrecovery/testdata/half_online_application_plan
 create mode 100644 pkg/kv/kvserver/loqrecovery/testdata/half_online_apply_mismatching_plan

diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go
index beff617d0c10..9495ef009516 100644
--- a/pkg/cli/debug_recover_loss_of_quorum.go
+++ b/pkg/cli/debug_recover_loss_of_quorum.go
@@ -376,7 +376,8 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
 		ctx,
 		replicas,
 		deadStoreIDs,
-		deadNodeIDs)
+		deadNodeIDs,
+		uuid.DefaultGenerator)
 	if err != nil {
 		return err
 	}
@@ -613,8 +614,8 @@ func stageRecoveryOntoCluster(
 	// Proposed report
 	_, _ = fmt.Fprintf(stderr, "Proposed changes in plan %s:\n", plan.PlanID)
 	for _, u := range plan.Updates {
-		_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s and discarding all others.\n",
-			u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID)
+		_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s on node n%d and discarding all others.\n",
+			u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID, u.NodeID())
 	}
 	_, _ = fmt.Fprintf(stderr, "\nNodes %s will be marked as decommissioned.\n",
 		strutil.JoinIDs("n", plan.DecommissionedNodeIDs))
@@ -690,7 +691,7 @@ To verify recovery status invoke:
 	return nil
 }
 
-func sortedKeys[T ~int32](set map[T]any) []T {
+func sortedKeys[T ~int | ~int32 | ~int64](set map[T]any) []T {
 	var sorted []T
 	for k := range set {
 		sorted = append(sorted, k)
diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go
index a297d27944bb..0e5b9ef80be3 100644
--- a/pkg/keys/constants.go
+++ b/pkg/keys/constants.go
@@ -30,13 +30,6 @@ const (
 	tenantPrefixByte = '\xfe'
 )
 
-// Constants to subdivide unsafe loss of quorum recovery data into groups.
-// Currently we only store keys as they are applied, but might benefit from
-// archiving them to make them more "durable".
-const ( - appliedUnsafeReplicaRecoveryPrefix = "applied" -) - // Constants for system-reserved keys in the KV map. // // Note: Preserve group-wise ordering when adding new constants. @@ -181,18 +174,31 @@ var ( // localStoreIdentSuffix stores an immutable identifier for this // store, created when the store is first bootstrapped. localStoreIdentSuffix = []byte("iden") + // localStoreLossOfQuorumRecoveryInfix is an infix for the group of keys used + // by loss of quorum recovery operations to track progress and outcome. + // This infix is followed by the suffix defining type of recovery key. + localStoreLossOfQuorumRecoveryInfix = []byte("loqr") // LocalStoreUnsafeReplicaRecoverySuffix is a suffix for temporary record // entries put when loss of quorum recovery operations are performed offline // on the store. // See StoreUnsafeReplicaRecoveryKey for details. - localStoreUnsafeReplicaRecoverySuffix = makeKey([]byte("loqr"), - []byte(appliedUnsafeReplicaRecoveryPrefix)) + localStoreUnsafeReplicaRecoverySuffix = makeKey(localStoreLossOfQuorumRecoveryInfix, + []byte("applied")) // LocalStoreUnsafeReplicaRecoveryKeyMin is the start of keyspace used to store // loss of quorum recovery record entries. LocalStoreUnsafeReplicaRecoveryKeyMin = MakeStoreKey(localStoreUnsafeReplicaRecoverySuffix, nil) // LocalStoreUnsafeReplicaRecoveryKeyMax is the end of keyspace used to store // loss of quorum recovery record entries. LocalStoreUnsafeReplicaRecoveryKeyMax = LocalStoreUnsafeReplicaRecoveryKeyMin.PrefixEnd() + // localStoreLossOfQuorumRecoveryStatusSuffix is a local key store suffix to + // store results of loss of quorum recovery plan application. + localStoreLossOfQuorumRecoveryStatusSuffix = makeKey(localStoreLossOfQuorumRecoveryInfix, + []byte("status")) + // localStoreLossOfQuorumRecoveryCleanupActionsSuffix is a local key store + // suffix to store information for loss of quorum recovery cleanup actions + // performed after node restart. + localStoreLossOfQuorumRecoveryCleanupActionsSuffix = makeKey(localStoreLossOfQuorumRecoveryInfix, + []byte("cleanup")) // localStoreNodeTombstoneSuffix stores key value pairs that map // nodeIDs to time of removal from cluster. localStoreNodeTombstoneSuffix = []byte("ntmb") diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 90ac292fcb06..ee7b14fcc76c 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -124,6 +124,19 @@ func DecodeStoreCachedSettingsKey(key roachpb.Key) (settingKey roachpb.Key, err return } +// StoreLossOfQuorumRecoveryStatusKey is a key used for storing results of loss +// of quorum recovery plan application. +func StoreLossOfQuorumRecoveryStatusKey() roachpb.Key { + return MakeStoreKey(localStoreLossOfQuorumRecoveryStatusSuffix, nil) +} + +// StoreLossOfQuorumRecoveryCleanupActionsKey is a key used for storing data for +// post recovery cleanup actions node would perform after restart if plan was +// applied. +func StoreLossOfQuorumRecoveryCleanupActionsKey() roachpb.Key { + return MakeStoreKey(localStoreLossOfQuorumRecoveryCleanupActionsSuffix, nil) +} + // StoreUnsafeReplicaRecoveryKey creates a key for loss of quorum replica // recovery entry. Those keys are written by `debug recover apply-plan` command // on the store while node is stopped. 
Once node boots up, entries are diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index c498c1df8f75..6fa74e7b6b3f 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -144,6 +144,8 @@ var constSubKeyDict = []struct { {"/nodeTombstone", localStoreNodeTombstoneSuffix}, {"/cachedSettings", localStoreCachedSettingsSuffix}, {"/lossOfQuorumRecovery/applied", localStoreUnsafeReplicaRecoverySuffix}, + {"/lossOfQuorumRecovery/status", localStoreLossOfQuorumRecoveryStatusSuffix}, + {"/lossOfQuorumRecovery/cleanup", localStoreLossOfQuorumRecoveryCleanupActionsSuffix}, } func nodeTombstoneKeyPrint(buf *redact.StringBuilder, key roachpb.Key) { diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 3b1da7f7082f..9f6b46e82ced 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -237,6 +237,8 @@ func TestPrettyPrint(t *testing.T) { {keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/n123", revertSupportUnknown}, {keys.StoreCachedSettingsKey(roachpb.Key("a")), `/Local/Store/cachedSettings/"a"`, revertSupportUnknown}, {keys.StoreUnsafeReplicaRecoveryKey(loqRecoveryID), fmt.Sprintf(`/Local/Store/lossOfQuorumRecovery/applied/%s`, loqRecoveryID), revertSupportUnknown}, + {keys.StoreLossOfQuorumRecoveryStatusKey(), "/Local/Store/lossOfQuorumRecovery/status", revertSupportUnknown}, + {keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), "/Local/Store/lossOfQuorumRecovery/cleanup", revertSupportUnknown}, {keys.AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID), revertSupportUnknown}, {keys.RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState", revertSupportUnknown}, diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 36b4a67227e9..88936905672d 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/retry", + "//pkg/util/strutil", "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/loqrecovery/apply.go b/pkg/kv/kvserver/loqrecovery/apply.go index ff7a56ee146c..509e72d0425f 100644 --- a/pkg/kv/kvserver/loqrecovery/apply.go +++ b/pkg/kv/kvserver/loqrecovery/apply.go @@ -17,11 +17,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/strutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -202,6 +206,11 @@ func applyReplicaUpdate( report.AlreadyUpdated = true return report, nil } + // Sanity check if removed replica ID matches one in the plan. 
+	if _, ok := localDesc.Replicas().GetReplicaDescriptorByID(update.OldReplicaID); !ok {
+		return PrepareReplicaReport{}, errors.Errorf(
+			"can not find replica with ID %d for range r%d", update.OldReplicaID, update.RangeID)
+	}
 
 	sl := stateloader.Make(localDesc.RangeID)
 	ms, err := sl.LoadMVCCStats(ctx, readWriter)
@@ -316,7 +325,6 @@ type ApplyUpdateReport struct {
 // second step of applying recovery plan.
 func CommitReplicaChanges(batches map[roachpb.StoreID]storage.Batch) (ApplyUpdateReport, error) {
 	var report ApplyUpdateReport
-	failed := false
 	var updateErrors []string
 	// Commit changes to all stores. Stores could have pending changes if plan
 	// contains replicas belonging to them, or have no changes if no replicas
@@ -330,14 +338,101 @@ func CommitReplicaChanges(batches map[roachpb.StoreID]storage.Batch) (ApplyUpdat
 			// If we fail here, we can only try to run the whole process from scratch
 			// as this store is somehow broken.
 			updateErrors = append(updateErrors, fmt.Sprintf("failed to update store s%d: %v", id, err))
-			failed = true
 		} else {
 			report.UpdatedStores = append(report.UpdatedStores, id)
 		}
 	}
-	if failed {
+	if len(updateErrors) > 0 {
 		return report, errors.Errorf(
 			"failed to commit update to one or more stores: %s", strings.Join(updateErrors, "; "))
 	}
 	return report, nil
 }
+
+// MaybeApplyPendingRecoveryPlan applies a loss of quorum recovery plan if one
+// is staged in planStore. Changes are applied to engines whose identities
+// match store IDs of replicas in the plan.
+// Plan application errors, like a mismatch between a store and the plan or an
+// inability to deserialize values, are only reported to logs and to the
+// application status, but are not propagated to the caller. Only serious
+// errors that imply misconfiguration or plan store issues are propagated.
+// Regardless of application success or failure, the staged plan is removed.
+func MaybeApplyPendingRecoveryPlan(
+	ctx context.Context, planStore PlanStore, engines []storage.Engine, clock timeutil.TimeSource,
+) error {
+	if len(engines) < 1 {
+		return nil
+	}
+
+	applyPlan := func(nodeID roachpb.NodeID, plan loqrecoverypb.ReplicaUpdatePlan) error {
+		log.Infof(ctx, "applying staged loss of quorum recovery plan %s", plan.PlanID)
+		batches := make(map[roachpb.StoreID]storage.Batch)
+		for _, e := range engines {
+			ident, err := kvstorage.ReadStoreIdent(ctx, e)
+			if err != nil {
+				return errors.Wrap(err, "failed to read store ident when trying to apply loss of quorum recovery plan")
+			}
+			b := e.NewBatch()
+			defer b.Close()
+			batches[ident.StoreID] = b
+		}
+		prepRep, err := PrepareUpdateReplicas(ctx, plan, uuid.DefaultGenerator, clock.Now(), nodeID, batches)
+		if err != nil {
+			return err
+		}
+		if len(prepRep.MissingStores) > 0 {
+			log.Warningf(ctx, "loss of quorum recovery plan application expected stores on the node %s",
+				strutil.JoinIDs("s", prepRep.MissingStores))
+		}
+		_, err = CommitReplicaChanges(batches)
+		if err != nil {
+			// This is not great, as we are in a partial success situation, but we
+			// don't have a better option than reporting it as an error. Let the user
+			// decide what to do next.
+			return err
+		}
+		return nil
+	}
+
+	plan, exists, err := planStore.LoadPlan()
+	if err != nil {
+		// This is a fatal error; we don't write an application report since we
+		// didn't check the store yet.
+		return errors.Wrap(err, "failed to check if loss of quorum recovery plan is staged")
+	}
+	if !exists {
+		return nil
+	}
+
+	// First read node parameters from the first store.
+	storeIdent, err := kvstorage.ReadStoreIdent(ctx, engines[0])
+	if err != nil {
+		if errors.Is(err, &kvstorage.NotBootstrappedError{}) {
+			// This is wrong; we must not have staged plans on a non-bootstrapped
+			// node. But we can't write an error here as store init might refuse to
+			// work if there are already some keys in the store.
+			log.Errorf(ctx, "node is not bootstrapped but it already has a recovery plan staged: %s", err)
+			return nil
+		}
+		return err
+	}
+
+	if err := planStore.RemovePlan(); err != nil {
+		log.Errorf(ctx, "failed to remove loss of quorum recovery plan: %s", err)
+	}
+
+	err = applyPlan(storeIdent.NodeID, plan)
+	r := loqrecoverypb.PlanApplicationResult{
+		AppliedPlanID:  plan.PlanID,
+		ApplyTimestamp: clock.Now(),
+	}
+	if err != nil {
+		r.Error = err.Error()
+		log.Errorf(ctx, "failed to apply staged loss of quorum recovery plan: %s", err)
+	}
+	if err = writeNodeRecoveryResults(ctx, engines[0], r,
+		loqrecoverypb.DeferredRecoveryActions{DecommissionedNodeIDs: plan.DecommissionedNodeIDs}); err != nil {
+		log.Errorf(ctx, "failed to write loss of quorum recovery results to store: %s", err)
+	}
+	return nil
+}
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
index fff394377321..9e50998bbb43 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
@@ -140,3 +140,7 @@ func (m *ClusterReplicaInfo) ReplicaCount() (size int) {
 	}
 	return size
 }
+
+func (a DeferredRecoveryActions) Empty() bool {
+	return len(a.DecommissionedNodeIDs) == 0
+}
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
index 049b1e0a865d..6471811e2d87 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
@@ -161,9 +161,37 @@ message NodeRecoveryStatus {
   bytes applied_plan_id = 3 [
     (gogoproto.customname) = "AppliedPlanID",
     (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
-  // LastProcessingTime is a node wall clock time when last recovery plan was applied.
+  // ApplyTimestamp is a node wall clock time when last recovery plan was applied.
   google.protobuf.Timestamp apply_timestamp = 4 [(gogoproto.stdtime) = true];
   // If most recent recovery plan application failed, Error will contain
   // aggregated error messages containing all encountered errors.
   string error = 5;
 }
+
+// PlanApplicationResult is a value stored inside node local storage whenever
+// loss of quorum recovery plan is applied for the purpose of tracking recovery
+// progress.
+message PlanApplicationResult {
+  bytes applied_plan_id = 1 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "AppliedPlanID",
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
+  // ApplyTimestamp is a node wall clock time when last recovery plan was applied.
+  google.protobuf.Timestamp apply_timestamp = 2 [
+    (gogoproto.nullable) = false,
+    (gogoproto.stdtime) = true];
+  // If most recent recovery plan application failed, Error will contain
+  // aggregated error messages containing all encountered errors.
+  string error = 3;
+}
+
+// DeferredRecoveryActions contains data for recovery actions that need to be
+// performed after node restarts if it applied a recovery plan.
+message DeferredRecoveryActions { + // DecommissionedNodeIDs is a set of node IDs that need to be decommissioned + // when a node restarts. Those nodes were marked as decommissioned in the + // local node tombstone storage as a part of plan application, but are pending + // liveness info update. + repeated int32 decommissioned_node_ids = 1 [(gogoproto.customname) = "DecommissionedNodeIDs", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; +} diff --git a/pkg/kv/kvserver/loqrecovery/plan.go b/pkg/kv/kvserver/loqrecovery/plan.go index 84885c6df4b4..58b42070600b 100644 --- a/pkg/kv/kvserver/loqrecovery/plan.go +++ b/pkg/kv/kvserver/loqrecovery/plan.go @@ -122,7 +122,12 @@ func PlanReplicas( clusterInfo loqrecoverypb.ClusterReplicaInfo, deadStoreIDs []roachpb.StoreID, deadNodeIDs []roachpb.NodeID, + uuidGen uuid.Generator, ) (loqrecoverypb.ReplicaUpdatePlan, PlanningReport, error) { + planID, err := uuidGen.NewV4() + if err != nil { + return loqrecoverypb.ReplicaUpdatePlan{}, PlanningReport{}, err + } var replicas []loqrecoverypb.ReplicaInfo for _, node := range clusterInfo.LocalInfo { replicas = append(replicas, node.Replicas...) @@ -168,7 +173,7 @@ func PlanReplicas( return loqrecoverypb.ReplicaUpdatePlan{ Updates: updates, - PlanID: uuid.MakeV4(), + PlanID: planID, DecommissionedNodeIDs: decommissionNodeIDs, ClusterID: clusterInfo.ClusterID, }, report, err diff --git a/pkg/kv/kvserver/loqrecovery/record.go b/pkg/kv/kvserver/loqrecovery/record.go index d043268ec644..bbb0f454aeb6 100644 --- a/pkg/kv/kvserver/loqrecovery/record.go +++ b/pkg/kv/kvserver/loqrecovery/record.go @@ -18,6 +18,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -164,3 +166,63 @@ func UpdateRangeLogWithRecovery( } return nil } + +func writeNodeRecoveryResults( + ctx context.Context, + writer storage.ReadWriter, + result loqrecoverypb.PlanApplicationResult, + actions loqrecoverypb.DeferredRecoveryActions, +) error { + var fullErr error + err := storage.MVCCPutProto(ctx, writer, nil, keys.StoreLossOfQuorumRecoveryStatusKey(), + hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &result) + fullErr = errors.Wrap(err, "failed to write loss of quorum recovery plan application status") + if !actions.Empty() { + err = storage.MVCCPutProto(ctx, writer, nil, keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), + hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &actions) + fullErr = errors.CombineErrors(fullErr, + errors.Wrap(err, "failed to write loss of quorum recovery cleanup action")) + } else { + _, err = storage.MVCCDelete(ctx, writer, nil, keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), + hlc.Timestamp{}, hlc.ClockTimestamp{}, nil) + fullErr = errors.CombineErrors(fullErr, + errors.Wrap(err, "failed to clean loss of quorum recovery cleanup action")) + } + return fullErr +} + +func readNodeRecoveryStatusInfo( + ctx context.Context, reader storage.Reader, +) (loqrecoverypb.PlanApplicationResult, bool, error) { + var result loqrecoverypb.PlanApplicationResult + ok, err := storage.MVCCGetProto(ctx, reader, keys.StoreLossOfQuorumRecoveryStatusKey(), + hlc.Timestamp{}, &result, storage.MVCCGetOptions{}) + if 
err != nil {
+		log.Error(ctx, "failed to read loss of quorum recovery plan application status")
+		return loqrecoverypb.PlanApplicationResult{}, false, err
+	}
+	return result, ok, nil
+}
+
+// ReadCleanupActionsInfo reads cleanup actions info if it is present in the
+// reader.
+func ReadCleanupActionsInfo(
+	ctx context.Context, writer storage.ReadWriter,
+) (loqrecoverypb.DeferredRecoveryActions, bool, error) {
+	var result loqrecoverypb.DeferredRecoveryActions
+	exists, err := storage.MVCCGetProto(ctx, writer, keys.StoreLossOfQuorumRecoveryCleanupActionsKey(),
+		hlc.Timestamp{}, &result, storage.MVCCGetOptions{})
+	if err != nil {
+		log.Errorf(ctx, "failed to read loss of quorum cleanup actions key: %s", err)
+		return loqrecoverypb.DeferredRecoveryActions{}, false, err
+	}
+	return result, exists, nil
+}
+
+// RemoveCleanupActionsInfo removes cleanup actions info if it is present in the
+// reader.
+func RemoveCleanupActionsInfo(ctx context.Context, writer storage.ReadWriter) error {
+	_, err := storage.MVCCDelete(ctx, writer, nil, keys.StoreLossOfQuorumRecoveryCleanupActionsKey(),
+		hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
+	return err
+}
diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
index 68036eff3932..8258d293537b 100644
--- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
+++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
@@ -12,6 +12,7 @@ package loqrecovery
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"sort"
 	"strconv"
@@ -37,10 +38,66 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/datadriven"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/pebble/vfs"
 	"go.etcd.io/raft/v3/raftpb"
 	"gopkg.in/yaml.v2"
 )
 
+/*
+quorumRecoveryEnv provides an environment to run data driven tests for loss of
+quorum recovery. Provided commands cover replica info collection, change
+planning, plan application and checking of generated recovery reports.
+It doesn't cover plan and replica info serialization, actual cli commands, or
+interaction with the user.
+
+Supported commands:
+replication-data
+
+  Loads the following data into stores. All previously existing data is wiped.
+  Only range local information about replicas is populated, together with
+  optional raft log entries. Data is created using proxy structs to avoid the
+  need to populate unnecessary fields.
+
+descriptor-data
+
+  Initializes optional "meta range" info with provided descriptor data.
+
+collect-replica-info [stores=(1,2,3)]
+
+  Collects replica info and saves it into the env. The stores argument provides
+  ids of the stores the test wants to analyze. This allows a test not to collect
+  all replicas if needed. If omitted, then all stores are collected.
+
+make-plan [stores=(1,2,3)|nodes=(1,2,3)] [force=]
+
+  Creates a recovery plan based on the replica info saved to the env by the
+  collect-replica-info command and optional descriptors. The stores or nodes
+  args provide an optional list of dead nodes or dead stores which reflect cli
+  command line flags. The force flag forces plan creation in the presence of
+  range inconsistencies.
+  The resulting plan is saved into the environment.
+
+dump-store [stores=(1,2,3)]
+
+  Prints the content of the replica states in the store. If no stores are
+  provided, uses all populated stores.
+
+apply-plan [stores=(1,2,3)] [restart=]
+
+  Applies the recovery plan on the specified stores. If no stores are provided,
+  uses all populated stores.
If restart = false, simulates cli version of command where + application could fail and roll back with an error. If restart = true, + simulates half online approach where unattended operation succeeds but writes + potential error into store. + +dump-events [remove=] [status=] + + Dumps events about replica recovery found in stores. + If remove=true then corresponding method is told to remove events after + dumping in a way range log population does when consuming those events. + If status=true, half-online approach recovery report is dumped for each store. +*/ + // Range info used for test data to avoid providing unnecessary fields that are // not used in replica removal. type testReplicaInfo struct { @@ -136,6 +193,42 @@ type testDescriptorData struct { Generation roachpb.RangeGeneration `yaml:"Generation,omitempty"` } +type seqUUIDGen struct { + seq uint32 +} + +func NewSeqGen(val uint32) uuid.Generator { + return &seqUUIDGen{seq: val} +} + +func (g *seqUUIDGen) NewV1() (uuid.UUID, error) { + nextV1 := g.next() + nextV1.SetVersion(uuid.V1) + return nextV1, nil +} + +func (*seqUUIDGen) NewV3(uuid.UUID, string) uuid.UUID { + panic("not implemented") +} + +func (g *seqUUIDGen) NewV4() (uuid.UUID, error) { + nextV4 := g.next() + nextV4.SetVersion(uuid.V4) + return nextV4, nil +} + +func (*seqUUIDGen) NewV5(uuid.UUID, string) uuid.UUID { + panic("not implemented") +} + +func (g *seqUUIDGen) next() uuid.UUID { + next := uuid.UUID{} + binary.BigEndian.PutUint32(next[:], g.seq) + next.SetVariant(uuid.VariantRFC4122) + g.seq++ + return next +} + // Store with its owning NodeID for easier grouping by owning nodes. type wrappedStore struct { engine storage.Engine @@ -143,17 +236,21 @@ type wrappedStore struct { } type quorumRecoveryEnv struct { - // Stores with data + // Stores with data. clusterID uuid.UUID stores map[roachpb.StoreID]wrappedStore - // Optional mata ranges content + // Optional mata ranges content. meta []roachpb.RangeDescriptor - // Collected info from nodes + // Collected info from nodes. replicas loqrecoverypb.ClusterReplicaInfo - // plan to update replicas + // plan to update replicas. plan loqrecoverypb.ReplicaUpdatePlan + + // Helper resources to make time and identifiers predictable. 
+ uuidGen uuid.Generator + clock timeutil.TimeSource } func (e *quorumRecoveryEnv) Handle(t *testing.T, d datadriven.TestData) string { @@ -459,7 +556,7 @@ func (e *quorumRecoveryEnv) handleMakePlan(t *testing.T, d datadriven.TestData) plan, report, err := PlanReplicas(context.Background(), loqrecoverypb.ClusterReplicaInfo{ Descriptors: e.replicas.Descriptors, LocalInfo: e.replicas.LocalInfo, - }, stores, nodes) + }, stores, nodes, e.uuidGen) if err != nil { return "", err } @@ -575,7 +672,7 @@ func iterateSelectedStores( for _, id := range storeIDs { wrappedStore, ok := stores[id] if !ok { - t.Fatalf("replica info requested from store that was not populated: %d", id) + t.Fatalf("attempt to get store that was not populated: %d", id) } f(wrappedStore.engine, wrappedStore.nodeID, id) } @@ -680,24 +777,50 @@ func (e *quorumRecoveryEnv) handleDumpStore(t *testing.T, d datadriven.TestData) func (e *quorumRecoveryEnv) handleApplyPlan(t *testing.T, d datadriven.TestData) (string, error) { ctx := context.Background() stores := e.parseStoresArg(t, d, true /* defaultToAll */) - nodes := e.groupStoresByNodeStore(t, stores) - defer func() { - for _, storeBatches := range nodes { - for _, b := range storeBatches { - b.Close() + var restart bool + if d.HasArg("restart") { + d.ScanArgs(t, "restart", &restart) + } + + if !restart { + nodes := e.groupStoresByNodeStore(t, stores) + defer func() { + for _, storeBatches := range nodes { + for _, b := range storeBatches { + b.Close() + } + } + }() + updateTime := timeutil.Now() + for nodeID, stores := range nodes { + _, err := PrepareUpdateReplicas(ctx, e.plan, e.uuidGen, updateTime, nodeID, + stores) + if err != nil { + return "", err + } + if _, err = CommitReplicaChanges(stores); err != nil { + return "", err } } - }() - updateTime := timeutil.Now() - for nodeID, stores := range nodes { - _, err := PrepareUpdateReplicas(ctx, e.plan, uuid.DefaultGenerator, updateTime, nodeID, stores) - if err != nil { - return "", err + return "ok", nil + } + + nodes := e.groupStoresByNode(t, stores) + for _, stores := range nodes { + ps := PlanStore{ + path: "", + fs: vfs.NewMem(), } - if _, err := CommitReplicaChanges(stores); err != nil { + if err := ps.SavePlan(e.plan); err != nil { + t.Fatal("failed to save plan to plan store", err) + } + + err := MaybeApplyPendingRecoveryPlan(ctx, ps, stores, e.clock) + if err != nil { return "", err } } + return "ok", nil } @@ -717,22 +840,51 @@ func (e *quorumRecoveryEnv) dumpRecoveryEvents( if d.HasArg("remove") { d.ScanArgs(t, "remove", &removeEvents) } + dumpStatus := false + if d.HasArg("status") { + d.ScanArgs(t, "status", &dumpStatus) + } var events []string logEvents := func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) { event := record.AsStructuredLog() - events = append(events, fmt.Sprintf("Updated range r%d, Key:%s, Store:s%d ReplicaID:%d", + events = append(events, fmt.Sprintf("updated range r%d, Key:%s, Store:s%d ReplicaID:%d", event.RangeID, event.StartKey, event.StoreID, event.UpdatedReplicaID)) return removeEvents, nil } + var statuses []string stores := e.parseStoresArg(t, d, true /* defaultToAll */) - for _, store := range stores { - if _, err := RegisterOfflineRecoveryEvents(ctx, e.stores[store].engine, logEvents); err != nil { + for _, storeID := range stores { + store, ok := e.stores[storeID] + if !ok { + t.Fatalf("store s%d doesn't exist, but event dump is requested for it", store) + } + if _, err := RegisterOfflineRecoveryEvents(ctx, store.engine, logEvents); err != nil { + 
return "", err + } + status, ok, err := readNodeRecoveryStatusInfo(ctx, store.engine) + if err != nil { return "", err } + if ok { + statuses = append(statuses, + fmt.Sprintf("node n%d applied plan %s at %s:%s", store.nodeID, status.AppliedPlanID, status.ApplyTimestamp, + status.Error)) + } + } + + var sb strings.Builder + if len(events) > 0 { + sb.WriteString(fmt.Sprintf("Events:\n%s\n", strings.Join(events, "\n"))) } - return strings.Join(events, "\n"), nil + if dumpStatus && len(statuses) > 0 { + sb.WriteString(fmt.Sprintf("Statuses:\n%s", strings.Join(statuses, "\n"))) + } + if sb.Len() > 0 { + return sb.String(), nil + } + return "ok", nil } func descriptorView(desc roachpb.RangeDescriptor) storeDescriptorView { diff --git a/pkg/kv/kvserver/loqrecovery/recovery_test.go b/pkg/kv/kvserver/loqrecovery/recovery_test.go index cb39cf70f41a..cdd3e30db5a6 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_test.go @@ -12,11 +12,13 @@ package loqrecovery import ( "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) @@ -25,7 +27,11 @@ func TestQuorumRecovery(t *testing.T) { defer leaktest.AfterTest(t)() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { - env := quorumRecoveryEnv{} + testTime, _ := time.Parse(time.RFC3339, "2022-02-24T01:40:00Z") + env := quorumRecoveryEnv{ + uuidGen: NewSeqGen(1), + clock: timeutil.NewManualTime(testTime), + } defer env.cleanupStores() datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { return env.Handle(t, *d) diff --git a/pkg/kv/kvserver/loqrecovery/testdata/half_online_application_plan b/pkg/kv/kvserver/loqrecovery/testdata/half_online_application_plan new file mode 100644 index 000000000000..aae83511d7bd --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/testdata/half_online_application_plan @@ -0,0 +1,86 @@ +# Test verifying that changes to replicas are recorded and records +# are translated to logging upon restarts. +# To do that, we generate replicas without quorum in two stores, +# perform recovery and assert the dump of recovery events. 
+ +replication-data +- StoreID: 1 + RangeID: 1 + StartKey: /Min + EndKey: /Table/1 + Replicas: + - { NodeID: 1, StoreID: 1, ReplicaID: 1} # Designated replica in this store + - { NodeID: 4, StoreID: 4, ReplicaID: 4} + - { NodeID: 5, StoreID: 5, ReplicaID: 5} + RangeAppliedIndex: 11 + RaftCommittedIndex: 13 +- StoreID: 1 + RangeID: 2 + StartKey: /Table/1 + EndKey: /Table/5 + Replicas: + - { NodeID: 1, StoreID: 1, ReplicaID: 1} # Designated replica in this store + - { NodeID: 4, StoreID: 4, ReplicaID: 4} + - { NodeID: 5, StoreID: 5, ReplicaID: 5} + RangeAppliedIndex: 11 + RaftCommittedIndex: 13 +- StoreID: 2 + RangeID: 3 + StartKey: /Table/5 + EndKey: /Max + Replicas: + - { NodeID: 2, StoreID: 2, ReplicaID: 2} # Designated replica in this store + - { NodeID: 4, StoreID: 4, ReplicaID: 4} + - { NodeID: 5, StoreID: 5, ReplicaID: 5} + RangeAppliedIndex: 10 + RaftCommittedIndex: 13 +---- +ok + +collect-replica-info stores=(1,2) +---- +ok + +make-plan +---- +Replica updates: +- RangeID: 1 + StartKey: /Min + OldReplicaID: 1 + NewReplica: + NodeID: 1 + StoreID: 1 + ReplicaID: 16 + NextReplicaID: 17 +- RangeID: 2 + StartKey: /Table/1 + OldReplicaID: 1 + NewReplica: + NodeID: 1 + StoreID: 1 + ReplicaID: 16 + NextReplicaID: 17 +- RangeID: 3 + StartKey: /Table/5 + OldReplicaID: 2 + NewReplica: + NodeID: 2 + StoreID: 2 + ReplicaID: 16 + NextReplicaID: 17 +Decommissioned nodes: +[n4, n5] + +apply-plan stores=(1,2) restart=true +---- +ok + +dump-events stores=(1,2) remove=true status=true +---- +Events: +updated range r1, Key:/Min, Store:s1 ReplicaID:16 +updated range r2, Key:/Table/1, Store:s1 ReplicaID:16 +updated range r3, Key:/Table/5, Store:s2 ReplicaID:16 +Statuses: +node n1 applied plan 00000001-0000-4000-8000-000000000000 at 2022-02-24 01:40:00 +0000 UTC: +node n2 applied plan 00000001-0000-4000-8000-000000000000 at 2022-02-24 01:40:00 +0000 UTC: diff --git a/pkg/kv/kvserver/loqrecovery/testdata/half_online_apply_mismatching_plan b/pkg/kv/kvserver/loqrecovery/testdata/half_online_apply_mismatching_plan new file mode 100644 index 000000000000..cc19f4ab09a4 --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/testdata/half_online_apply_mismatching_plan @@ -0,0 +1,61 @@ +# Test verifying that if node fails to apply plan on restart +# it records an error into the status. +# To do that we swap replication data from under the recovery +# after creating plan so that replica in the store doesn't +# match initial one. + +replication-data +- StoreID: 1 + RangeID: 1 + StartKey: /Min + EndKey: /Max + Replicas: + - { NodeID: 1, StoreID: 1, ReplicaID: 1} # Designated replica in this store + - { NodeID: 4, StoreID: 4, ReplicaID: 4} + - { NodeID: 5, StoreID: 5, ReplicaID: 5} + RangeAppliedIndex: 11 + RaftCommittedIndex: 13 +---- +ok + +collect-replica-info stores=(1) +---- +ok + +make-plan +---- +Replica updates: +- RangeID: 1 + StartKey: /Min + OldReplicaID: 1 + NewReplica: + NodeID: 1 + StoreID: 1 + ReplicaID: 16 + NextReplicaID: 17 +Decommissioned nodes: +[n4, n5] + +# We now replace a replica with a different one to confuse plan application. 
+replication-data +- StoreID: 1 + RangeID: 1 + StartKey: /Min + EndKey: /Max + Replicas: + - { NodeID: 1, StoreID: 1, ReplicaID: 6} # New designated replica in this store + - { NodeID: 4, StoreID: 4, ReplicaID: 4} + - { NodeID: 5, StoreID: 5, ReplicaID: 5} + RangeAppliedIndex: 18 + RaftCommittedIndex: 18 +---- +ok + +apply-plan stores=(1) restart=true +---- +ok + +dump-events stores=(1) status=true +---- +Statuses: +node n1 applied plan 00000001-0000-4000-8000-000000000000 at 2022-02-24 01:40:00 +0000 UTC:failed to prepare update replica for range r1 on store s1: can not find replica with ID 1 for range r1 diff --git a/pkg/kv/kvserver/loqrecovery/testdata/max_applied_voter_wins b/pkg/kv/kvserver/loqrecovery/testdata/max_applied_voter_wins index 79f57f80b6cd..5b399cd87fd8 100644 --- a/pkg/kv/kvserver/loqrecovery/testdata/max_applied_voter_wins +++ b/pkg/kv/kvserver/loqrecovery/testdata/max_applied_voter_wins @@ -82,7 +82,8 @@ dump-store stores=(1,2) dump-events stores=(1,2) ---- -Updated range r1, Key:/Min, Store:s1 ReplicaID:16 +Events: +updated range r1, Key:/Min, Store:s1 ReplicaID:16 # Second use case where stale replica which remained from before split @@ -344,4 +345,5 @@ dump-store stores=(1,2) dump-events stores=(1,2) ---- -Updated range r1, Key:/Min, Store:s1 ReplicaID:16 +Events: +updated range r1, Key:/Min, Store:s1 ReplicaID:16 diff --git a/pkg/kv/kvserver/loqrecovery/testdata/replica_changes_recorded b/pkg/kv/kvserver/loqrecovery/testdata/replica_changes_recorded index 2e3cb5ed5e55..6280c5266bb4 100644 --- a/pkg/kv/kvserver/loqrecovery/testdata/replica_changes_recorded +++ b/pkg/kv/kvserver/loqrecovery/testdata/replica_changes_recorded @@ -77,9 +77,10 @@ ok dump-events stores=(1,2) remove=true ---- -Updated range r1, Key:/Min, Store:s1 ReplicaID:16 -Updated range r2, Key:/Table/1, Store:s1 ReplicaID:16 -Updated range r3, Key:/Table/5, Store:s2 ReplicaID:16 +Events: +updated range r1, Key:/Min, Store:s1 ReplicaID:16 +updated range r2, Key:/Table/1, Store:s1 ReplicaID:16 +updated range r3, Key:/Table/5, Store:s2 ReplicaID:16 # We now try to dump stores once again to verify that record # consumer cleans up events if it passed them on successfully. 
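For reference, the deterministic plan ID asserted in the half-online testdata above (00000001-0000-4000-8000-000000000000) falls directly out of the sequential UUID generator that the test environment installs. Below is a minimal standalone sketch of the same idea; using github.com/gofrs/uuid directly (the package that pkg/util/uuid wraps) is an assumption made purely for illustration.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/gofrs/uuid"
)

// seqGen hands out UUIDs whose first four bytes are a big-endian counter, with
// the RFC 4122 variant and version 4 bits set, mirroring the test helper so
// that recovery plan IDs are predictable across runs.
type seqGen struct{ seq uint32 }

func (g *seqGen) NewV4() uuid.UUID {
	var u uuid.UUID
	binary.BigEndian.PutUint32(u[:4], g.seq)
	g.seq++
	u.SetVariant(uuid.VariantRFC4122)
	u.SetVersion(uuid.V4)
	return u
}

func main() {
	g := &seqGen{seq: 1}
	// Prints 00000001-0000-4000-8000-000000000000, the plan ID asserted in the
	// datadriven test output above.
	fmt.Println(g.NewV4())
}

The production path keeps using uuid.DefaultGenerator (see the PlanReplicas change above); only the tests substitute a deterministic generator and a manual clock.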
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 201613853199..8238038e0c6d 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -272,6 +272,7 @@ go_library(
         "//pkg/util/retry",
         "//pkg/util/schedulerlatency",
         "//pkg/util/stop",
+        "//pkg/util/strutil",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/timeutil/ptp",
diff --git a/pkg/server/loss_of_quorum.go b/pkg/server/loss_of_quorum.go
index 8b6c6b93e7b8..aa910d324f7a 100644
--- a/pkg/server/loss_of_quorum.go
+++ b/pkg/server/loss_of_quorum.go
@@ -12,14 +12,20 @@ package server
 
 import (
 	"context"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/strutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/vfs"
 )
@@ -83,8 +89,12 @@ func logPendingLossOfQuorumRecoveryEvents(ctx context.Context, stores *kvserver.
 	}
 }
 
-func publishPendingLossOfQuorumRecoveryEvents(
-	ctx context.Context, ie isql.Executor, stores *kvserver.Stores, stopper *stop.Stopper,
+func maybeRunLossOfQuorumRecoveryCleanup(
+	ctx context.Context,
+	ie isql.Executor,
+	stores *kvserver.Stores,
+	server *Server,
+	stopper *stop.Stopper,
 ) {
 	_ = stopper.RunAsyncTask(ctx, "publish-loss-of-quorum-events", func(ctx context.Context) {
 		if err := stores.VisitStores(func(s *kvserver.Store) error {
@@ -116,4 +126,67 @@ func publishPendingLossOfQuorumRecoveryEvents(
 			log.Errorf(ctx, "failed to update range log with loss of quorum recovery events: %v", err)
 		}
 	})
+
+	var cleanup loqrecoverypb.DeferredRecoveryActions
+	var actionsSource storage.ReadWriter
+	err := stores.VisitStores(func(s *kvserver.Store) error {
+		c, found, err := loqrecovery.ReadCleanupActionsInfo(ctx, s.Engine())
+		if err != nil {
+			log.Errorf(ctx, "failed to read loss of quorum recovery cleanup actions info from store: %s", err)
+			return nil
+		}
+		if found {
+			cleanup = c
+			actionsSource = s.Engine()
+			return iterutil.StopIteration()
+		}
+		return nil
+	})
+	if err := iterutil.Map(err); err != nil {
+		log.Infof(ctx, "failed to iterate node stores while searching for loq recovery cleanup info: %s", err)
+		return
+	}
+	if len(cleanup.DecommissionedNodeIDs) == 0 {
+		return
+	}
+	_ = stopper.RunAsyncTask(ctx, "maybe-mark-nodes-as-decommissioned", func(ctx context.Context) {
+		log.Infof(ctx, "loss of quorum recovery decommissioning removed nodes %s",
+			strutil.JoinIDs("n", cleanup.DecommissionedNodeIDs))
+		retryOpts := retry.Options{
+			InitialBackoff: 10 * time.Second,
+			MaxBackoff:     time.Hour,
+			Multiplier:     2,
+		}
+		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+			// Nodes are already dead, but are still in the active state. Internal
+			// checks don't allow us to throw nodes away; they need to go through legal
+			// state transitions within liveness to succeed. To achieve that we mark
+			// nodes as decommissioning first, followed by decommissioned.
+			// Those operations may fail because other nodes could be restarted
+			// concurrently and may also be attempting this cleanup. We rely on retries
+			// and on the change being idempotent for the operation to complete.
+			// Note that it is valid to mark already decommissioned nodes as
+			// decommissioned again; that results in a noop, so it is safe to always go
+			// through this cycle without checking the current state first.
+			err := server.Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING,
+				cleanup.DecommissionedNodeIDs)
+			if err != nil {
+				log.Infof(ctx,
+					"loss of quorum recovery cleanup failed to decommission dead nodes, this is ok as the cluster might not be healed yet: %s", err)
+				continue
+			}
+			err = server.Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONED,
+				cleanup.DecommissionedNodeIDs)
+			if err != nil {
+				log.Infof(ctx,
+					"loss of quorum recovery cleanup failed to decommission dead nodes, this is ok as the cluster might not be healed yet: %s", err)
+				continue
+			}
+			if err = loqrecovery.RemoveCleanupActionsInfo(ctx, actionsSource); err != nil {
+				log.Infof(ctx, "failed to remove loss of quorum recovery cleanup actions: %s", err)
+			}
+			break
+		}
+		log.Infof(ctx, "loss of quorum recovery cleanup finished decommissioning removed nodes")
+	})
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 4c0ab095a0a1..e8fc1ae9b3e8 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -253,6 +253,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	}
 	stopper.AddCloser(&engines)
 
+	// The loss of quorum recovery plan store is created, and any pending plan is
+	// applied to the engines, as soon as the engines are created and before any
+	// data is read, in a way similar to offline engine content patching.
+	planStore, err := newPlanStore(cfg)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create loss of quorum plan store")
+	}
+	if err := loqrecovery.MaybeApplyPendingRecoveryPlan(ctx, planStore, engines, timeutil.DefaultTimeSource{}); err != nil {
+		return nil, errors.Wrap(err, "failed to apply loss of quorum recovery plan")
+	}
+
 	nodeTombStorage, checkPingFor := getPingCheckDecommissionFn(engines)
 
 	g := gossip.New(
@@ -1025,13 +1036,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		},
 	)
 
-	// TODO(oleg): plan store creation needs to move to the start of this method
-	// right after stores are created. We need it to retrieve pending plan and
-	// patch replicas before any initialization occurs.
-	planStore, err := newPlanStore(cfg)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to create loss of quorum recovery server")
-	}
 	recoveryServer := loqrecovery.NewServer(
 		nodeIDContainer,
 		stores,
@@ -1851,16 +1855,15 @@ func (s *Server) PreStart(ctx context.Context) error {
 		return errors.Wrapf(err, "failed to start KV prober")
 	}
 
-	// As final stage of loss of quorum recovery, write events into corresponding
-	// range logs. We do it as a separate stage to log events early just in case
-	// startup fails, and write to range log once the server is running as we need
-	// to run sql statements to update rangelog.
-	publishPendingLossOfQuorumRecoveryEvents(
-		workersCtx,
+	// Perform loss of quorum recovery cleanup if any actions were scheduled.
+	// Cleanup actions rely on the node being connected to the cluster, and
+	// hopefully in a healthy or at least healthier state, to update node liveness records.
+ maybeRunLossOfQuorumRecoveryCleanup( + ctx, s.node.execCfg.InternalDB.Executor(), s.node.stores, - s.stopper, - ) + s, + s.stopper) log.Event(ctx, "server initialized") diff --git a/pkg/util/strutil/util.go b/pkg/util/strutil/util.go index cfc73024043b..c548535b9df5 100644 --- a/pkg/util/strutil/util.go +++ b/pkg/util/strutil/util.go @@ -39,7 +39,7 @@ func AppendInt(b []byte, x int, width int) []byte { // JoinIDs joins a slice of any ids into a comma separated string. Each ID could // be prefixed with a string (e.g. n1, n2, n3 to represent nodes). -func JoinIDs[T ~int|~int32|~int64](prefix string, ids []T) string { +func JoinIDs[T ~int | ~int32 | ~int64](prefix string, ids []T) string { idNames := make([]string, 0, len(ids)) for _, id := range ids { idNames = append(idNames, fmt.Sprintf("%s%d", prefix, id))
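For context, the relaxed ~int | ~int32 | ~int64 constraint above (and the matching change to sortedKeys in the cli package) is what lets a single helper format node, store, or replica ID slices. The sketch below mirrors that signature; the nodeID type and the ", " separator are illustrative assumptions rather than the exact strutil implementation.

package main

import (
	"fmt"
	"strings"
)

// nodeID stands in for an int32-backed ID type such as roachpb.NodeID.
type nodeID int32

// joinIDs mirrors the JoinIDs signature above: any integer-backed ID type is
// accepted, and each element is rendered with a prefix like "n" or "s".
func joinIDs[T ~int | ~int32 | ~int64](prefix string, ids []T) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, fmt.Sprintf("%s%d", prefix, id))
	}
	return strings.Join(parts, ", ")
}

func main() {
	// e.g. the decommissioned nodes reported by the recovery plans above.
	fmt.Println(joinIDs("n", []nodeID{4, 5})) // n4, n5
}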