-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
replica_raft.go
2776 lines (2594 loc) · 114 KB
/
replica_raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver
import (
"context"
"math/rand"
"sort"
"strings"
"time"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
"go.etcd.io/raft/v3/tracker"
)
var (
// raftLogTruncationClearRangeThreshold is the number of entries at which Raft
// log truncation uses a Pebble range tombstone rather than point deletes. It
// is set high enough to avoid writing too many range tombstones to Pebble,
// but low enough that we don't do too many point deletes either (in
// particular, we don't want to overflow the Pebble write batch).
//
// In the steady state, Raft log truncation occurs when RaftLogQueueStaleSize
// (64 KB) or RaftLogQueueStaleThreshold (100 entries) is exceeded, so
// truncations are generally small. If followers are lagging, we let the log
// grow to RaftLogTruncationThreshold (16 MB) before truncating.
//
// 100k was chosen because it is unlikely to be hit in most common cases,
// keeping the number of range tombstones low, but will trigger when Raft logs
// have grown abnormally large. RaftLogTruncationThreshold will typically not
// trigger it, unless the average log entry is <= 160 bytes (16 MB / 100k
// entries). The key size is ~16 bytes, so Pebble point deletion batches will
// be bounded at ~1.6MB (100k entries * 16 bytes).
//
// The value is declared through util.ConstantWithMetamorphicTestRange with a
// default of 100000 and a [1, 1e6] range, so tests may presumably observe
// any value in that range rather than the default -- confirm against the
// helper's documentation.
raftLogTruncationClearRangeThreshold = kvpb.RaftIndex(util.ConstantWithMetamorphicTestRange(
"raft-log-truncation-clearrange-threshold", 100000 /* default */, 1 /* min */, 1e6 /* max */))
// raftDisableLeaderFollowsLeaseholder disables lease/leader colocation. It
// is read from the COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER environment
// variable when this package-level variable is initialized and defaults to
// false.
raftDisableLeaderFollowsLeaseholder = envutil.EnvOrDefaultBool(
"COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER", false)
)
// evalAndPropose generates a fresh command ID, evaluates the batch into a
// pending proposal and, if evaluation produced a Raft command, proposes it to
// Raft.
//
// The lease status st describes the lease the command is evaluated and
// proposed under.
// NOTE(review): earlier documentation said st is nil when the command does not
// require a lease, but the body dereferences st unconditionally (e.g. *st when
// building the end-commands, st.Lease on the retry-error path), so callers
// presumably always pass a non-nil status -- confirm.
//
// The method accepts a concurrency guard, which it assumes responsibility for
// if it succeeds in proposing a command into Raft. If the method does not
// return an error, the guard is guaranteed to be eventually freed and the
// caller should relinquish all ownership of it. If it does return an error, the
// caller retains full ownership over the guard.
//
// evalAndPropose takes ownership of the supplied token; the caller should
// tok.Move() it into this method. It will be used to untrack the request once
// it comes out of the proposal buffer.
//
// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
// would violate the locking order specified for Store.mu. (The returned
// abandon closure does lock raftMu, but it only does so when the caller
// invokes it later.)
//
// Return values:
// - a channel which receives a response or error upon application
// - a closure used to attempt to abandon the command. When called, it unbinds
// the command's context from its Raft proposal. The client is then free to
// terminate execution, although it is given no guarantee that the proposal
// won't still go on to commit and apply at some later time. It is a no-op
// closure when evaluation did not produce a Raft command.
// - the proposal's ID; empty when evaluation did not produce a Raft command.
// - the write-batch and ingested (AddSSTable) byte counts of the proposed
// command, for admission control; nil when evaluation did not produce a
// Raft command.
// - any error obtained during the creation or proposal of the command, in
// which case the other returned values are zero. The one exception is the
// async-consensus EndTxnIntents rejection below, which returns the byte
// counts alongside the error.
func (r *Replica) evalAndPropose(
ctx context.Context,
ba *kvpb.BatchRequest,
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
tok TrackedRequestToken,
) (
chan proposalResult,
func(),
kvserverbase.CmdIDKey,
*kvadmission.StoreWriteBytes,
*kvpb.Error,
) {
// Release the tracking token on every path that did not hand it off to
// propose() via tok.Move().
defer tok.DoneIfNotMoved(ctx)
idKey := raftlog.MakeCmdIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
// NB: proposal is dereferenced before pErr is inspected, so
// requestToProposal is relied upon to return a non-nil proposal even when it
// also returns an error.
log.Event(proposal.ctx, "evaluated request")
// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
pErr = maybeAttachLease(pErr, &st.Lease)
return nil, nil, "", nil, pErr
} else if _, ok := pErr.GetDetail().(*kvpb.ReplicaCorruptionError); ok {
// Replica corruption errors are likewise returned right away, without
// taking ownership of the guard.
return nil, nil, "", nil, pErr
}
// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
proposalCh := proposal.doneCh
// There are two cases where request evaluation does not lead to a Raft
// proposal:
// 1. proposal.command == nil indicates that the evaluation was a no-op
// and that no Raft command needs to be proposed.
// 2. pErr != nil corresponds to a failed proposal - the command resulted
// in an error.
if proposal.command == nil {
if proposal.Local.RequiresRaft() {
return nil, nil, "", nil, kvpb.NewError(errors.AssertionFailedf(
"proposal resulting from batch %s erroneously bypassed Raft", ba))
}
intents := proposal.Local.DetachEncounteredIntents()
endTxns := proposal.Local.DetachEndTxns(pErr != nil /* alwaysOnly */)
r.handleReadWriteLocalEvalResult(ctx, *proposal.Local)
// NB: it is intentional that this returns both an error and results.
// Some actions should also be taken if the command itself fails. For
// example, discovered intents should be pushed to make sure they get
// dealt with proactively rather than waiting for a future command to
// find them.
proposal.ec = makeUnreplicatedEndCmds(r, g, *st)
pr := makeProposalResult(proposal.Local.Reply, pErr, intents, endTxns)
proposal.finishApplication(ctx, pr)
// The result (including pErr, if any) is delivered through proposalCh, so
// the error return value is nil here.
return proposalCh, func() {}, "", nil, nil
}
// Make it a truly replicated proposal. We measure the replication latency
// from this point on.
proposal.ec = makeReplicatedEndCmds(r, g, *st, timeutil.Now())
log.VEventf(proposal.ctx, 2,
"proposing command to write %d new keys, %d new values, %d new intents, "+
"write batch size=%d bytes",
proposal.command.ReplicatedEvalResult.Delta.KeyCount,
proposal.command.ReplicatedEvalResult.Delta.ValCount,
proposal.command.ReplicatedEvalResult.Delta.IntentCount,
proposal.command.WriteBatch.Size(),
)
// NB: if ba.AsyncConsensus is true, we will tell admission control about
// writes that may not have happened yet. We consider this ok, since (a) the
// typical lag in consensus is expected to be small compared to the time
// granularity of admission control doing token and size estimation (which
// is 15s). Also, admission control corrects for gaps in reporting.
writeBytes := kvadmission.NewStoreWriteBytes()
if proposal.command.WriteBatch != nil {
writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data))
}
if proposal.command.ReplicatedEvalResult.AddSSTable != nil {
writeBytes.IngestedBytes = int64(len(proposal.command.ReplicatedEvalResult.AddSSTable.Data))
}
// If the request requested that Raft consensus be performed asynchronously,
// return a proposal result immediately on the proposal's done channel.
// The channel's capacity will be large enough to accommodate this.
maybeFinishSpan := func() {}
defer func() { maybeFinishSpan() }() // NB: late binding is important
if ba.AsyncConsensus {
if ets := proposal.Local.DetachEndTxns(false /* alwaysOnly */); len(ets) != 0 {
// Disallow async consensus for commands with EndTxnIntents because
// any !Always EndTxnIntent can't be cleaned up until after the
// command succeeds.
return nil, nil, "", writeBytes, kvpb.NewErrorf("cannot perform consensus asynchronously for "+
"proposal with EndTxnIntents=%v; %v", ets, ba)
}
// Fork the proposal's context span so that the proposal's context
// can outlive the original proposer's context.
proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus")
if proposal.sp != nil {
// We can't leak this span if we fail to hand the proposal to the
// replication layer, so finish it later in this method if we are to
// return with an error. (On success, we'll reset this to a noop).
maybeFinishSpan = proposal.sp.Finish
}
// Signal the proposal's response channel immediately.
// The reply struct and its Responses slice are copied so that the result
// handed to the client does not share them with proposal.Local.Reply.
reply := *proposal.Local.Reply
reply.Responses = append([]kvpb.ResponseUnion(nil), reply.Responses...)
pr := makeProposalResult(&reply, nil /* pErr */, proposal.Local.DetachEncounteredIntents(), nil /* eti */)
proposal.signalProposalResult(pr)
// Continue with proposal...
}
// Carry over any flow-control admission metadata attached to the context.
if meta := kvflowcontrol.MetaFromContext(ctx); meta != nil {
proposal.raftAdmissionMeta = meta
}
// Attach information about the proposer's lease to the command, for
// verification below raft. Lease requests are special since they are not
// necessarily proposed under a valid lease (by necessity). Instead, they
// reference the previous lease. Note that TransferLease also skips lease
// checks (for technical reasons, see `TransferLease.flags`) and uses the
// same mechanism.
if ba.IsSingleSkipsLeaseCheckRequest() {
// Lease-related commands have below-raft special casing and will carry the
// lease sequence of the lease they are intending to follow.
// The remaining requests that skip a lease check (at the time of writing
// ProbeRequest) will assign a zero lease sequence and thus won't be able
// to mutate state.
var seq roachpb.LeaseSequence
switch t := ba.Requests[0].GetInner().(type) {
case *kvpb.RequestLeaseRequest:
seq = t.PrevLease.Sequence
case *kvpb.TransferLeaseRequest:
seq = t.PrevLease.Sequence
default:
}
proposal.command.ProposerLeaseSequence = seq
} else if !st.Lease.OwnedBy(r.store.StoreID()) {
// Perform a sanity check that the lease is owned by this replica. This must
// have been ascertained by the callers in
// checkExecutionCanProceedBeforeStorageSnapshot.
log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease)
} else {
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
}
// Once a command is written to the raft log, it must be loaded into memory
// and replayed on all replicas. If a command is too big, stop it here. If
// the command is not too big, acquire an appropriate amount of quota from
// the replica's proposal quota pool.
//
// TODO(tschottdorf): blocking a proposal here will leave it dangling in the
// closed timestamp tracker for an extended period of time, which will in turn
// prevent the node-wide closed timestamp from making progress. This is quite
// unfortunate; we should hoist the quota pool before the reference with the
// closed timestamp tracker is acquired. This is better anyway; right now many
// commands can evaluate but then be blocked on quota, which has worse memory
// behavior.
quotaSize := uint64(proposal.command.Size())
if maxSize := uint64(kvserverbase.MaxCommandSize.Get(&r.store.cfg.Settings.SV)); quotaSize > maxSize {
return nil, nil, "", nil, kvpb.NewError(errors.Errorf(
"command is too large: %d bytes (max: %d)", quotaSize, maxSize,
))
}
log.VEventf(proposal.ctx, 2, "acquiring proposal quota (%d bytes)", quotaSize)
var err error
proposal.quotaAlloc, err = r.maybeAcquireProposalQuota(ctx, ba, quotaSize)
if err != nil {
return nil, nil, "", nil, kvpb.NewError(err)
}
// Make sure we clean up the proposal if we fail to insert it into the
// proposal buffer successfully. This ensures that we always release any
// quota that we acquire.
//
// NB: the closure reads the local pErr variable (not a named result), so it
// only fires for the error paths below that assign pErr before returning.
defer func() {
if pErr != nil {
proposal.releaseQuota()
}
}()
if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
filterArgs := kvserverbase.ProposalFilterArgs{
Ctx: ctx,
Cmd: proposal.command,
QuotaAlloc: proposal.quotaAlloc,
CmdID: idKey,
Req: *ba,
}
if pErr = filter(filterArgs); pErr != nil {
return nil, nil, "", nil, pErr
}
}
// Hand the proposal (and the tracking token) over to propose().
pErr = r.propose(ctx, proposal, tok.Move(ctx))
if pErr != nil {
return nil, nil, "", nil, pErr
}
// We've successfully handed the proposal to the replication layer, so this
// method should not finish the trace span if we forked one off above.
maybeFinishSpan = func() {}
// Abandoning a proposal unbinds its context so that the proposal's client
// is free to terminate execution. However, it does nothing to try to
// prevent the command from succeeding. In particular, endCmds will still be
// invoked when the command is applied. There are a handful of cases where
// the command may not be applied (or even processed): the process crashes
// or the local replica is removed from the range.
abandon := func() {
// The proposal may or may not be in the Replica's proposals map.
// Instead of trying to look it up, simply modify the captured object
// directly. The raftMu must be locked to modify the context of a
// proposal because as soon as we propose a command to Raft, ownership
// passes to the "below Raft" machinery.
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// TODO(radu): Should this context be created via tracer.ForkSpan?
// We'd need to make sure the span is finished eventually.
proposal.ctx = r.AnnotateCtx(context.TODO())
}
return proposalCh, abandon, idKey, writeBytes, nil
}
// propose encodes a command, starts tracking it, and proposes it to Raft.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
//
// propose takes ownership of the supplied token; the caller should tok.Move()
// it into this method. It will be used to untrack the request once it comes out
// of the proposal buffer.
//
// Note that this method is called for "new" proposals but also by
// `tryReproposeWithNewLeaseIndex`. This second call leaves questions on what
// exactly the desired semantics are - some fields (MaxLeaseIndex,
// ClosedTimestamp) will be set and this re-entrance into `propose`
// is hard to fully understand. (The reset of `MaxLeaseIndex` inside this
// method is a fear-fueled but likely unneeded consequence of this).
//
// It returns a non-nil error if the replication change check fails, if the
// command cannot be encoded, if (with useReproposalsV2) MaxLeaseIndex is
// already set, or if insertion into the proposal buffer fails.
//
// TODO(repl): adopt the below issue which will see each proposal passed to this
// method exactly once:
//
// https://github.com/cockroachdb/cockroach/issues/98477
func (r *Replica) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) (pErr *kvpb.Error) {
// Release the tracking token unless it was moved into the proposal buffer.
defer tok.DoneIfNotMoved(ctx)
// If an error occurs reset the command's MaxLeaseIndex to its initial value.
// Failure to propose will propagate to the client. An invariant of this
// package is that proposals which are finished carry a raft command with a
// MaxLeaseIndex equal to the proposal command's max lease index.
//
// NB: the current MaxLeaseIndex is captured as the deferred call's argument,
// i.e. before it is zeroed below; pErr is the named result.
defer func(prev kvpb.LeaseAppliedIndex) {
if useReproposalsV2 {
// The following poorly understood code is not necessary in V2 since
// we never mutate MaxLeaseIndex and don't hit propose() twice for
// the same *ProposalData.
return
}
if pErr != nil {
p.command.MaxLeaseIndex = prev
}
}(p.command.MaxLeaseIndex)
if !useReproposalsV2 {
// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a MaxLeaseFooter.
p.command.MaxLeaseIndex = 0
} else {
if p.command.MaxLeaseIndex > 0 {
// TODO: there are a number of other fields that should still be unset.
// Verify them all. Some architectural improvements where we pass in a
// subset of ProposalData and then complete it here would be even better.
return kvpb.NewError(errors.AssertionFailedf("MaxLeaseIndex is set: %+v", p))
}
}
// Replication changes are validated against the current descriptor before
// being proposed; AddSSTable proposals are counted in a metric.
if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil {
if err := checkReplicationChangeAllowed(p.command, r.Desc(), r.StoreID()); err != nil {
log.Errorf(ctx, "%v", err)
return kvpb.NewError(err)
}
log.KvDistribution.Infof(p.ctx, "proposing %s", crt)
} else if p.command.ReplicatedEvalResult.AddSSTable != nil {
log.VEvent(p.ctx, 4, "sideloadable proposal detected")
r.store.metrics.AddSSTableProposals.Inc(1)
} else if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
}
// Only encode the admission metadata when the proposal uses replication
// admission control.
raftAdmissionMeta := p.raftAdmissionMeta
if !p.useReplicationAdmissionControl() {
raftAdmissionMeta = nil
}
data, err := raftlog.EncodeCommand(ctx, p.command, p.idKey, raftAdmissionMeta)
if err != nil {
return kvpb.NewError(err)
}
p.encodedCommand = data
// Too verbose even for verbose logging, so manually enable if you want to
// debug proposal sizes.
if false {
log.Infof(p.ctx, `%s: proposal: %d
RaftCommand.ReplicatedEvalResult: %d
RaftCommand.ReplicatedEvalResult.Delta: %d
RaftCommand.WriteBatch: %d
`, p.Request.Summary(), p.command.Size(),
p.command.ReplicatedEvalResult.Size(),
p.command.ReplicatedEvalResult.Delta.Size(),
p.command.WriteBatch.Size(),
)
}
// Log an event if this is a large proposal. These are more likely to cause
// blips or worse, and it's good to be able to pick them from traces.
//
// TODO(tschottdorf): can we mark them so lightstep can group them?
//
// NOTE(review): this comment used to read "512kb", but 2 << 19 evaluates to
// 1,048,576 bytes. If 512 KiB is the intended threshold the constant should
// be 1 << 19 -- confirm before changing the value.
const largeProposalEventThresholdBytes = 2 << 19 // 1 MiB
if ln := len(p.encodedCommand); ln > largeProposalEventThresholdBytes {
log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(ln)))
}
// Insert into the proposal buffer, which passes the command to Raft to be
// proposed. The proposal buffer assigns the command a maximum lease index
// when it sequences it.
//
// NB: we must not hold r.mu while using the proposal buffer, see comment
// on the field.
log.VEvent(p.ctx, 2, "submitting proposal to proposal buffer")
if err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)); err != nil {
return kvpb.NewError(err)
}
return nil
}
// checkReplicationChangeAllowed verifies that the replication change carried
// by command is legal with respect to the replica on storeID, which is the one
// proposing it (the leaseholder). It returns a RangeNotFoundError if storeID
// has no replica in desc, an error marked with errMarkInvalidReplicationChange
// if the proposed descriptor would leave this replica unable to hold the
// lease, and nil otherwise.
//
// Background: a voter can be removed in two ways.
//
//  1. Simple (old style): a single reconfiguration turns the voter into a
//     LEARNER / NON-VOTER. Here the lease must be transferred away
//     cooperatively before the removal is proposed.
//  2. Through an intermediate joint configuration, in which the replica stays
//     in the descriptor as VOTER_{OUTGOING, DEMOTING}. When the joint config
//     is left (a second Raft operation), the removed replica transitions to a
//     LEARNER / NON-VOTER.
//
// Leaseholder removal is only permitted via (2), when entering a joint
// configuration in which the leaseholder is still (any kind of) voter and
// which also contains a VOTER_INCOMING replica. The lease is then transferred
// to that incoming replica in maybeLeaveAtomicChangeReplicas right before the
// joint configuration is exited.
//
// Transferring the lease inside the joint config lets it move directly from
// the old replica to the new one, since both are active, without bouncing
// through a third node or temporarily growing the range, either of which
// could hurt performance or fault tolerance. For example, take v1 in region1
// (leaseholder), v2 in region2 and v3 in region3, and suppose v1 is to be
// relocated to a new node v4 in region1. v4 is first added as a LEARNER, at
// which point it cannot receive the lease. Moving the lease to v2 first is
// likely to hurt application performance. Adding v4 as a VOTER first and then
// transferring to it would bring the range to 4 replicas, and if region1 went
// down we would lose quorum. Instead, we enter a joint config in which v1
// (VOTER_DEMOTING_LEARNER) transfers the lease to v4 (VOTER_INCOMING)
// directly.
//
// The implementation assumes the caller intends the VOTER_INCOMING node to be
// the replacement replica and therefore to get the lease; no lease target is
// selected dynamically during the joint config. Consequently a VOTER_DEMOTING
// replica may only hold the lease in a joint configuration that also has a
// VOTER_INCOMING node. Otherwise the caller is expected to shed the lease
// before entering the joint configuration. See also
// https://github.com/cockroachdb/cockroach/issues/67740.
func checkReplicationChangeAllowed(
	command *kvserverpb.RaftCommand, desc *roachpb.RangeDescriptor, storeID roachpb.StoreID,
) error {
	ownDesc, found := desc.GetReplicaDescriptor(storeID)
	if !found {
		return kvpb.NewRangeNotFoundError(desc.RangeID, storeID)
	}

	// Validate the target config against the current leaseholder: it may be a
	// VOTER_DEMOTING as long as there is a VOTER_INCOMING, otherwise it must be
	// a full voter. This also prevents exiting the joint config before the
	// lease has been transferred away, since the previous leaseholder is a
	// LEARNER in that target config and must not keep holding the lease.
	targetDesc := command.ReplicatedEvalResult.State.Desc
	checkErr := roachpb.CheckCanReceiveLease(
		ownDesc, targetDesc.Replicas(), true, /* wasLastLeaseholder */
	)
	if checkErr == nil {
		return nil
	}

	marked := errors.Mark(errors.Handled(checkErr), errMarkInvalidReplicationChange)
	return errors.Wrapf(marked, "%v received invalid ChangeReplicasTrigger %s to "+
		"remove self (leaseholder); lhRemovalAllowed: %v; current desc: %v; proposed desc: %v",
		ownDesc, command.ReplicatedEvalResult.ChangeReplicas, true /* lhRemovalAllowed */, desc, targetDesc)
}
// numPendingProposalsRLocked returns the number of proposals this replica is
// currently tracking: the entries in the r.mu.proposals map plus the proposal
// buffer's allocated index. As the RLocked suffix indicates, the caller is
// expected to hold r.mu at least for reading.
func (r *Replica) numPendingProposalsRLocked() int {
	tracked := len(r.mu.proposals)
	buffered := r.mu.proposalBuf.AllocatedIdx()
	return tracked + buffered
}
// hasPendingProposalsRLocked is part of the quiescer interface. It reports
// whether this node has any outstanding proposals. A client might be waiting
// on the outcome of such proposals, so the range must not quiesce while they
// are in flight.
//
// The answer covers this node only. If this node is the current leaseholder,
// previous leaseholders may still be waiting on proposals of their own; if it
// is not, the current leaseholder may well have pending proposals. The method
// is therefore consulted in two places: on the leaseholder when it decides
// whether to attempt to quiesce the range, and then on every follower to
// confirm that the range can indeed be quiesced.
func (r *Replica) hasPendingProposalsRLocked() bool {
	if r.numPendingProposalsRLocked() > 0 {
		return true
	}
	// If slow proposals just finished, refreshProposalsLocked may not have run
	// yet. Quiescing before it does would mean this Replica's contribution to
	// `requests.slow.raft` is never fully reset. So only claim to have no
	// pending proposals once a final refresh has reset the counter, which
	// happens within a few ticks at most.
	return r.mu.slowProposalCount > 0
}
// hasPendingProposalQuotaRLocked is part of the quiescer interface. It reports
// whether this node's quota pool is still tracking commands that have not
// finished replicating (i.e. commands that haven't been acked by all live
// replicas). It returns false when the replica has no quota pool.
//
// Quiescing with outstanding quota is not allowed: the quota would not be
// released while quiesced, which could in turn prevent the range from
// unquiescing and lead to a deadlock. See #46699.
func (r *Replica) hasPendingProposalQuotaRLocked() bool {
	pool := r.mu.proposalQuota
	return pool != nil && !pool.Full()
}
// ticksSinceLastProposalRLocked returns how many ticks have elapsed since the
// last proposal, computed as the current tick count minus the tick count
// recorded in r.mu.lastProposalAtTicks.
func (r *Replica) ticksSinceLastProposalRLocked() int {
	current, lastProposal := r.mu.ticks, r.mu.lastProposalAtTicks
	return current - lastProposal
}
// isRaftLeaderRLocked reports whether this replica believes it is the current
// Raft leader, i.e. whether its own replica ID matches the leader ID recorded
// in r.mu.
//
// NB: This can race with Raft ready processing, where the Raft group has
// processed a leader change before updating the replica state in a separate
// critical section. The caller should always verify this against the Raft
// status where necessary.
func (r *Replica) isRaftLeaderRLocked() bool {
	if r.replicaID == 0 {
		// Defensive: a zero replica ID never counts as the leader, even if the
		// recorded leader ID is also zero.
		return false
	}
	return r.mu.leaderID == r.replicaID
}
// errRemoved is a sentinel error carrying the message "replica removed".
// NOTE(review): presumably returned by code paths that find the replica has
// been removed from its range; its users are not visible here -- confirm.
var errRemoved = errors.New("replica removed")
// stepRaftGroup calls Step on the replica's RawNode with the provided request's
// message. Before doing so, it ensures that the replica is unquiesced and ready
// to handle the request.
//
// All work happens inside the closure passed to withRaftGroup, which is
// invoked with mayCampaign=false. The closure swallows
// raft.ErrProposalDropped and hands any other error from Step back to
// withRaftGroup. For MsgSnap messages, req.Message.Term may be raised in place
// (see below).
// NOTE(review): the closure calls *Locked / *RLocked helpers, so withRaftGroup
// presumably runs it with r.mu held -- confirm.
func (r *Replica) stepRaftGroup(req *kvserverpb.RaftMessageRequest) error {
// We're processing an incoming raft message (from a batch that may
// include MsgVotes), so don't campaign if we wake up our raft
// group to avoid election ties.
const mayCampaign = false
return r.withRaftGroup(mayCampaign, func(raftGroup *raft.RawNode) (bool, error) {
// If we're a follower, and we receive a message from a non-leader replica
// while quiesced, we wake up the leader too to prevent spurious elections.
//
// This typically happens in the case of a partial network partition where
// some other replica is partitioned away from the leader but can reach this
// replica. In that case, the partitioned replica will send us a prevote
// message, which we'll typically reject (e.g. because it's behind on its
// log, or if CheckQuorum+PreVote is enabled because we have a current
// leader). However, if we don't also wake the leader, we'll now have two
// unquiesced followers, and eventually they'll call an election that
// unseats the leader. If this replica wins (often the case since we're
// up-to-date on the log), then we'll immediately transfer leadership back
// to the leaseholder, i.e. the old leader, and the cycle repeats.
//
// Note that such partial partitions will typically result in persistent
// mass unquiescence due to the continuous prevotes.
if r.mu.quiescent {
st := r.raftBasicStatusRLocked()
// Wake the leader only if we are a follower with a known leader and the
// message did not come from that leader.
hasLeader := st.RaftState == raft.StateFollower && st.Lead != 0
fromLeader := uint64(req.FromReplica.ReplicaID) == st.Lead
wakeLeader := hasLeader && !fromLeader
r.maybeUnquiesceLocked(wakeLeader, mayCampaign)
}
// Record the time at which we last heard from the sending replica.
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, timeutil.Now())
switch req.Message.Type {
case raftpb.MsgPreVote, raftpb.MsgVote:
// If we receive a (pre)vote request, and we find our leader to be dead or
// removed, forget it so we can grant the (pre)votes.
r.maybeForgetLeaderOnVoteRequestLocked()
case raftpb.MsgSnap:
// Occasionally a snapshot message may arrive under an outdated term,
// which would lead to Raft discarding the snapshot. This should be
// really rare in practice, but it does happen in tests and in particular
// can happen to the synchronous snapshots on the learner path, which
// will then have to wait for the raft snapshot queue to send another
// snapshot. However, in some tests it is desirable to disable the
// raft snapshot queue. This workaround makes that possible.
//
// See TestReportUnreachableRemoveRace for the test that prompted
// this addition.
if term := raftGroup.BasicStatus().Term; term > req.Message.Term {
req.Message.Term = term
}
}
err := raftGroup.Step(req.Message)
if errors.Is(err, raft.ErrProposalDropped) {
// A proposal was forwarded to this replica but we couldn't propose it.
// Swallow the error since we don't have an effective way of signaling
// this to the sender.
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
err = nil
}
// Unquiescing (if needed) was already handled above, so never ask
// withRaftGroup to do it.
return false /* unquiesceAndWakeLeader */, err
})
}
// handleSnapshotStats records what happened to an incoming snapshot during a
// single raft ready handling cycle.
type handleSnapshotStats struct {
	// offered is set when the cycle was handed a snapshot to process; applied
	// is set once that snapshot has been applied successfully. A snapshot that
	// was offered but not applied is reported as ignored.
	offered, applied bool
}
// handleRaftReadyStats carries the timings and volume counters gathered while
// handling one raft ready cycle. It is rendered for logging by SafeFormat.
type handleRaftReadyStats struct {
	// tBegin is stamped when handling starts and tEnd when it returns.
	tBegin time.Time
	tEnd   time.Time
	// append holds the log append statistics (timings, sync mode, bytes and
	// entry counts) that SafeFormat reports.
	append logstore.AppendStats
	// tApplicationBegin and tApplicationEnd delimit the interval reported as
	// "apply" (presumably the application of committed entries; they are set
	// outside the code visible alongside this declaration).
	tApplicationBegin time.Time
	tApplicationEnd   time.Time
	// apply holds counters describing the entries that were applied.
	apply applyCommittedEntriesStats
	// tSnapBegin and tSnapEnd are stamped immediately before and after a
	// snapshot is applied; both stay zero when no snapshot is applied.
	tSnapBegin time.Time
	tSnapEnd   time.Time
	// snap records whether a snapshot was offered and whether it was applied.
	snap handleSnapshotStats
}
// SafeFormat implements redact.SafeFormatter.
//
// It prints a one-line summary of a raft ready handling cycle made of:
//   - a timing section ("raft ready handling: ... [append=..., apply=..., ...]")
//     with optional sync and snapshot durations and the remaining "other" time;
//   - a volume section ("wrote [...]") listing only the non-empty items;
//   - optional trailers for state assertions, the snapshot outcome and the
//     Pebble commit stats.
//
// Optional items carry their own leading separator, so leaving one out never
// produces a doubled (", ,") or dangling (", ]") comma.
func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) {
	dTotal := s.tEnd.Sub(s.tBegin)
	dAppend := s.append.End.Sub(s.append.Begin)
	dApply := s.tApplicationEnd.Sub(s.tApplicationBegin)
	dPebble := s.append.PebbleEnd.Sub(s.append.PebbleBegin)
	dSnap := s.tSnapEnd.Sub(s.tSnapBegin)
	// NOTE(review): dPebble is subtracted in addition to dAppend. If the Pebble
	// interval lies inside the append interval, "other" under-reports by
	// dPebble -- confirm against the code that fills in s.append.
	dUnaccounted := dTotal - dSnap - dAppend - dApply - dPebble

	// Timing section.
	p.Printf("raft ready handling: %.2fs [append=%.2fs, apply=%.2fs",
		dTotal.Seconds(), dAppend.Seconds(), dApply.Seconds())
	if s.append.Sync {
		var sync redact.SafeString
		if s.append.NonBlocking {
			sync = "non-blocking-sync" // actual sync time not reflected in this case
		} else {
			sync = "sync"
		}
		p.Printf(", %s=%.2fs", sync, dPebble.Seconds())
	}
	if dSnap > 0 {
		p.Printf(", snap=%.2fs", dSnap.Seconds())
	}
	p.Printf(", other=%.2fs]", dUnaccounted.Seconds())

	// Volume section. sep is empty until the first item has been printed and
	// ", " afterwards, so items are comma-separated without a trailing comma.
	p.Printf(", wrote [")
	var sep redact.SafeString
	if b := s.append.PebbleBytes; b > 0 {
		p.Printf("%sappend-batch=%s", sep, humanizeutil.IBytes(b))
		sep = ", "
	}
	if b, n := s.append.RegularBytes, s.append.RegularEntries; n > 0 || b > 0 {
		p.Printf("%sappend-ent=%s (%d)", sep, humanizeutil.IBytes(b), n)
		sep = ", "
	}
	if b, n := s.append.SideloadedBytes, s.append.SideloadedEntries; n > 0 || b > 0 {
		p.Printf("%sappend-sst=%s (%d)", sep, humanizeutil.IBytes(b), n)
		sep = ", "
	}
	if b, n := s.apply.numEntriesProcessedBytes, s.apply.numEntriesProcessed; n > 0 || b > 0 {
		p.Printf("%sapply=%s (%d", sep, humanizeutil.IBytes(b), n)
		// The batch count is only interesting when there was more than one.
		if c := s.apply.numBatchesProcessed; c > 1 {
			p.Printf(" in %d batches", c)
		}
		p.SafeString(")")
	}
	p.SafeString("]")

	// Trailers.
	if n := s.apply.stateAssertions; n > 0 {
		p.Printf(", state_assertions=%d", n)
	}
	if s.snap.offered {
		if s.snap.applied {
			p.Printf(", snapshot applied")
		} else {
			p.Printf(", snapshot ignored")
		}
	}
	// Only mention the Pebble commit stats when they are not the zero value.
	if !(s.append.PebbleCommitStats == storage.BatchCommitStats{}) {
		p.Printf(" pebble stats: [%s]", s.append.PebbleCommitStats)
	}
}
// String returns the stats as a plain string by passing the receiver, which
// implements SafeFormat, to redact.StringWithoutMarkers.
func (s handleRaftReadyStats) String() string {
return redact.StringWithoutMarkers(s)
}
// noSnap is the zero-value IncomingSnapshot. Pass it to handleRaftReady when
// no snapshot should be processed.
var noSnap IncomingSnapshot
// handleRaftReady processes a raft.Ready containing entries and messages that
// are ready to read, be saved to stable storage, committed, or sent to other
// peers. Pass a non-empty IncomingSnapshot to indicate that a snapshot is about
// to be processed, or noSnap otherwise.
//
// The method holds raftMu for the duration of the call and delegates the
// actual work to handleRaftReadyRaftMuLocked. It returns the statistics
// gathered for the cycle along with any error.
func (r *Replica) handleRaftReady(
	ctx context.Context, inSnap IncomingSnapshot,
) (handleRaftReadyStats, error) {
	// Testing hook: if the DisableProcessRaft knob is installed and returns
	// true for this store, skip processing and report an empty cycle.
	skip := r.store.cfg.TestingKnobs.DisableProcessRaft
	if skip != nil && skip(r.store.StoreID()) {
		var stats handleRaftReadyStats
		stats.tBegin = timeutil.Now()
		stats.tEnd = timeutil.Now()
		return stats, nil
	}

	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	return r.handleRaftReadyRaftMuLocked(ctx, inSnap)
}
// handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires that
// the replica's raftMu be held.
//
// It returns statistics describing the work performed, along with any error
// encountered.
func (r *Replica) handleRaftReadyRaftMuLocked(
ctx context.Context, inSnap IncomingSnapshot,
) (stats handleRaftReadyStats, _ error) {
// handleRaftReadyRaftMuLocked is not prepared to handle context cancellation,
// so assert that it's given a non-cancellable context.
if ctx.Done() != nil {
return handleRaftReadyStats{}, errors.AssertionFailedf(
"handleRaftReadyRaftMuLocked cannot be called with a cancellable context")
}
// NB: we need to reference the named return parameter here. If `stats` were
// just a local, we'd be modifying the local but not the return value in the
// defer below.
stats = handleRaftReadyStats{
tBegin: timeutil.Now(),
}
defer func() {
stats.tEnd = timeutil.Now()
}()
if inSnap.Desc != nil {
stats.snap.offered = true
}
var hasReady bool
var softState *raft.SoftState
var outboundMsgs []raftpb.Message
var msgStorageAppend, msgStorageApply raftpb.Message
r.mu.Lock()
state := logstore.RaftState{ // used for append below
LastIndex: r.mu.lastIndexNotDurable,
LastTerm: r.mu.lastTermNotDurable,
ByteSize: r.mu.raftLogSize,
}
leaderID := r.mu.leaderID
lastLeaderID := leaderID
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
return false, err
}
if hasReady = raftGroup.HasReady(); hasReady {
syncRd := raftGroup.Ready()
logRaftReady(ctx, syncRd)
asyncRd := makeAsyncReady(syncRd)
softState = asyncRd.SoftState
outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages)
}
// We unquiesce if we have a Ready (= there's work to do). We also have
// to unquiesce if we just flushed some proposals but there isn't a
// Ready, which can happen if the proposals got dropped (raft does this
// if it doesn't know who the leader is). And, for extra defense in depth,
// we also unquiesce if there are outstanding proposals.
//
// NB: if we had the invariant that the group can only be in quiesced
// state if it knows the leader (state.Lead) AND we knew that raft would
// never give us an empty ready here (i.e. the only reason to drop a
// proposal is not knowing the leader) then numFlushed would not be
// necessary. The latter is likely true but we don't want to rely on
// it. The former is maybe true, but there's no easy way to enforce it.
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
return unquiesceAndWakeLeader, nil
})
r.mu.applyingEntries = hasMsg(msgStorageApply)
pausedFollowers := r.mu.pausedFollowers
r.mu.Unlock()
if errors.Is(err, errRemoved) {
// If we've been removed then just return.
return stats, nil
} else if err != nil {
return stats, errors.Wrap(err, "checking raft group for Ready")
}
if !hasReady {
// We must update the proposal quota even if we don't have a ready.
// Consider the case when our quota is of size 1 and two out of three
// replicas have committed one log entry while the third is lagging
// behind. When the third replica finally does catch up and sends
// along a MsgAppResp, since the entry is already committed on the
// leader replica, no Ready is emitted. But given that the third
// replica has caught up, we can release
// some quota back to the pool.
r.updateProposalQuotaRaftMuLocked(ctx, lastLeaderID)
return stats, nil
}
refreshReason := noReason
if softState != nil && leaderID != roachpb.ReplicaID(softState.Lead) {
// Refresh pending commands if the Raft leader has changed. This is usually
// the first indication we have of a new leader on a restarted node.
//
// TODO(peter): Re-proposing commands when SoftState.Lead changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, softState.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(softState.Lead)
}
r.traceMessageSends(outboundMsgs, "sending messages")
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* willDeliverLocal */)
// If the ready struct includes entries that have been committed, these
// entries will be applied to the Replica's replicated state machine down
// below, after appending new entries to the raft log and sending messages
// to peers. However, the process of appending new entries to the raft log
// and then applying committed entries to the state machine can take some
// time - and these entries are already durably committed. If they have
// clients waiting on them, we'd like to acknowledge their success as soon
// as possible. To facilitate this, we take a quick pass over the committed
// entries and acknowledge as many as we can trivially prove will not be
// rejected beneath raft.
//
// Note that the Entries slice in the MsgStorageApply cannot refer to entries
// that are also in the Entries slice in the MsgStorageAppend. Raft will not
// allow unstable entries to be applied when AsyncStorageWrites is enabled.
//
// If we disable AsyncStorageWrites in the future, this property will no
// longer be true, and the two slices could overlap. For example, this can
// happen when a follower is being caught up on committed commands. We could
// acknowledge these commands early even though they aren't durably in the
// local raft log yet (since they're committed via a quorum elsewhere), but
// we'd likely want to revert to an earlier version of this code that chose to
// be conservative and avoid this behavior by passing the last Ready cycle's
// `lastIndex` for a maxIndex argument to
// AckCommittedEntriesBeforeApplication.
//
// TODO(nvanbenschoten): this is less important with async storage writes.
// Consider getting rid of it.
sm := r.getStateMachine()
dec := r.getDecoder()
var appTask apply.Task
if hasMsg(msgStorageApply) {
appTask = apply.MakeTask(sm, dec)
appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
defer appTask.Close()
if err := appTask.Decode(ctx, msgStorageApply.Entries); err != nil {
return stats, err
}
if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication {
if err := appTask.AckCommittedEntriesBeforeApplication(ctx); err != nil {
return stats, err
}
}
}
if hasMsg(msgStorageAppend) {
if msgStorageAppend.Snapshot != nil {
if inSnap.Desc == nil {
// If we didn't expect Raft to have a snapshot but it has one
// regardless, that is unexpected and indicates a programming
// error.
return stats, errors.AssertionFailedf(
"have inSnap=nil, but raft has a snapshot %s",
raft.DescribeSnapshot(*msgStorageAppend.Snapshot),
)
}
snapUUID, err := uuid.FromBytes(msgStorageAppend.Snapshot.Data)
if err != nil {
return stats, errors.Wrap(err, "invalid snapshot id")
}
if inSnap.SnapUUID == (uuid.UUID{}) {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
}
if snapUUID != inSnap.SnapUUID {
log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID)
}
snap := *msgStorageAppend.Snapshot
hs := raftpb.HardState{
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
}
// Applying this snapshot may require us to subsume one or more of our right
// neighbors. This occurs if this replica is informed about the merges via a
// Raft snapshot instead of a MsgApp containing the merge commits, e.g.,
// because it went offline before the merge commits applied and did not come
// back online until after the merge commits were truncated away.
subsumedRepls, releaseMergeLock := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
defer releaseMergeLock()
stats.tSnapBegin = timeutil.Now()
if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil {
return stats, errors.Wrap(err, "while applying snapshot")
}
stats.tSnapEnd = timeutil.Now()
stats.snap.applied = true
// r.mu.lastIndexNotDurable, r.mu.lastTermNotDurable and r.mu.raftLogSize
// were updated in applySnapshot, but we also want to make sure we reflect
// these changes in the local variables we're tracking here.
r.mu.RLock()
state = logstore.RaftState{
LastIndex: r.mu.lastIndexNotDurable,
LastTerm: r.mu.lastTermNotDurable,
ByteSize: r.mu.raftLogSize,
}
r.mu.RUnlock()
// We refresh pending commands after applying a snapshot because this
// replica may have been temporarily partitioned from the Raft group and
// missed leadership changes that occurred. Suppose node A is the leader,
// and then node C gets partitioned away from the others. Leadership passes
// back and forth between A and B during the partition, but when the
// partition is healed node A is leader again.
if !r.store.TestingKnobs().DisableRefreshReasonSnapshotApplied &&
refreshReason == noReason {
refreshReason = reasonSnapshotApplied
}
// Send MsgStorageAppend's responses.
r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil /* blocked */, true /* willDeliverLocal */)
} else {
// TODO(pavelkalinnikov): find a way to move it to storeEntries.
if msgStorageAppend.Commit != 0 && !r.IsInitialized() {
log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s", r)
}
// TODO(pavelkalinnikov): construct and store this in Replica.
// TODO(pavelkalinnikov): fields like raftEntryCache are the same across all
// ranges, so can be passed to LogStore methods instead of being stored in it.
s := logstore.LogStore{
RangeID: r.RangeID,
Engine: r.store.TODOEngine(),
Sideload: r.raftMu.sideloaded,
StateLoader: r.raftMu.stateLoader.StateLoader,
SyncWaiter: r.store.syncWaiter,
EntryCache: r.store.raftEntryCache,
Settings: r.store.cfg.Settings,
Metrics: logstore.Metrics{
RaftLogCommitLatency: r.store.metrics.RaftLogCommitLatency,
},
}