Skip to content

Commit

Permalink
loqrecovery,server: apply staged loqrecovery plans on start
Browse files Browse the repository at this point in the history
This commit adds loss of quorum recovery plan application functionality
to server startup. Plans are staged during application phase and then
applied when rolling restart of affected nodes is performed.
Plan application peforms same actions that are performed by debug
recovery apply-plan on offline storage. It is also updating loss of
quorum recovery status stored in a store local key.

Release note: None
  • Loading branch information
aliher1911 committed Jan 31, 2023
1 parent 62843cc commit 9c6b9e9
Show file tree
Hide file tree
Showing 21 changed files with 668 additions and 64 deletions.
9 changes: 5 additions & 4 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
ctx,
replicas,
deadStoreIDs,
deadNodeIDs)
deadNodeIDs,
uuid.DefaultGenerator)
if err != nil {
return err
}
Expand Down Expand Up @@ -613,8 +614,8 @@ func stageRecoveryOntoCluster(
// Proposed report
_, _ = fmt.Fprintf(stderr, "Proposed changes in plan %s:\n", plan.PlanID)
for _, u := range plan.Updates {
_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s and discarding all others.\n",
u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID)
_, _ = fmt.Fprintf(stderr, " range r%d:%s updating replica %s to %s on node n%d and discarding all others.\n",
u.RangeID, roachpb.Key(u.StartKey), u.OldReplicaID, u.NextReplicaID, u.NodeID())
}
_, _ = fmt.Fprintf(stderr, "\nNodes %s will be marked as decommissioned.\n", strutil.JoinIDs("n", plan.DecommissionedNodeIDs))

Expand Down Expand Up @@ -690,7 +691,7 @@ To verify recovery status invoke:
return nil
}

