-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
replica_init.go
365 lines (334 loc) · 14.3 KB
/
replica_init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver
import (
"bytes"
"context"
"math/rand"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
)
// Intervals used to build the per-replica split and merge queue throttles:
// newUnloadedReplica passes them to util.Every when it populates
// Replica.splitQueueThrottle and Replica.mergeQueueThrottle.
const (
splitQueueThrottleDuration = 5 * time.Second
mergeQueueThrottleDuration = 5 * time.Second
)
// newReplica builds a Replica for desc and loads its state in one step: it
// creates the in-memory shell via newUnloadedReplica and then, while holding
// both raftMu and mu (acquired in that order), runs
// loadRaftMuLockedReplicaMuLocked on it.
//
// If desc is initialized, the store must be a member of it and the matching
// replica descriptor must carry replicaID as its ReplicaID.
//
// On a load failure the partially built Replica is discarded and the error is
// returned as-is.
func newReplica(
	ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
) (*Replica, error) {
	r := newUnloadedReplica(ctx, desc, store, replicaID)

	// Lock order is raftMu first, then mu; the deferred unlocks release them
	// in the reverse order.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()

	err := r.loadRaftMuLockedReplicaMuLocked(desc)
	if err != nil {
		return nil, err
	}
	return r, nil
}
// newUnloadedReplica partially constructs a replica. The primary reason this
// function exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is
// to avoid attempting to fully construct a Replica prior to proving that it
// can exist during the delicate synchronization dance that occurs in
// Store.tryGetOrCreateReplica(). A Replica returned from this function must not
// be used in any way until its loadRaftMuLockedReplicaMuLocked() method has
// been called.
//
// Only desc.RangeID is consumed directly here (desc itself is also handed to
// the concurrency manager's config); no state is read from disk.
//
// replicaID must be non-zero. A zero replicaID is a fatal error, which is
// logged against the caller-supplied ctx.
func newUnloadedReplica(
	ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
) *Replica {
	if replicaID == 0 {
		// Use the caller's ctx rather than context.TODO() so the fatal message
		// carries whatever log tags the caller attached.
		log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", desc.RangeID)
	}
	r := &Replica{
		AmbientContext: store.cfg.AmbientCtx,
		RangeID:        desc.RangeID,
		store:          store,
		abortSpan:      abortspan.New(desc.RangeID),
		concMgr: concurrency.NewManager(concurrency.Config{
			NodeDesc:          store.nodeDesc,
			RangeDesc:         desc,
			Settings:          store.ClusterSettings(),
			DB:                store.DB(),
			Clock:             store.Clock(),
			Stopper:           store.Stopper(),
			IntentResolver:    store.intentResolver,
			TxnWaitMetrics:    store.txnWaitMetrics,
			SlowLatchGauge:    store.metrics.SlowLatchRequests,
			DisableTxnPushing: store.TestingKnobs().DontPushOnWriteIntentError,
			TxnWaitKnobs:      store.TestingKnobs().TxnWaitKnobs,
		}),
	}
	r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
	r.mu.stateLoader = stateloader.Make(desc.RangeID)
	// A freshly constructed replica starts out quiescent, with the store's
	// default zone config.
	r.mu.quiescent = true
	r.mu.zone = store.cfg.DefaultZoneConfig
	r.mu.replicaID = replicaID
	// The load-based splitter re-reads the QPS threshold from the cluster
	// settings every time the callback is invoked.
	split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
		return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
	})
	r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
	r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
	r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
	r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
	r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps

	// Lease history is only allocated when it is enabled.
	if leaseHistoryMaxEntries > 0 {
		r.leaseHistory = newLeaseHistory()
	}
	// Leaseholder stats need the store pool for locality lookups; without a
	// store pool they are left nil.
	if store.cfg.StorePool != nil {
		r.leaseholderStats = newReplicaStats(store.Clock(), store.cfg.StorePool.getNodeLocalityString)
	}
	// Pass nil for the localityOracle because we intentionally don't track the
	// origin locality of write load.
	r.writeStats = newReplicaStats(store.Clock(), nil)

	// Init rangeStr with the range ID.
	r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID})
	// Add replica log tag - the value is rangeStr.String().
	r.AmbientContext.AddLogTag("r", &r.rangeStr)
	// Add replica pointer value. NB: this was historically useful for debugging
	// replica GC issues, but is a distraction at the moment.
	// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))
	r.raftMu.stateLoader = stateloader.Make(desc.RangeID)

	r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
	r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)
	return r
}
// setStartKeyLocked records startKey as r.startKey. The field has special
// semantics that are described on its declaration. This is meant to be called
// while turning an uninitialized Replica into an initialized one, with
// Replica.mu held (asserted below).
//
// The start key may be written only once: if r.startKey is already non-nil
// the call is a fatal error.
func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) {
	r.mu.AssertHeld()
	if prev := r.startKey; prev != nil {
		ctx := r.AnnotateCtx(context.Background())
		log.Fatalf(ctx, "start key written twice: was %s, now %s", prev, startKey)
	}
	r.startKey = startKey
}
// loadRaftMuLockedReplicaMuLocked will load the state of the replica from disk.
// If desc is initialized, the Replica will be initialized when this method
// returns. An initialized Replica may not be reloaded. If this method is called
// with an uninitialized desc it may be called again later with an initialized
// desc.
//
// As the name indicates, both raftMu and mu are expected to be held by the
// caller (newReplica in this file acquires both before calling).
//
// This method is called in three places:
//
// 1) newReplica - used when the store is initializing and during testing
// 2) tryGetOrCreateReplica - see newUnloadedReplica
// 3) splitPostApply - this call initializes a previously uninitialized Replica.
//
// An error is returned if loading the replica state or the last index fails,
// or if the sideloaded storage cannot be set up. Violated invariants (already
// initialized replica, zero replicaID, replica missing from an initialized
// descriptor, replicaID mismatch) are reported via log.Fatalf instead.
func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error {
ctx := r.AnnotateCtx(context.TODO())
// The nil check guards the very first load, before r.mu.state.Desc has been
// populated by the state loader below.
if r.mu.state.Desc != nil && r.isInitializedRLocked() {
log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
} else if r.mu.replicaID == 0 {
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
}
// The start key is only recorded once the descriptor is initialized;
// setStartKeyLocked refuses to overwrite an existing start key.
if desc.IsInitialized() {
r.setStartKeyLocked(desc.StartKey)
}
// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
// group.
r.mu.internalRaftGroup = nil
// Load the replica state and the last index from the engine.
var err error
if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.Engine(), desc); err != nil {
return err
}
r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.Engine())
if err != nil {
return err
}
// The last term is not loaded here; it is reset to the invalidLastTerm
// sentinel.
r.mu.lastTerm = invalidLastTerm
// Ensure that we're not trying to load a replica with a different ID than
// was used to construct this Replica.
replicaID := r.mu.replicaID
if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
replicaID = replicaDesc.ReplicaID
} else if desc.IsInitialized() {
log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc)
}
if r.mu.replicaID != replicaID {
log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
r.mu.replicaID, replicaID)
}
// Install the descriptor and refresh the state derived from it.
r.setDescLockedRaftMuLocked(ctx, desc)
// Only do this if there was a previous lease. This shouldn't be important
// to do but consider that the first lease which is obtained is back-dated
// to a zero start timestamp (and this de-flakes some tests). If we set the
// min proposed TS here, this lease could not be renewed (by the semantics
// of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits,
// this problem would multiply to a number of replicas at cluster bootstrap.
// Instead, we make the first lease special (which is OK) and the problem
// disappears.
if r.mu.state.Lease.Sequence > 0 {
r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
}
// Set up the sideloaded storage for this range/replica ID, passing the
// engine's auxiliary directory as its base.
ssBase := r.Engine().GetAuxiliaryDir()
if r.raftMu.sideloaded, err = newDiskSideloadStorage(
r.store.cfg.Settings,
desc.RangeID,
replicaID,
ssBase,
r.store.limiters.BulkIOWriteRate,
r.store.engine,
); err != nil {
return errors.Wrap(err, "while initializing sideloaded storage")
}
// Run the replica state assertion against the store's engine now that
// everything has been loaded.
r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
return nil
}
// IsInitialized reports whether the metadata of this replica's range is
// known, which is the case either because we created the range or because an
// initial snapshot was received from another node. It reports false for a
// replica that was created in response to an incoming message and is still
// waiting for its initial snapshot.
//
// It takes the replica's read lock and delegates to isInitializedRLocked.
func (r *Replica) IsInitialized() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	initialized := r.isInitializedRLocked()
	return initialized
}
// TenantID returns the tenant ID associated with the replica together with a
// boolean that reports whether that ID is valid. The ID is invalid only when
// the replica has not been initialized.
//
// It takes the replica's read lock and delegates to getTenantIDRLocked.
func (r *Replica) TenantID() (roachpb.TenantID, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	id, ok := r.getTenantIDRLocked()
	return id, ok
}
// getTenantIDRLocked returns r.mu.tenantID and whether it has been set, i.e.
// whether it differs from the zero-valued roachpb.TenantID. As the name
// indicates, the caller is expected to hold at least the replica's read lock.
func (r *Replica) getTenantIDRLocked() (roachpb.TenantID, bool) {
	var unset roachpb.TenantID
	id := r.mu.tenantID
	return id, id != unset
}
// isInitializedRLocked reports whether the metadata of this range is known,
// which is the case either because we created the range or because an initial
// snapshot was received from another node. It reports false for a range that
// was created in response to an incoming message and is still waiting for its
// initial snapshot.
//
// The answer is taken from the replica's current descriptor
// (r.mu.state.Desc). isInitializedRLocked requires that the replica lock is
// held.
func (r *Replica) isInitializedRLocked() bool {
	desc := r.mu.state.Desc
	return desc.IsInitialized()
}
// maybeInitializeRaftGroup makes sure the replica's internal Raft group
// exists. It returns without doing anything when the group is already
// present, or when the replica's destroy status says it is no longer alive.
// Otherwise it calls withRaftGroupLocked with a callback that does no work
// and simply returns true.
//
// NOTE(review): the previous comment stated that the new group is set to
// campaign if this replica is the most recent owner of the range lease; that
// logic is not visible in this function and presumably lives behind
// withRaftGroupLocked.
func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) {
	// Cheap pre-check under the read lock only.
	r.mu.RLock()
	hasRaftGroup := r.mu.internalRaftGroup != nil
	// A replica that has been removed, or is in the process of being removed,
	// will never handle raft events, so there is nothing to set up for it.
	destroyed := !r.mu.destroyStatus.IsAlive()
	r.mu.RUnlock()

	if hasRaftGroup || destroyed {
		return
	}

	// raftMu must be taken before mu to respect the lock ordering, which is
	// why the read lock above was dropped first.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()

	// Losing a race on destroyStatus between the check above and here is
	// harmless: withRaftGroupLocked no-ops in that case, and the resulting
	// errRemoved is deliberately not reported.
	err := r.withRaftGroupLocked(true, func(*raft.RawNode) (bool, error) {
		return true, nil
	})
	if err == nil || errors.Is(err, errRemoved) {
		return
	}
	log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err)
}
// setDescRaftMuLocked installs desc as the replica's range descriptor. The
// update is applied atomically with respect to other users of the replica
// mutex: r.mu is acquired here and held for the whole duration of
// setDescLockedRaftMuLocked, which does the actual work.
//
// The caller must already hold raftMu.
func (r *Replica) setDescRaftMuLocked(ctx context.Context, desc *roachpb.RangeDescriptor) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.setDescLockedRaftMuLocked(ctx, desc)
}
// setDescLockedRaftMuLocked installs desc as the replica's range descriptor
// (r.mu.state.Desc) and refreshes the state derived from it. The caller must
// hold r.mu (setDescRaftMuLocked acquires it before calling); as the name
// indicates, raftMu is expected to be held as well.
//
// The following are fatal errors:
//   - desc is nil;
//   - desc.RangeID differs from the replica's RangeID;
//   - the current descriptor is initialized and desc is not;
//   - the current descriptor is initialized and desc has a different start key;
//   - desc contains this store with a ReplicaID other than r.mu.replicaID;
//   - the tenant prefix cannot be decoded from desc.StartKey.
//
// Besides storing the descriptor, this:
//   - on the first initialized descriptor, records the tenant ID, acquires the
//     tenant in the store metrics and, for non-system tenants, looks up the
//     tenant rate limiter;
//   - tracks the most recently added replica ID and the time it was added;
//   - updates rangeStr, the connection class and the concurrency manager;
//   - gives the range scheduler priority if its start key has the node
//     liveness prefix.
func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.RangeDescriptor) {
	// Reject a nil descriptor up front. Every check below dereferences desc,
	// so without this guard a nil descriptor would surface as a bare nil
	// pointer panic instead of a descriptive fatal error.
	if desc == nil {
		log.Fatalf(ctx, "cannot set a nil range descriptor on replica of range %d", r.RangeID)
	}
	if desc.RangeID != r.RangeID {
		log.Fatalf(ctx, "range descriptor ID (%d) does not match replica's range ID (%d)",
			desc.RangeID, r.RangeID)
	}
	if r.mu.state.Desc.IsInitialized() && !desc.IsInitialized() {
		log.Fatalf(ctx, "cannot replace initialized descriptor with uninitialized one: %+v -> %+v",
			r.mu.state.Desc, desc)
	}
	if r.mu.state.Desc.IsInitialized() &&
		!r.mu.state.Desc.StartKey.Equal(desc.StartKey) {
		log.Fatalf(ctx, "attempted to change replica's start key from %s to %s",
			r.mu.state.Desc.StartKey, desc.StartKey)
	}

	// NB: It might be nice to assert that the current replica exists in desc
	// however we allow it to not be present for three reasons:
	//
	//   1) When removing the current replica we update the descriptor to the point
	//      of removal even though we will delete the Replica's data in the same
	//      batch. We could avoid setting the local descriptor in this case.
	//   2) When the DisableEagerReplicaRemoval testing knob is enabled. We
	//      could remove all tests which utilize this behavior now that there's
	//      no other mechanism for range state which does not contain the current
	//      store to exist on disk.
	//   3) Various unit tests do not provide a valid descriptor.
	replDesc, found := desc.GetReplicaDescriptor(r.StoreID())
	if found && replDesc.ReplicaID != r.mu.replicaID {
		log.Fatalf(ctx, "attempted to change replica's ID from %d to %d",
			r.mu.replicaID, replDesc.ReplicaID)
	}

	// Initialize the tenant. This must be the first time that the descriptor has
	// been initialized. Note that the desc.StartKey never changes throughout the
	// life of a range.
	if desc.IsInitialized() && r.mu.tenantID == (roachpb.TenantID{}) {
		_, tenantID, err := keys.DecodeTenantPrefix(desc.StartKey.AsRawKey())
		if err != nil {
			log.Fatalf(ctx, "failed to decode tenant prefix from key for "+
				"replica %v: %v", r, err)
		}
		r.mu.tenantID = tenantID
		r.store.metrics.acquireTenant(tenantID)
		// Only non-system tenants get a rate limiter.
		if tenantID != roachpb.SystemTenantID {
			r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(tenantID, r.store.stopper.ShouldQuiesce())
		}
	}

	// Determine if a new replica was added. This is true if the new max replica
	// ID is greater than the old max replica ID.
	oldMaxID := maxReplicaIDOfAny(r.mu.state.Desc)
	newMaxID := maxReplicaIDOfAny(desc)
	if newMaxID > oldMaxID {
		r.mu.lastReplicaAdded = newMaxID
		r.mu.lastReplicaAddedTime = timeutil.Now()
	} else if r.mu.lastReplicaAdded > newMaxID {
		// The last replica added was removed.
		r.mu.lastReplicaAdded = 0
		r.mu.lastReplicaAddedTime = time.Time{}
	}

	// Publish the new descriptor to everything that is derived from it, then
	// store it.
	r.rangeStr.store(r.mu.replicaID, desc)
	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
	r.concMgr.OnRangeDescUpdated(desc)
	r.mu.state.Desc = desc

	// Prioritize the NodeLiveness Range in the Raft scheduler above all other
	// Ranges to ensure that liveness never sees high Raft scheduler latency.
	if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) {
		r.store.scheduler.SetPriorityID(desc.RangeID)
	}
}