// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package loqrecovery

import (
"context"
"fmt"
"strings"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// PrepareStoreReport contains information about all changes prepared for the
// stores. Its purpose is to inform the user about the actions to be performed.
type PrepareStoreReport struct {
// MissingStores contains the set of stores that the node on which this update
// is performed is expected to have according to the update plan, but that were
// not provided in the config. While a missing store is suspicious, it is not
// necessarily a failure, and the user is asked to confirm that the situation
// is expected.
MissingStores []roachpb.StoreID
// UpdatedReplicas contains reports for all replicas that are planned for update.
UpdatedReplicas []PrepareReplicaReport
// SkippedReplicas contains reports for replicas that do not need updating
// (currently because the update was already applied).
SkippedReplicas []PrepareReplicaReport
}

// PrepareReplicaReport contains information about the change prepared for a
// single replica. Its purpose is to inform the user about the actions to be
// performed.
type PrepareReplicaReport struct {
// Replica identification data.
RangeID roachpb.RangeID
StartKey roachpb.RKey
ReplicaID roachpb.ReplicaID
OldReplicaID roachpb.ReplicaID
// AlreadyUpdated is true if the state of the replica in the store already
// matches the desired target state. This happens if the plan is applied more
// than once, which is safe because it is idempotent, but we want to notify the
// user of the situation.
AlreadyUpdated bool
// RemovedReplicas is the set of replicas that were removed from the range descriptor.
RemovedReplicas roachpb.ReplicaSet
// AbortedTransaction and AbortedTransactionID indicate whether a descriptor
// change intent was found and removed as part of recovery preparation.
AbortedTransaction bool
AbortedTransactionID uuid.UUID
}

// PrepareUpdateReplicas prepares all changes to be committed to the provided
// stores as the first step of the apply stage. It writes the changes into the
// stores' batches and returns a summary of what was done, together with any
// discrepancies found. The caller can then confirm the actions and either
// commit or discard the changes.
func PrepareUpdateReplicas(
ctx context.Context,
plan loqrecoverypb.ReplicaUpdatePlan,
nodeID roachpb.NodeID,
stores map[roachpb.StoreID]*UpdatableStore,
) (PrepareStoreReport, error) {
var report PrepareStoreReport
updateTs := timeutil.Now().UnixNano()
// Pre-check all stores named in the plan so the caller can confirm the action.
// The map collects store IDs that the plan expects on this node but that were
// not provided in this command invocation.
missing := make(map[roachpb.StoreID]struct{})
for _, update := range plan.Updates {
if nodeID != update.NodeID() {
continue
}
if store, ok := stores[update.StoreID()]; !ok {
missing[update.StoreID()] = struct{}{}
continue
} else {
replicaReport, err := applyReplicaUpdate(ctx, store.Batch(), update)
if err != nil {
return PrepareStoreReport{}, errors.Wrapf(err,
"failed to prepare update replica for range r%v on store s%d", update.RangeID, update.StoreID())
}
if !replicaReport.AlreadyUpdated {
report.UpdatedReplicas = append(report.UpdatedReplicas, replicaReport)
if err := writeReplicaRecoveryStoreRecord(
updateTs, update, replicaReport, store.nextUpdateRecordIndex, store.Batch()); err != nil {
return PrepareStoreReport{}, errors.Wrap(err, "failed writing update evidence records")
}
store.nextUpdateRecordIndex++
} else {
report.SkippedReplicas = append(report.SkippedReplicas, replicaReport)
}
}
}
if len(missing) > 0 {
report.MissingStores = storeListFromSet(missing)
}
return report, nil
}
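
// A sketch of how a caller (for example the recovery CLI command) might render
// the PrepareStoreReport returned above for user confirmation before
// committing. summarizePreparedChanges is an illustrative helper, not part of
// the package API; only identifiers defined in this package are used, and the
// output format is an assumption.
func summarizePreparedChanges(report PrepareStoreReport) string {
	var sb strings.Builder
	// Missing stores are suspicious but not necessarily fatal; surface them so
	// the user can confirm whether the situation is expected.
	for _, id := range report.MissingStores {
		fmt.Fprintf(&sb, "store s%d is named in the plan but was not provided\n", id)
	}
	// Replicas whose descriptors were rewritten in the pending batches.
	for _, r := range report.UpdatedReplicas {
		fmt.Fprintf(&sb, "r%d: replica %d replaces %d, removed replicas: %v\n",
			r.RangeID, r.ReplicaID, r.OldReplicaID, r.RemovedReplicas)
	}
	// Replicas that already match the plan, e.g. because it was applied before.
	for _, r := range report.SkippedReplicas {
		fmt.Fprintf(&sb, "r%d: already updated, skipping\n", r.RangeID)
	}
	return sb.String()
}
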
func applyReplicaUpdate(
ctx context.Context, readWriter storage.ReadWriter, update loqrecoverypb.ReplicaUpdate,
) (PrepareReplicaReport, error) {
clock := hlc.NewClock(hlc.UnixNano, 0)
report := PrepareReplicaReport{
RangeID: update.RangeID,
ReplicaID: update.NewReplica.ReplicaID,
OldReplicaID: update.OldReplicaID,
StartKey: update.StartKey.AsRKey(),
}
// Write the rewritten descriptor to the range-local descriptor
// key. We do not update the meta copies of the descriptor.
// Instead, we leave them in a temporarily inconsistent state and
// they will be overwritten when the cluster recovers and
// up-replicates this range from its single copy to multiple
// copies. We rely on the fact that all range descriptor updates
// start with a CPut on the range-local copy followed by a blind
// Put to the meta copy.
//
// For example, if we have replicas on s1-s4 but s3 and s4 are
// dead, we will rewrite the replica on s2 to have s2 as its only
// member. When the cluster is restarted (and the dead nodes
// remain dead), the rewritten replica will be the only one able
// to make progress. It will elect itself leader and upreplicate.
//
// The old replica on s1 is untouched by this process. It will
// eventually either be overwritten by a new replica when s2
// upreplicates, or it will be destroyed by the replica GC queue
// after upreplication has happened and s1 is no longer a member.
// (Note that in the latter case, consistency between s1 and s2 no
// longer matters; the consistency checker will only run on nodes
// that the new leader believes are members of the range).
//
// Note that this tool does not guarantee fully consistent
// results; the most recent writes to the raft log may have been
// lost. In the most unfortunate cases, this means that we would
// be "winding back" a split or a merge, which is almost certainly
// to result in irrecoverable corruption (for example, not only
// will individual values stored in the meta ranges diverge, but
// there will be keys not represented by any ranges or vice
// versa).
key := keys.RangeDescriptorKey(update.StartKey.AsRKey())
value, intent, err := storage.MVCCGet(
ctx, readWriter, key, clock.Now(), storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return PrepareReplicaReport{}, err
}
if value == nil {
return PrepareReplicaReport{}, errors.Errorf(
"failed to find a range descriptor for range %v", key)
}
var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
return PrepareReplicaReport{}, err
}
// Sanity check that this is indeed the right range.
if desc.RangeID != update.RangeID {
return PrepareReplicaReport{}, errors.Errorf(
"unexpected range ID at key: expected r%d but found r%d", update.RangeID, desc.RangeID)
}
// Check if the replica is already in the fixed state, which happens if the
// change was already applied.
if len(desc.InternalReplicas) == 1 &&
desc.InternalReplicas[0].ReplicaID == update.NewReplica.ReplicaID &&
desc.NextReplicaID == update.NextReplicaID {
report.AlreadyUpdated = true
return report, nil
}
sl := stateloader.Make(desc.RangeID)
ms, err := sl.LoadMVCCStats(ctx, readWriter)
if err != nil {
return PrepareReplicaReport{}, errors.Wrap(err, "loading MVCCStats")
}
if intent != nil {
// We rely on the property that transactions involving the range
// descriptor always start on the range-local descriptor's key. When there
// is an intent, this means that it is likely that the transaction did not
// commit, so we abort the intent.
//
// However, this is not guaranteed. For one, applying a command is not
// synced to disk, so in theory whichever store becomes the designated
// survivor may temporarily have "forgotten" that the transaction
// committed in its applied state (it would still have the committed log
// entry, as this is durable state, so it would come back once the node
// was running, but we don't see that materialized state in
// unsafe-remove-dead-replicas). This is unlikely to be a problem in
// practice, since we assume that the store was shut down gracefully and
// besides, the write likely had plenty of time to make it to durable
// storage. More troubling is the fact that the designated survivor may
// simply not yet have learned that the transaction committed; it may not
// have been in the quorum and could've been slow to catch up on the log.
// It may not even have the intent; in theory the remaining replica could
// have missed any number of transactions on the range descriptor (even if
// they are in the log, they may not yet be applied, and the replica may
// not yet have learned that they are committed). This is particularly
// troubling when we miss a split, as the right-hand side of the split
// will exist in the meta ranges and could even be able to make progress.
// For yet another thing to worry about, note that the determinism (across
// different nodes) assumed in this tool can easily break down in similar
// ways (not all stores are going to have the same view of what the
// descriptors are), and so multiple replicas of a range may declare
// themselves the designated survivor. Long story short, use of this tool
// with or without the presence of an intent can - in theory - really
// tear the cluster apart.
//
// A solution to this would require a global view, where in a first step
// we collect from each store in the cluster the replicas present and
// compute from that a "recovery plan", i.e. set of replicas that will
// form the recovered keyspace. We may then find that no such recovery
// plan is trivially achievable, due to any of the above problems. But
// in the common case, we do expect one to exist.
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
// A crude form of the intent resolution process: abort the
// transaction by deleting its record.
txnKey := keys.TransactionKey(intent.Txn.Key, intent.Txn.ID)
if err := storage.MVCCDelete(ctx, readWriter, &ms, txnKey, hlc.Timestamp{}, nil); err != nil {
return PrepareReplicaReport{}, err
}
update := roachpb.LockUpdate{
Span: roachpb.Span{Key: intent.Key},
Txn: intent.Txn,
Status: roachpb.ABORTED,
}
if _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update); err != nil {
return PrepareReplicaReport{}, err
}
}
newDesc := desc
replicas := []roachpb.ReplicaDescriptor{{
NodeID: update.NewReplica.NodeID,
StoreID: update.NewReplica.StoreID,
ReplicaID: update.NewReplica.ReplicaID,
Type: update.NewReplica.Type,
}}
newDesc.SetReplicas(roachpb.MakeReplicaSet(replicas))
newDesc.NextReplicaID = update.NextReplicaID
// Write the updated descriptor back to the range-local descriptor key.
if err := storage.MVCCPutProto(ctx, readWriter, &ms, key, clock.Now(),
nil /* txn */, &newDesc); err != nil {
return PrepareReplicaReport{}, err
}
report.RemovedReplicas = desc.Replicas()
report.RemovedReplicas.RemoveReplica(update.NewReplica.NodeID, update.NewReplica.StoreID)
// Refresh stats
if err := sl.SetMVCCStats(ctx, readWriter, &ms); err != nil {
return PrepareReplicaReport{}, errors.Wrap(err, "updating MVCCStats")
}
return report, nil
}

// ApplyUpdateReport contains information about the recovery changes applied to
// the stores.
type ApplyUpdateReport struct {
// IDs of successfully updated stores.
UpdatedStores []roachpb.StoreID
}

// CommitReplicaChanges commits the content of the pending storage batches to
// the stores. This is the second step of applying the recovery plan.
func CommitReplicaChanges(stores map[roachpb.StoreID]*UpdatableStore) (ApplyUpdateReport, error) {
var report ApplyUpdateReport
failed := false
var updateErrors []string
// Commit changes to all stores. A store has pending changes if the plan
// contains replicas belonging to it, and no changes if no replicas belong to
// it or if the changes were already applied earlier and we are reapplying the
// same plan.
for id, store := range stores {
committed, err := store.commit()
if err != nil {
// If we fail here, we can only try to run the whole process from scratch as this store is somehow broken.
updateErrors = append(updateErrors, fmt.Sprintf("failed to update store s%d: %v", id, err))
failed = true
} else {
if committed {
report.UpdatedStores = append(report.UpdatedStores, id)
}
}
}
if failed {
return report, errors.Errorf("failed to commit update to one or more stores: %s", strings.Join(updateErrors, "; "))
}
return report, nil
}
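
// A sketch of how the two stages above could be driven together for one node,
// assuming the calling command has already loaded the plan and opened the
// stores. applyPlanLocally is an illustrative name, not part of the package
// API; a real caller would ask the user to confirm the prepare report (and any
// missing stores) instead of simply refusing as this sketch does.
func applyPlanLocally(
	ctx context.Context,
	plan loqrecoverypb.ReplicaUpdatePlan,
	nodeID roachpb.NodeID,
	stores map[roachpb.StoreID]*UpdatableStore,
) (ApplyUpdateReport, error) {
	// First stage: stage all descriptor rewrites into the stores' batches.
	prep, err := PrepareUpdateReplicas(ctx, plan, nodeID, stores)
	if err != nil {
		return ApplyUpdateReport{}, err
	}
	// This is the confirmation point; here we bail out if expected stores are
	// missing rather than asking the user.
	if len(prep.MissingStores) > 0 {
		return ApplyUpdateReport{}, errors.Errorf(
			"stores missing from this invocation: %v", prep.MissingStores)
	}
	// Second stage: commit the staged batches.
	return CommitReplicaChanges(stores)
}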