-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathreplica_init.go
340 lines (305 loc) · 12.4 KB
/
replica_init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"context"
"math/rand"
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
"github.com/cockroachdb/cockroach/pkg/storage/split"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
)
// Throttle intervals used when constructing a Replica. newReplica passes each
// of them to util.Every to build the replica's splitQueueThrottle and
// mergeQueueThrottle respectively.
const (
// splitQueueThrottleDuration is the interval given to util.Every for
// Replica.splitQueueThrottle.
splitQueueThrottleDuration = 5 * time.Second
// mergeQueueThrottleDuration is the interval given to util.Every for
// Replica.mergeQueueThrottle.
mergeQueueThrottleDuration = 5 * time.Second
)
// newReplica allocates a Replica for rangeID on the given store and sets up
// the parts of it that do not depend on a range descriptor: the abort span,
// txn wait queue, state loaders, load-based splitter, stats trackers, log tag
// and queue throttles. The replica starts out quiescent and with the store's
// default zone config. Descriptor-dependent state is not loaded here.
func newReplica(rangeID roachpb.RangeID, store *Store) *Replica {
	// qpsThreshold is consulted by the load-based splitter; it re-reads the
	// setting from store.cfg.Settings on every invocation.
	qpsThreshold := func() float64 {
		return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
	}

	repl := &Replica{
		AmbientContext: store.cfg.AmbientCtx,
		RangeID:        rangeID,
		store:          store,
		abortSpan:      abortspan.New(rangeID),
		txnWaitQueue:   txnwait.NewQueue(store),
	}

	// State guarded by mu.
	repl.mu.pendingLeaseRequest = makePendingLeaseRequest(repl)
	repl.mu.stateLoader = stateloader.Make(rangeID)
	repl.mu.quiescent = true
	repl.mu.zone = store.cfg.DefaultZoneConfig

	// State guarded by raftMu.
	repl.raftMu.stateLoader = stateloader.Make(rangeID)

	split.Init(&repl.loadBasedSplitter, rand.Intn, qpsThreshold)

	if leaseHistoryMaxEntries > 0 {
		repl.leaseHistory = newLeaseHistory()
	}

	// leaseholderStats is only tracked when a StorePool is available to
	// resolve node localities; it stays nil otherwise.
	if store.cfg.StorePool != nil {
		repl.leaseholderStats = newReplicaStats(
			store.Clock(), store.cfg.StorePool.getNodeLocalityString,
		)
	}
	// The localityOracle is deliberately nil here: the origin locality of
	// write load is intentionally not tracked.
	repl.writeStats = newReplicaStats(store.Clock(), nil)

	// Seed rangeStr with just the range ID, then register it as the "r" log
	// tag so that log lines render rangeStr.String().
	repl.rangeStr.store(0, &roachpb.RangeDescriptor{RangeID: rangeID})
	repl.AmbientContext.AddLogTag("r", &repl.rangeStr)
	// A tag carrying the replica's pointer value used to be added as well. It
	// was historically useful for debugging replica GC issues, but is a
	// distraction at the moment:
	// repl.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(repl)))

	repl.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
	repl.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)
	return repl
}
// init acquires raftMu and then mu (in that order) and delegates to
// initRaftMuLockedReplicaMuLocked, returning whatever error it reports. Both
// locks are released when init returns.
func (r *Replica) init(
	desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID,
) error {
	// Lock order matters: raftMu is always taken before mu.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()

	r.mu.Lock()
	defer r.mu.Unlock()

	err := r.initRaftMuLockedReplicaMuLocked(desc, clock, replicaID)
	return err
}
// initRaftMuLockedReplicaMuLocked loads the replica's state for desc from the
// store's engine and resets the in-memory bookkeeping (latch manager,
// proposals, checksums, raft group, proposal buffer) around it. As the name
// indicates, the caller must hold both raftMu and mu.
//
// It is fatal to call this on a replica that is already initialized. A
// non-zero replicaID together with an initialized desc is rejected with an
// error. When replicaID is zero it is looked up in desc by this store's ID;
// if desc has no entry for the store the function returns nil without
// assigning a replica ID.
func (r *Replica) initRaftMuLockedReplicaMuLocked(
	desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID,
) error {
	ctx := r.AnnotateCtx(context.TODO())

	alreadyInitialized := r.mu.state.Desc != nil && r.isInitializedRLocked()
	if alreadyInitialized {
		log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
	}
	if desc.IsInitialized() && replicaID != 0 {
		return errors.Errorf("replicaID must be 0 when creating an initialized replica")
	}

	r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests)
	r.mu.proposals = make(map[storagebase.CmdIDKey]*ProposalData)
	r.mu.checksums = make(map[uuid.UUID]ReplicaChecksum)
	// Drop any existing raft group in case this replica is being reset: the
	// raft state is reloaded below, so a previously created group is no
	// longer safe to use.
	r.mu.internalRaftGroup = nil
	r.mu.proposalBuf.Init((*replicaProposer)(r))

	var err error
	r.mu.state, err = r.mu.stateLoader.Load(ctx, r.store.Engine(), desc)
	if err != nil {
		return err
	}

	// Set minLeaseProposedTS so that a lease which already exists is not used;
	// after a restart we must not propose under old leases. A replica created
	// through a split has this value overridden later.
	//
	// This is only done when a previous lease exists (Sequence > 0). The very
	// first lease is back-dated to a zero start timestamp; setting the
	// minimum here would make that lease impossible to renew, and because
	// minLeaseProposedTS is copied on splits the problem would spread to many
	// replicas at cluster bootstrap. Treating the first lease as special
	// avoids that (and de-flakes some tests).
	preventOldLeaseUse := !r.store.cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart
	if preventOldLeaseUse && r.mu.state.Lease.Sequence > 0 {
		r.mu.minLeaseProposedTS = clock.Now()
	}

	r.rangeStr.store(0, r.mu.state.Desc)

	if r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.store.Engine()); err != nil {
		return err
	}
	r.mu.lastTerm = invalidLastTerm

	if replicaID == 0 {
		ownDesc, found := desc.GetReplicaDescriptor(r.store.StoreID())
		if !found {
			// Deliberately not an error: this is the path taken for
			// preemptive snapshots. The replica ID arrives later, when the
			// actual raft replica change occurs.
			return nil
		}
		replicaID = ownDesc.ReplicaID
	}

	r.rangeStr.store(replicaID, r.mu.state.Desc)
	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))

	switch cur := r.mu.replicaID; {
	case cur == 0:
		if err = r.setReplicaIDRaftMuLockedMuLocked(ctx, replicaID); err != nil {
			return err
		}
	case cur != replicaID:
		log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
			cur, replicaID)
	}

	r.assertStateLocked(ctx, r.store.Engine())
	return nil
}
// setReplicaIDRaftMuLockedMuLocked assigns replicaID to a replica whose ID is
// currently zero and (re)creates its sideloaded storage for that ID. As the
// name indicates, the caller must hold both raftMu and mu.
//
// Outcomes:
//   - fatal if the replica already has a non-zero ID, is being (or has been)
//     removed, or unexpectedly already owns a raft group;
//   - nil without any change if replicaID is zero;
//   - *roachpb.RaftGroupDeletedError if replicaID is below the replica's
//     tombstoneReplicaID;
//   - an error if moving or creating the sideloaded storage fails.
func (r *Replica) setReplicaIDRaftMuLockedMuLocked(
	ctx context.Context, replicaID roachpb.ReplicaID,
) error {
	if cur := r.mu.replicaID; cur != 0 {
		log.Fatalf(ctx, "cannot set replica ID from anything other than 0, currently %d",
			cur)
	}

	switch {
	case replicaID == 0:
		// An incoming message without a new replica ID is a preemptive
		// snapshot. minReplicaID gets updated if the snapshot is accepted.
		return nil
	case replicaID < r.mu.tombstoneReplicaID:
		return &roachpb.RaftGroupDeletedError{}
	case r.mu.replicaID > replicaID:
		// NOTE(review): the guard at the top only lets a zero current ID
		// through, so this looks unreachable for positive IDs - confirm
		// before removing.
		return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID)
	}

	if r.mu.destroyStatus.RemovingOrRemoved() {
		// The replica was marked for removal and something is trying to
		// bring it back.
		log.Fatalf(ctx, "cannot resurect replica %d", r.mu.replicaID)
	}

	// Set up the sideloaded storage for the new replica ID. Any files held by
	// an existing sideloaded storage are moved over first (the case of a
	// replica that was removed and re-added to the range without having been
	// GC'ed in between).
	//
	// A concurrent replicaGC cannot race with this because both run under
	// raftMu.
	auxDir := r.store.Engine().GetAuxiliaryDir()
	id := r.mu.state.Desc.RangeID
	if err := moveSideloadedData(r.raftMu.sideloaded, auxDir, id, replicaID); err != nil {
		return err
	}

	var err error
	r.raftMu.sideloaded, err = newDiskSideloadStorage(
		r.store.cfg.Settings,
		id,
		replicaID,
		auxDir,
		r.store.limiters.BulkIOWriteRate,
		r.store.engine,
	)
	if err != nil {
		return errors.Wrap(err, "while initializing sideloaded storage")
	}

	r.mu.replicaID = replicaID

	// Sanity check: the replica ID was unknown before this call, so no raft
	// group should have been created yet.
	if r.mu.internalRaftGroup != nil {
		log.Fatalf(ctx, "somehow had an initialized raft group on a zero valued replica")
	}
	return nil
}
// IsInitialized reports whether the metadata of this range is known, which
// is the case either because this node created the range or because an
// initial snapshot was received from another node. It reports false for a
// replica that was created in response to an incoming message and is still
// waiting for its initial snapshot. The replica's read lock is taken for the
// duration of the call.
func (r *Replica) IsInitialized() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()

	initialized := r.isInitializedRLocked()
	return initialized
}
// isInitializedRLocked is the lock-free core of IsInitialized: it reports
// whether the replica's current range descriptor is initialized, i.e. whether
// the range's metadata is known (created locally or received via an initial
// snapshot) as opposed to still awaiting that snapshot.
//
// isInitializedRLocked requires that the replica lock is held (at least for
// reading).
func (r *Replica) isInitializedRLocked() bool {
	desc := r.mu.state.Desc
	return desc.IsInitialized()
}
// maybeInitializeRaftGroup checks whether the internal Raft group has been
// initialized yet. If it has not, and the replica is still alive, the group
// is created through withRaftGroupLocked and asked to unquiesce and wake the
// leader so the replica comes up to date; it is set to campaign if this
// replica is the most recent owner of the range lease. Failures are only
// logged as events, never returned.
func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) {
	// Cheap check under the read lock first.
	r.mu.RLock()
	hasGroup := r.mu.internalRaftGroup != nil
	// A replica that has been removed, or is in the process of being removed,
	// will never handle raft events, so there is no point initializing it.
	alive := r.mu.destroyStatus.IsAlive()
	r.mu.RUnlock()

	if hasGroup || !alive {
		return
	}

	// Lock ordering requires raftMu to be acquired before mu.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()

	// Requesting the group is what creates it; the callback itself does
	// nothing but ask for unquiesce-and-wake. Having raced on destroyStatus
	// above is harmless because withRaftGroupLocked no-ops in that case.
	wake := func(raftGroup *raft.RawNode) (bool, error) {
		return true, nil
	}
	_, err := r.withRaftGroupLocked(true, wake)
	if err != nil {
		log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err)
	}
}
// setDescAfterSplit sets the replica's descriptor and then frees resources
// which may need to discover the split, like the txnWaitQueue and rangefeeds.
// It also resets the leaseholder request counts, when those are tracked.
//
// setDesc is invoked first, so the locking requirements of setDesc apply to
// this method as well.
func (r *Replica) setDescAfterSplit(ctx context.Context, desc *roachpb.RangeDescriptor) {
	r.setDesc(ctx, desc)

	// Clear the LHS txn wait queue, to redirect to the RHS if appropriate.
	// This is done after setDesc to ensure that no pre-split commands are
	// inserted into the txnWaitQueue after it has been cleared.
	r.txnWaitQueue.Clear(false /* disable */)

	// The rangefeed processor will no longer be provided logical ops for
	// its entire range, so it needs to be shut down and all registrations
	// need to retry.
	// TODO(nvanbenschoten): It should be possible to only reject registrations
	// that overlap with the new range of the split and keep registrations that
	// are only interested in keys that are still on the original range running.
	r.disconnectRangefeedWithReason(
		roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
	)

	// Clear the original range's request stats, since they include requests
	// for spans that are now owned by the new range. newReplica only
	// allocates leaseholderStats when the store has a StorePool, so the
	// tracker may legitimately be nil here.
	if r.leaseholderStats != nil {
		r.leaseholderStats.resetRequestCounts()
	}
}
// setDesc atomically sets the replica's descriptor. It requires raftMu to be
// locked; mu is acquired internally for the duration of the call.
//
// desc must be non-nil, must carry the replica's RangeID, must not replace an
// initialized descriptor with an uninitialized one, and must not change the
// start key of an initialized replica. Violating any of these is fatal.
//
// Besides storing desc, the call refreshes the bookkeeping derived from it:
// the last-added-replica tracking, the rangeStr used for logging and the
// connection class computed from the start key.
func (r *Replica) setDesc(ctx context.Context, desc *roachpb.RangeDescriptor) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Reject a nil descriptor before anything dereferences it, so the failure
	// carries a meaningful message instead of a bare nil-pointer panic.
	if desc == nil {
		log.Fatalf(ctx, "cannot set nil range descriptor on replica of range %d", r.RangeID)
	}
	if desc.RangeID != r.RangeID {
		log.Fatalf(ctx, "range descriptor ID (%d) does not match replica's range ID (%d)",
			desc.RangeID, r.RangeID)
	}

	prev := r.mu.state.Desc
	prevInitialized := prev != nil && prev.IsInitialized()
	if prevInitialized && !desc.IsInitialized() {
		log.Fatalf(ctx, "cannot replace initialized descriptor with uninitialized one: %+v -> %+v",
			prev, desc)
	}
	if prevInitialized && !prev.StartKey.Equal(desc.StartKey) {
		log.Fatalf(ctx, "attempted to change replica's start key from %s to %s",
			prev.StartKey, desc.StartKey)
	}

	// Determine if a new replica was added. This is true if the new max
	// replica ID is greater than the old max replica ID.
	oldMaxID := maxReplicaIDOfAny(prev)
	newMaxID := maxReplicaIDOfAny(desc)
	if newMaxID > oldMaxID {
		r.mu.lastReplicaAdded = newMaxID
		r.mu.lastReplicaAddedTime = timeutil.Now()
	} else if r.mu.lastReplicaAdded > newMaxID {
		// The last replica added was removed.
		r.mu.lastReplicaAdded = 0
		r.mu.lastReplicaAddedTime = time.Time{}
	}

	r.rangeStr.store(r.mu.replicaID, desc)
	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
	r.mu.state.Desc = desc
}