kvserver/loqrecovery: record and post replica recovery events

Previously, when a replica was rewritten offline to recover from loss
of quorum, no trace of the event was kept in the system. This is not
ideal: there was no way to understand what happened unless the person
performing the recovery documented their actions and passed them on to
whoever investigated afterwards.
This diff adds records of such actions. The records are created offline
in the store and then propagated to the server logs when the node is
restarted.

Release note: None
aliher1911 committed Jan 5, 2022
1 parent 3c10d6c commit b9cfc1e
Showing 21 changed files with 538 additions and 58 deletions.
24 changes: 24 additions & 0 deletions docs/generated/eventlog.md
@@ -148,6 +148,30 @@ e.g. directly access and mutate internal state, breaking system invariants.
Events in this category are logged to the `OPS` channel.


### `debug_recover_replica`

An event of type `debug_recover_replica` is recorded when an unsafe loss-of-quorum recovery is performed.


| Field | Description | Sensitive |
|--|--|--|
| `RangeID` | The ID of the range that was recovered. | no |
| `StoreID` | The ID of the store on which the replica was updated. | no |
| `SurvivorReplicaID` | The replica ID of the surviving replica before the update. | no |
| `UpdatedReplicaID` | The replica ID after the update. | no |
| `StartKey` | The start key of the recovered range. | yes |
| `EndKey` | The end key of the recovered range. | yes |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `NodeID` | The node ID where the event originated. | no |
| `User` | The user which performed the operation. | yes |
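
For illustration only, a minimal Go sketch of emitting this event type by hand, using the `eventpb.DebugRecoverReplica` payload added in this commit (all field values are invented and the `emitRecoveryEvent` wrapper is hypothetical; routing to the `OPS` channel comes from the event's registration, not from this call):

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// emitRecoveryEvent logs a single DebugRecoverReplica structured event.
func emitRecoveryEvent(ctx context.Context) {
	event := eventpb.DebugRecoverReplica{
		CommonEventDetails: eventpb.CommonEventDetails{
			Timestamp: timeutil.Now().UnixNano(), // nanos since the Unix epoch
		},
		CommonDebugEventDetails: eventpb.CommonDebugEventDetails{
			NodeID: 1, // node where the recovery was applied
		},
		RangeID:           42,
		StoreID:           1,
		SurvivorReplicaID: 3,
		UpdatedReplicaID:  4,
		StartKey:          "/Table/50", // sensitive: redacted in output
		EndKey:            "/Table/51", // sensitive: redacted in output
	}
	log.StructuredEvent(ctx, &event)
}
```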

### `debug_send_kv_batch`

An event of type `debug_send_kv_batch` is recorded when an arbitrary KV BatchRequest is submitted
14 changes: 9 additions & 5 deletions pkg/cli/debug_recover_loss_of_quorum.go
@@ -28,6 +28,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)
@@ -411,16 +413,16 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
var localNodeID roachpb.NodeID
batches := make(map[roachpb.StoreID]storage.Batch)
for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs {
db, err := OpenExistingStore(storeSpec.Path, stopper, false /* readOnly */)
store, err := OpenExistingStore(storeSpec.Path, stopper, false /* readOnly */)
if err != nil {
return errors.Wrapf(err, "failed to open store at path %q. ensure that store path is "+
"correct and that it is not used by another process", storeSpec.Path)
}
batch := db.NewBatch()
defer db.Close()
batch := store.NewBatch()
defer store.Close()
defer batch.Close()

storeIdent, err := kvserver.ReadStoreIdent(cmd.Context(), db)
storeIdent, err := kvserver.ReadStoreIdent(cmd.Context(), store)
if err != nil {
return err
}
@@ -434,8 +436,10 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
batches[storeIdent.StoreID] = batch
}

updateTime := timeutil.Now()
updateUUID, _ := uuid.NewV4()
prepReport, err := loqrecovery.PrepareUpdateReplicas(
cmd.Context(), nodeUpdates, localNodeID, batches)
cmd.Context(), nodeUpdates, updateUUID, updateTime, localNodeID, batches)
if err != nil {
return err
}
22 changes: 16 additions & 6 deletions pkg/keys/constants.go
@@ -160,9 +160,25 @@ var (
// localStoreIdentSuffix stores an immutable identifier for this
// store, created when the store is first bootstrapped.
localStoreIdentSuffix = []byte("iden")
// localStoreUnsafeReplicaRecoverySuffix is a suffix for temporary record entries put
// when loss-of-quorum recovery operations are performed offline on the store.
// See StoreReplicaUnsafeRecoveryKey for details.
localStoreUnsafeReplicaRecoverySuffix = []byte("loqr")
// LocalStoreUnsafeReplicaRecoveryKeyMin is the start of keyspace used to store
// loss of quorum recovery record entries.
LocalStoreUnsafeReplicaRecoveryKeyMin = MakeStoreKey(localStoreUnsafeReplicaRecoverySuffix, nil)
// LocalStoreUnsafeReplicaRecoveryKeyMax is the end of keyspace used to store
// loss of quorum recovery record entries.
LocalStoreUnsafeReplicaRecoveryKeyMax = LocalStoreUnsafeReplicaRecoveryKeyMin.PrefixEnd()
// localStoreNodeTombstoneSuffix stores key value pairs that map
// nodeIDs to time of removal from cluster.
localStoreNodeTombstoneSuffix = []byte("ntmb")
// localStoreCachedSettingsSuffix stores the cached settings for node.
localStoreCachedSettingsSuffix = []byte("stng")
// LocalStoreCachedSettingsKeyMin is the start of span of possible cached settings keys.
LocalStoreCachedSettingsKeyMin = MakeStoreKey(localStoreCachedSettingsSuffix, nil)
// LocalStoreCachedSettingsKeyMax is the end of span of possible cached settings keys.
LocalStoreCachedSettingsKeyMax = LocalStoreCachedSettingsKeyMin.PrefixEnd()
// localStoreLastUpSuffix stores the last timestamp that a store's node
// acknowledged that it was still running. This value will be regularly
// refreshed on all stores for a running node; the intention of this value
@@ -172,12 +188,6 @@ var (
// localRemovedLeakedRaftEntriesSuffix is DEPRECATED and remains to prevent
// reuse.
localRemovedLeakedRaftEntriesSuffix = []byte("dlre")
// localStoreCachedSettingsSuffix stores the cached settings for node.
localStoreCachedSettingsSuffix = []byte("stng")
// LocalStoreCachedSettingsKeyMin is the start of span of possible cached settings keys.
LocalStoreCachedSettingsKeyMin = MakeStoreKey(localStoreCachedSettingsSuffix, nil)
// LocalStoreCachedSettingsKeyMax is the end of span of possible cached settings keys.
LocalStoreCachedSettingsKeyMax = LocalStoreCachedSettingsKeyMin.PrefixEnd()

// 5. Lock table keys
//
15 changes: 8 additions & 7 deletions pkg/keys/doc.go
@@ -214,13 +214,14 @@ var _ = [...]interface{}{
// 4. Store local keys: These contain metadata about an individual store.
// They are unreplicated and unaddressable. The typical example is the
// store 'ident' record. They all share `localStorePrefix`.
StoreClusterVersionKey, // "cver"
StoreGossipKey, // "goss"
StoreHLCUpperBoundKey, // "hlcu"
StoreIdentKey, // "iden"
StoreNodeTombstoneKey, // "ntmb"
StoreLastUpKey, // "uptm"
StoreCachedSettingsKey, // "stng"
StoreClusterVersionKey, // "cver"
StoreGossipKey, // "goss"
StoreHLCUpperBoundKey, // "hlcu"
StoreIdentKey, // "iden"
StoreReplicaUnsafeRecoveryKey, // "loqr"
StoreNodeTombstoneKey, // "ntmb"
StoreCachedSettingsKey, // "stng"
StoreLastUpKey, // "uptm"

// 5. Range lock keys for all replicated locks. All range locks share
// LocalRangeLockTablePrefix. Locks can be acquired on global keys and on
13 changes: 13 additions & 0 deletions pkg/keys/keys.go
@@ -123,6 +123,19 @@ func DecodeStoreCachedSettingsKey(key roachpb.Key) (settingKey roachpb.Key, err
return
}

// StoreReplicaUnsafeRecoveryKey creates a key for a loss-of-quorum replica recovery entry.
// These entries are written by the `debug recover apply-plan` command on the store while the
// node is stopped. Once the node boots up, the entries are translated into structured log
// events to leave an audit trail of the recovery operation.
// The key combines a UUID, passed as detailPrefix, with a sequenceNumber to ensure there
// are no key clashes when creating records. Each run of apply-plan generates a UUID and
// writes records with sequential keys under that UUID.
func StoreReplicaUnsafeRecoveryKey(detailPrefix roachpb.Key, sequenceNumber uint64) roachpb.Key {
key := append([]byte(nil), detailPrefix...)
seq := encoding.EncodeUint64Ascending(key, sequenceNumber)
return MakeStoreKey(localStoreUnsafeReplicaRecoverySuffix, seq)
}
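
A hypothetical sketch of the resulting key layout (the `recordKeys` helper is invented for illustration; only `StoreReplicaUnsafeRecoveryKey` and the UUID-plus-sequence scheme come from this commit):

```go
package main

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// recordKeys generates the store-local keys for one apply run: a fresh
// UUID acts as the prefix, with a sequence number appended per record.
func recordKeys(numRecords uint64) []roachpb.Key {
	prefix := roachpb.Key(uuid.MakeV4().GetBytes())
	out := make([]roachpb.Key, 0, numRecords)
	for seq := uint64(1); seq <= numRecords; seq++ {
		// All keys sort within [LocalStoreUnsafeReplicaRecoveryKeyMin,
		// LocalStoreUnsafeReplicaRecoveryKeyMax), so startup code can scan
		// that span to find every pending record.
		out = append(out, keys.StoreReplicaUnsafeRecoveryKey(prefix, seq))
	}
	return out
}
```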

// NodeLivenessKey returns the key for the node liveness record.
func NodeLivenessKey(nodeID roachpb.NodeID) roachpb.Key {
key := make(roachpb.Key, 0, len(NodeLivenessPrefix)+9)
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
"apply.go",
"collect.go",
"plan.go",
"record.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery",
@@ -19,6 +20,7 @@ go_library(
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
@@ -44,6 +46,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/keysutil",
"//pkg/util/leaktest",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
99 changes: 65 additions & 34 deletions pkg/kv/kvserver/loqrecovery/apply.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
@@ -63,38 +64,62 @@ type PrepareReplicaReport struct {
AbortedTransactionID uuid.UUID
}

// replicaPrepareBatch is a wrapper around a ReadWriter that carries the extra
// state needed to write audit records alongside the replica updates.
type replicaPrepareBatch struct {
readWriter storage.ReadWriter
nextUpdateRecordIndex uint64
}

// PrepareUpdateReplicas prepares all changes to be committed to the provided stores
// as the first step of the apply stage. It writes the changes to the stores using the
// provided batches and returns a summary of what was done, together with any
// discrepancies found. The caller can then confirm the actions and either commit or
// discard the changes.
// The changes also include update records in the store-local keyspace that are consumed
// on the next start of the node. See keys.StoreReplicaUnsafeRecoveryKey for details.
func PrepareUpdateReplicas(
ctx context.Context,
plan loqrecoverypb.ReplicaUpdatePlan,
uuid uuid.UUID,
updateTime time.Time,
nodeID roachpb.NodeID,
batches map[roachpb.StoreID]storage.Batch,
) (PrepareStoreReport, error) {
var report PrepareStoreReport
// TODO(oleg): #73281 Track attempts to apply changes and also fill into audit log
// Make a pre-check of all found stores so that we can confirm the action.

updaters := make(map[roachpb.StoreID]*replicaPrepareBatch)
for storeID, batch := range batches {
updaters[storeID] = &replicaPrepareBatch{
readWriter: batch,
nextUpdateRecordIndex: 1,
}
}

// missing is the set of store IDs that appear in the plan for this node but were
// not configured in this command invocation.
missing := make(map[roachpb.StoreID]struct{})
for _, update := range plan.Updates {
if nodeID != update.NodeID() {
continue
}
if batch, ok := batches[update.StoreID()]; !ok {
if updater, ok := updaters[update.StoreID()]; !ok {
missing[update.StoreID()] = struct{}{}
continue
} else {
replicaReport, err := applyReplicaUpdate(ctx, batch, update)
replicaReport, err := applyReplicaUpdate(ctx, updater.readWriter, update)
if err != nil {
return PrepareStoreReport{}, errors.Wrapf(err,
"failed to prepare update replica for range r%v on store s%d", update.RangeID, update.StoreID())
return PrepareStoreReport{}, errors.Wrapf(
err,
"failed to prepare update replica for range r%v on store s%d", update.RangeID,
update.StoreID())
}
if !replicaReport.AlreadyUpdated {
report.UpdatedReplicas = append(report.UpdatedReplicas, replicaReport)
if err := writeReplicaRecoveryStoreRecord(
uuid, updateTime.UnixNano(), update, replicaReport, updater); err != nil {
return PrepareStoreReport{}, errors.Wrap(err, "failed writing update evidence records")
}
} else {
report.SkippedReplicas = append(report.SkippedReplicas, replicaReport)
}
@@ -158,24 +183,24 @@ func applyReplicaUpdate(
if err != nil {
return PrepareReplicaReport{}, err
}
var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
var localDesc roachpb.RangeDescriptor
if err := value.GetProto(&localDesc); err != nil {
return PrepareReplicaReport{}, err
}
// Sanity check that this is indeed the right range.
if desc.RangeID != update.RangeID {
if localDesc.RangeID != update.RangeID {
return PrepareReplicaReport{}, errors.Errorf(
"unexpected range ID at key: expected r%d but found r%d", update.RangeID, desc.RangeID)
"unexpected range ID at key: expected r%d but found r%d", update.RangeID, localDesc.RangeID)
}
// Check if the replica is already in the fixed state, i.e. the change was already applied.
if len(desc.InternalReplicas) == 1 &&
desc.InternalReplicas[0].ReplicaID == update.NewReplica.ReplicaID &&
desc.NextReplicaID == update.NextReplicaID {
if len(localDesc.InternalReplicas) == 1 &&
localDesc.InternalReplicas[0].ReplicaID == update.NewReplica.ReplicaID &&
localDesc.NextReplicaID == update.NextReplicaID {
report.AlreadyUpdated = true
return report, nil
}

sl := stateloader.Make(desc.RangeID)
sl := stateloader.Make(localDesc.RangeID)
ms, err := sl.LoadMVCCStats(ctx, readWriter)
if err != nil {
return PrepareReplicaReport{}, errors.Wrap(err, "loading MVCCStats")
@@ -239,23 +264,26 @@ func applyReplicaUpdate(
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
}
newDesc := desc
replicas := []roachpb.ReplicaDescriptor{{
NodeID: update.NewReplica.NodeID,
StoreID: update.NewReplica.StoreID,
ReplicaID: update.NewReplica.ReplicaID,
Type: update.NewReplica.Type,
}}
newDesc := localDesc
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: update.NewReplica.NodeID,
StoreID: update.NewReplica.StoreID,
ReplicaID: update.NewReplica.ReplicaID,
Type: update.NewReplica.Type,
},
}
newDesc.SetReplicas(roachpb.MakeReplicaSet(replicas))
newDesc.NextReplicaID = update.NextReplicaID

if err := storage.MVCCPutProto(ctx, readWriter, &ms, key, clock.Now(),
if err := storage.MVCCPutProto(
ctx, readWriter, &ms, key, clock.Now(),
nil /* txn */, &newDesc); err != nil {
return PrepareReplicaReport{}, err
}
report.OldReplica = replicas[0]
report.RemovedReplicas = desc.Replicas()
report.RemovedReplicas.RemoveReplica(update.NewReplica.NodeID, update.NewReplica.StoreID)
report.RemovedReplicas = localDesc.Replicas()
report.OldReplica, _ = report.RemovedReplicas.RemoveReplica(
update.NewReplica.NodeID, update.NewReplica.StoreID)

// Refresh stats
if err := sl.SetMVCCStats(ctx, readWriter, &ms); err != nil {
Expand All @@ -281,18 +309,21 @@ func CommitReplicaChanges(batches map[roachpb.StoreID]storage.Batch) (ApplyUpdat
// belonging to them, or have no changes if no replicas belong to them or if the changes
// have been applied earlier and we try to reapply the same plan twice.
for id, batch := range batches {
if !batch.Empty() {
if err := batch.Commit(true); err != nil {
// If we fail here, we can only try to run the whole process from scratch as this store is somehow broken.
updateErrors = append(updateErrors, fmt.Sprintf("failed to update store s%d: %v", id, err))
failed = true
} else {
report.UpdatedStores = append(report.UpdatedStores, id)
}
if batch.Empty() {
continue
}
if err := batch.Commit(true); err != nil {
// If we fail here, we can only try to run the whole process from scratch as this store is
// somehow broken.
updateErrors = append(updateErrors, fmt.Sprintf("failed to update store s%d: %v", id, err))
failed = true
} else {
report.UpdatedStores = append(report.UpdatedStores, id)
}
}
if failed {
return report, errors.Errorf("failed to commit update to one or more stores: %s", strings.Join(updateErrors, "; "))
return report, errors.Errorf(
"failed to commit update to one or more stores: %s", strings.Join(updateErrors, "; "))
}
return report, nil
}
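
Taken together, the intended two-phase flow of this file is roughly the following sketch (the `applyPlan` wrapper is hypothetical, and operator confirmation between the phases is elided; compare the CLI call sites above):

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func applyPlan(
	ctx context.Context,
	plan loqrecoverypb.ReplicaUpdatePlan,
	nodeID roachpb.NodeID,
	batches map[roachpb.StoreID]storage.Batch,
) error {
	runID, err := uuid.NewV4()
	if err != nil {
		return err
	}
	// Phase 1: stage replica rewrites plus their audit records in the batches.
	report, err := loqrecovery.PrepareUpdateReplicas(
		ctx, plan, runID, timeutil.Now(), nodeID, batches)
	if err != nil {
		return err
	}
	_ = report // in the CLI this is rendered for the operator to confirm
	// Phase 2: commit each store's batch; a failure here means starting over.
	_, err = loqrecovery.CommitReplicaChanges(batches)
	return err
}
```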
1 change: 1 addition & 0 deletions pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
@@ -39,6 +39,7 @@ go_library(
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/util/keysutil",
"//pkg/util/log/eventpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
],
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
@@ -13,6 +13,7 @@ package loqrecoverypb
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)
@@ -72,3 +73,21 @@ func (m *ReplicaInfo) Replica() (roachpb.ReplicaDescriptor, error) {
"invalid replica info: its own store s%d is not present in descriptor replicas %s",
m.StoreID, m.Desc)
}

// AsStructuredLog creates a structured log entry from the record.
func (m *ReplicaRecoveryRecord) AsStructuredLog() eventpb.DebugRecoverReplica {
return eventpb.DebugRecoverReplica{
CommonEventDetails: eventpb.CommonEventDetails{
Timestamp: m.Timestamp,
},
CommonDebugEventDetails: eventpb.CommonDebugEventDetails{
NodeID: int32(m.NewReplica.NodeID),
},
RangeID: int64(m.RangeID),
StoreID: int64(m.NewReplica.StoreID),
SurvivorReplicaID: int32(m.OldReplicaID),
UpdatedReplicaID: int32(m.NewReplica.ReplicaID),
StartKey: m.StartKey.AsRKey().String(),
EndKey: m.EndKey.AsRKey().String(),
}
}
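
The startup-side consumer of these records is not part of this excerpt. A minimal sketch of what it could look like, assuming the records are written as protos under the key span added in pkg/keys (the `postRecoveryEvents` name and the `MVCCIterate` scan are assumptions; only the key span, `ReplicaRecoveryRecord`, and `AsStructuredLog` come from this commit):

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// postRecoveryEvents scans the store-local recovery-record span and logs
// each record as a structured event, returning the number of records seen.
// The real consumer would also delete the records so each is posted once.
func postRecoveryEvents(ctx context.Context, reader storage.Reader) (int, error) {
	count := 0
	err := storage.MVCCIterate(ctx, reader,
		keys.LocalStoreUnsafeReplicaRecoveryKeyMin,
		keys.LocalStoreUnsafeReplicaRecoveryKeyMax,
		hlc.Timestamp{}, storage.MVCCScanOptions{},
		func(kv roachpb.KeyValue) error {
			var record loqrecoverypb.ReplicaRecoveryRecord
			if err := kv.Value.GetProto(&record); err != nil {
				return err
			}
			event := record.AsStructuredLog()
			log.StructuredEvent(ctx, &event)
			count++
			return nil
		})
	return count, err
}
```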