-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathrecord.go
131 lines (118 loc) · 4.58 KB
/
record.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package loqrecovery
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)
// writeReplicaRecoveryStoreRecord persists a single replica recovery record
// under the store-local unsafe-recovery key identified by index.
//
// The record is assembled from the recovery timestamp, the replica update that
// was applied and the report produced while preparing that update. It is
// marshalled to protobuf and written as an unversioned value through writer.
// Callers are expected to pass sequential indexes, starting from the first
// unused slot, so that records from separate recovery runs do not overwrite
// each other.
//
// See RegisterOfflineRecoveryEvents for where these records are read back and
// deleted.
//
// Returns an error if the record cannot be marshalled or the write fails.
func writeReplicaRecoveryStoreRecord(
	timestamp int64,
	update loqrecoverypb.ReplicaUpdate,
	report PrepareReplicaReport,
	index uint64,
	writer storage.Writer,
) error {
	var rec loqrecoverypb.ReplicaRecoveryRecord
	rec.Timestamp = timestamp
	rec.RangeID = report.RangeID
	rec.StartKey = update.StartKey
	// NOTE(review): EndKey is populated from the update's StartKey, so the
	// record describes an empty span. Confirm whether a real end key is
	// available from the update or the report.
	rec.EndKey = update.StartKey
	rec.OldReplicaID = report.OldReplicaID
	rec.NewReplica = update.NewReplica

	payload, marshalErr := protoutil.Marshal(&rec)
	if marshalErr != nil {
		return errors.Wrap(marshalErr, "failed to marshal update record entry")
	}

	fmt.Printf("Writing replica update record to store.")
	recordKey := keys.StoreReplicaUnsafeRecoveryKey(index)
	return writer.PutUnversioned(recordKey, payload)
}
// RegisterOfflineRecoveryEvents checks if recovery data was captured in the
// store and writes appropriate structured entries to the log. Temporary
// store-local keys for those event records are removed once they are
// processed.
//
// This function is called on startup to ensure that any offline replica
// recovery actions are properly reflected in server logs as needed. Any new
// destinations for this info should be added to this function.
//
// Parameters:
//   - ctx is passed to logF and to the logging calls made here.
//   - readWriter is scanned over the unsafe replica recovery key range and is
//     also used to clear each record after it has been reported.
//   - logF receives the structured event built from every record found.
//
// An error is returned if a record value cannot be deserialized, if a record
// cannot be cleared, or if the iterator reports an error. A record key whose
// index cannot be decoded is only logged as a warning: the index is used
// solely for diagnostics, so the record is still reported and removed.
//
// TODO(oleg): #73679 add events to the rangelog. That would require registering events
// at the later stage of startup when SQL is already available.
func RegisterOfflineRecoveryEvents(
	ctx context.Context,
	readWriter storage.ReadWriter,
	logF func(context.Context, eventpb.DebugRecoverReplica),
) error {
	eventCount := 0

	iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		LowerBound: keys.LocalStoreUnsafeReplicaRecoveryKeyMin,
		UpperBound: keys.LocalStoreUnsafeReplicaRecoveryKeyMax,
	})
	defer iter.Close()

	// Remembers the store ID of the last processed record for the summary
	// message below.
	storeID := roachpb.StoreID(0)

	iter.SeekGE(storage.MVCCKey{Key: keys.LocalStoreUnsafeReplicaRecoveryKeyMin})
	valid, err := iter.Valid()
	for ; valid && err == nil; valid, err = iter.Valid() {
		key := iter.Key()

		// Decode the index from the user key, the same way
		// findFirstAvailableRecoveryEventIndex does, rather than from the raw
		// engine key. The index only feeds error messages, so a decode failure
		// is surfaced as a warning instead of aborting startup.
		recoverEntryIndex, decodeErr := keys.DecodeStoreReplicaUnsafeRecoverKey(key.Key)
		if decodeErr != nil {
			log.Warningf(ctx, "failed to decode replica recovery record key %s: %v", key.Key, decodeErr)
		}

		record := loqrecoverypb.ReplicaRecoveryRecord{}
		if err = iter.ValueProto(&record); err != nil {
			return errors.Wrapf(err, "failed to deserialize replica recovery event at index %d", recoverEntryIndex)
		}

		logF(ctx, record.AsStructuredLog())

		// The record has been reported; drop it so that it is not logged again
		// on the next startup.
		if err := readWriter.ClearUnversioned(key.Key); err != nil {
			return errors.Wrapf(err, "failed to delete replica recovery record at index %d", recoverEntryIndex)
		}

		storeID = record.NewReplica.StoreID
		eventCount++
		iter.Next()
	}

	if eventCount > 0 {
		log.Infof(ctx, "Registered %d loss of quorum replica recovery events for s%d", eventCount, storeID)
	}
	// err is the result of the last iter.Valid() call: nil when the scan ran
	// to completion, non-nil if the iterator failed.
	return err
}
// findFirstAvailableRecoveryEventIndex finds the first unused index in the
// store-local replica recovery events key range.
//
// It seeks to the last key below LocalStoreUnsafeReplicaRecoveryKeyMax and
// returns that key's index plus one. If the range holds no records, 0 is
// returned. An error is returned if the iterator fails or the key found cannot
// be decoded.
func findFirstAvailableRecoveryEventIndex(reader storage.Reader) (uint64, error) {
	// The iterator is deliberately not a prefix iterator: SeekLT has to land
	// on the last record anywhere in the bounded range, not only on keys that
	// share the seek key's prefix.
	iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		LowerBound: keys.LocalStoreUnsafeReplicaRecoveryKeyMin,
		UpperBound: keys.LocalStoreUnsafeReplicaRecoveryKeyMax,
	})
	defer iter.Close()

	iter.SeekLT(storage.MVCCKey{Key: keys.LocalStoreUnsafeReplicaRecoveryKeyMax})
	ok, err := iter.Valid()
	if err != nil {
		return 0, err
	}
	if !ok {
		// No existing records, so numbering starts from zero.
		return 0, nil
	}

	index, err := keys.DecodeStoreReplicaUnsafeRecoverKey(iter.Key().Key)
	if err != nil {
		return 0, err
	}
	return index + 1, nil
}