func sortedKeys[T ~int32](set map[T]any) []T {
func sortedKeys[T ~int | ~int32 | ~int64](set map[T]any) []T {
var sorted []T
for k := range set {
sorted = append(sorted, k)
Expand Down
24 changes: 15 additions & 9 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ const (
tenantPrefixByte = '\xfe'
)

// Constants to subdivide unsafe loss of quorum recovery data into groups.
// Currently we only store keys as they are applied, but might benefit from
// archiving them to make them more "durable".
const (
appliedUnsafeReplicaRecoveryPrefix = "applied"
)

// Constants for system-reserved keys in the KV map.
//
// Note: Preserve group-wise ordering when adding new constants.
Expand Down Expand Up @@ -181,18 +174,31 @@ var (
// localStoreIdentSuffix stores an immutable identifier for this
// store, created when the store is first bootstrapped.
localStoreIdentSuffix = []byte("iden")
// localStoreLossOfQuorumRecoveryInfix is an infix for the group of keys used
// by loss of quorum recovery operations to track progress and outcome.
// This infix is followed by the suffix defining type of recovery key.
localStoreLossOfQuorumRecoveryInfix = []byte("loqr")
// LocalStoreUnsafeReplicaRecoverySuffix is a suffix for temporary record
// entries put when loss of quorum recovery operations are performed offline
// on the store.
// See StoreUnsafeReplicaRecoveryKey for details.
localStoreUnsafeReplicaRecoverySuffix = makeKey([]byte("loqr"),
[]byte(appliedUnsafeReplicaRecoveryPrefix))
localStoreUnsafeReplicaRecoverySuffix = makeKey(localStoreLossOfQuorumRecoveryInfix,
[]byte("applied"))
// LocalStoreUnsafeReplicaRecoveryKeyMin is the start of keyspace used to store
// loss of quorum recovery record entries.
LocalStoreUnsafeReplicaRecoveryKeyMin = MakeStoreKey(localStoreUnsafeReplicaRecoverySuffix, nil)
// LocalStoreUnsafeReplicaRecoveryKeyMax is the end of keyspace used to store
// loss of quorum recovery record entries.
LocalStoreUnsafeReplicaRecoveryKeyMax = LocalStoreUnsafeReplicaRecoveryKeyMin.PrefixEnd()
// localStoreLossOfQuorumRecoveryStatusSuffix is a local key store suffix to
// store results of loss of quorum recovery plan application.
localStoreLossOfQuorumRecoveryStatusSuffix = makeKey(localStoreLossOfQuorumRecoveryInfix,
[]byte("status"))
// localStoreLossOfQuorumRecoveryCleanupActionsSuffix is a local key store
// suffix to store information for loss of quorum recovery cleanup actions
// performed after node restart.
localStoreLossOfQuorumRecoveryCleanupActionsSuffix = makeKey(localStoreLossOfQuorumRecoveryInfix,
[]byte("cleanup"))
// localStoreNodeTombstoneSuffix stores key value pairs that map
// nodeIDs to time of removal from cluster.
localStoreNodeTombstoneSuffix = []byte("ntmb")
Expand Down
13 changes: 13 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ func DecodeStoreCachedSettingsKey(key roachpb.Key) (settingKey roachpb.Key, err
return
}

// StoreLossOfQuorumRecoveryStatusKey is a key used for storing results of loss
// of quorum recovery plan application.
func StoreLossOfQuorumRecoveryStatusKey() roachpb.Key {
return MakeStoreKey(localStoreLossOfQuorumRecoveryStatusSuffix, nil)
}

// StoreLossOfQuorumRecoveryCleanupActionsKey is a key used for storing data for
// post recovery cleanup actions node would perform after restart if plan was
// applied.
func StoreLossOfQuorumRecoveryCleanupActionsKey() roachpb.Key {
return MakeStoreKey(localStoreLossOfQuorumRecoveryCleanupActionsSuffix, nil)
}

// StoreUnsafeReplicaRecoveryKey creates a key for loss of quorum replica
// recovery entry. Those keys are written by `debug recover apply-plan` command
// on the store while node is stopped. Once node boots up, entries are
Expand Down
2 changes: 2 additions & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ var constSubKeyDict = []struct {
{"/nodeTombstone", localStoreNodeTombstoneSuffix},
{"/cachedSettings", localStoreCachedSettingsSuffix},
{"/lossOfQuorumRecovery/applied", localStoreUnsafeReplicaRecoverySuffix},
{"/lossOfQuorumRecovery/status", localStoreLossOfQuorumRecoveryStatusSuffix},
{"/lossOfQuorumRecovery/cleanup", localStoreLossOfQuorumRecoveryCleanupActionsSuffix},
}

func nodeTombstoneKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func TestPrettyPrint(t *testing.T) {
{keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/n123", revertSupportUnknown},
{keys.StoreCachedSettingsKey(roachpb.Key("a")), `/Local/Store/cachedSettings/"a"`, revertSupportUnknown},
{keys.StoreUnsafeReplicaRecoveryKey(loqRecoveryID), fmt.Sprintf(`/Local/Store/lossOfQuorumRecovery/applied/%s`, loqRecoveryID), revertSupportUnknown},
{keys.StoreLossOfQuorumRecoveryStatusKey(), "/Local/Store/lossOfQuorumRecovery/status", revertSupportUnknown},
{keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), "/Local/Store/lossOfQuorumRecovery/cleanup", revertSupportUnknown},

{keys.AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID), revertSupportUnknown},
{keys.RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState", revertSupportUnknown},
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/strutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
Expand Down
101 changes: 98 additions & 3 deletions pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/strutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -202,6 +206,11 @@ func applyReplicaUpdate(
report.AlreadyUpdated = true
return report, nil
}
// Sanity check if removed replica ID matches one in the plan.
if _, ok := localDesc.Replicas().GetReplicaDescriptorByID(update.OldReplicaID); !ok {
return PrepareReplicaReport{}, errors.Errorf(
"can not find replica with ID %d for range r%d", update.OldReplicaID, update.RangeID)
}

sl := stateloader.Make(localDesc.RangeID)
ms, err := sl.LoadMVCCStats(ctx, readWriter)
Expand Down Expand Up @@ -316,7 +325,6 @@ type ApplyUpdateReport struct {
// second step of applying recovery plan.
func CommitReplicaChanges(batches map[roachpb.StoreID]storage.Batch) (ApplyUpdateReport, error) {
var report ApplyUpdateReport
failed := false
var updateErrors []string
// Commit changes to all stores. Stores could have pending changes if plan
// contains replicas belonging to them, or have no changes if no replicas
Expand All @@ -330,14 +338,101 @@ func CommitReplicaChanges(batches map[roachpb.StoreID]storage.Batch) (ApplyUpdat
// If we fail here, we can only try to run the whole process from scratch
// as this store is somehow broken.
updateErrors = append(updateErrors, fmt.Sprintf("failed to update store s%d: %v", id, err))
failed = true
} else {
report.UpdatedStores = append(report.UpdatedStores, id)
}
}
if failed {
if len(updateErrors) > 0 {
return report, errors.Errorf(
"failed to commit update to one or more stores: %s", strings.Join(updateErrors, "; "))
}
return report, nil
}

// MaybeApplyPendingRecoveryPlan applies loss of quorum recovery plan if it is
// staged in planStore. Changes would be applied to engines when their
// identities match storeIDs of replicas in the plan.
// Plan applications errors like mismatch of store with plan, inability to
// deserialize values ets are only reported to logs and application status but
// are not propagated to caller. Only serious errors that imply misconfiguration
// or planStorage issues are propagated.
// Regardless of application success or failure, staged plan would be removed.
func MaybeApplyPendingRecoveryPlan(
ctx context.Context, planStore PlanStore, engines []storage.Engine, clock timeutil.TimeSource,
) error {
if len(engines) < 1 {
return nil
}

applyPlan := func(nodeID roachpb.NodeID, plan loqrecoverypb.ReplicaUpdatePlan) error {
log.Infof(ctx, "applying staged loss of quorum recovery plan %s", plan.PlanID)
batches := make(map[roachpb.StoreID]storage.Batch)
for _, e := range engines {
ident, err := kvstorage.ReadStoreIdent(ctx, e)
if err != nil {
return errors.Wrap(err, "failed to read store ident when trying to apply loss of quorum recovery plan")
}
b := e.NewBatch()
defer b.Close()
batches[ident.StoreID] = b
}
prepRep, err := PrepareUpdateReplicas(ctx, plan, uuid.DefaultGenerator, clock.Now(), nodeID, batches)
if err != nil {
return err
}
if len(prepRep.MissingStores) > 0 {
log.Warningf(ctx, "loss of quorum recovery plan application expected stores on the node %s",
strutil.JoinIDs("s", prepRep.MissingStores))
}
_, err = CommitReplicaChanges(batches)
if err != nil {
// This is not very good as are in a partial success situation, but we don't
// have a good solution other than report that as error. Let the user
// decide what to do next.
return err
}
return nil
}

plan, exists, err := planStore.LoadPlan()
if err != nil {
// This is fatal error, we don't write application report since we didn't
// check the store yet.
return errors.Wrap(err, "failed to check if loss of quorum recovery plan is staged")
}
if !exists {
return nil
}

// First read node parameters from the first store.
storeIdent, err := kvstorage.ReadStoreIdent(ctx, engines[0])
if err != nil {
if errors.Is(err, &kvstorage.NotBootstrappedError{}) {
// This is wrong, we must not have staged plans in a non-bootstrapped
// node. But we can't write an error here as store init might refuse to
// work if there are already some keys in store.
log.Errorf(ctx, "node is not bootstrapped but it already has a recovery plan staged: %s", err)
return nil
}
return err
}

if err := planStore.RemovePlan(); err != nil {
log.Errorf(ctx, "failed to remove loss of quorum recovery plan: %s", err)
}

err = applyPlan(storeIdent.NodeID, plan)
r := loqrecoverypb.PlanApplicationResult{
AppliedPlanID: plan.PlanID,
ApplyTimestamp: clock.Now(),
}
if err != nil {
r.Error = err.Error()
log.Errorf(ctx, "failed to apply staged loss of quorum recovery plan %s", err)
}
if err = writeNodeRecoveryResults(ctx, engines[0], r,
loqrecoverypb.DeferredRecoveryActions{DecommissionedNodeIDs: plan.DecommissionedNodeIDs}); err != nil {
log.Errorf(ctx, "failed to write loss of quorum recovery results to store: %s", err)
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ func (m *ClusterReplicaInfo) ReplicaCount() (size int) {
}
return size
}

func (a DeferredRecoveryActions) Empty() bool {
return len(a.DecommissionedNodeIDs) == 0
}
30 changes: 29 additions & 1 deletion pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,37 @@ message NodeRecoveryStatus {
bytes applied_plan_id = 3 [
(gogoproto.customname) = "AppliedPlanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
// LastProcessingTime is a node wall clock time when last recovery plan was applied.
// ApplyTimestamp is a node wall clock time when last recovery plan was applied.
google.protobuf.Timestamp apply_timestamp = 4 [(gogoproto.stdtime) = true];
// If most recent recovery plan application failed, Error will contain
// aggregated error messages containing all encountered errors.
string error = 5;
}

// PlanApplicationResult is a value stored inside node local storage whenever
// loss of quorum recovery plan is applied for the purpose of tracking recovery
// progress.
message PlanApplicationResult {
bytes applied_plan_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "AppliedPlanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
// LastProcessingTime is a node wall clock time when last recovery plan was applied.
google.protobuf.Timestamp apply_timestamp = 2 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true];
// If most recent recovery plan application failed, Error will contain
// aggregated error messages containing all encountered errors.
string error = 3;
}

// DeferredRecoveryActions contains data for recovery actions that need to be
// performed after node restarts if it applied a recovery plan.
message DeferredRecoveryActions {
// DecommissionedNodeIDs is a set of node IDs that need to be decommissioned
// when a node restarts. Those nodes were marked as decommissioned in the
// local node tombstone storage as a part of plan application, but are pending
// liveness info update.
repeated int32 decommissioned_node_ids = 1 [(gogoproto.customname) = "DecommissionedNodeIDs",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
}
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/loqrecovery/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ func PlanReplicas(
clusterInfo loqrecoverypb.ClusterReplicaInfo,
deadStoreIDs []roachpb.StoreID,
deadNodeIDs []roachpb.NodeID,
uuidGen uuid.Generator,
) (loqrecoverypb.ReplicaUpdatePlan, PlanningReport, error) {
planID, err := uuidGen.NewV4()
if err != nil {
return loqrecoverypb.ReplicaUpdatePlan{}, PlanningReport{}, err
}
var replicas []loqrecoverypb.ReplicaInfo
for _, node := range clusterInfo.LocalInfo {
replicas = append(replicas, node.Replicas...)
Expand Down Expand Up @@ -168,7 +173,7 @@ func PlanReplicas(

return loqrecoverypb.ReplicaUpdatePlan{
Updates: updates,
PlanID: uuid.MakeV4(),
PlanID: planID,
DecommissionedNodeIDs: decommissionNodeIDs,
ClusterID: clusterInfo.ClusterID,
}, report, err
Expand Down
Loading

0 comments on commit 9c6b9e9

Please sign in to comment.