// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"
"fmt"
"math/rand"
"sort"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/kr/pretty"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
// insertProposalLocked assigns a MaxLeaseIndex to a proposal and adds
// it to the pending map. Returns the assigned MaxLeaseIndex, if any.
func (r *Replica) insertProposalLocked(
proposal *ProposalData, proposerReplica roachpb.ReplicaDescriptor, proposerLease roachpb.Lease,
) int64 {
// Assign a lease index. Note that we do this as late as possible
// to make sure (to the extent that we can) that we don't assign
// (=predict) the index differently from the order in which commands are
// proposed (and thus likely applied).
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
isLease := proposal.Request.IsLeaseRequest()
if !isLease {
r.mu.lastAssignedLeaseIndex++
}
proposal.command.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex
proposal.command.ProposerReplica = proposerReplica
proposal.command.ProposerLeaseSequence = proposerLease.Sequence
if log.V(4) {
log.Infof(proposal.ctx, "submitting proposal %x: maxLeaseIndex=%d",
proposal.idKey, proposal.command.MaxLeaseIndex)
}
if _, ok := r.mu.proposals[proposal.idKey]; ok {
ctx := r.AnnotateCtx(context.TODO())
log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
}
r.mu.proposals[proposal.idKey] = proposal
if isLease {
// For lease requests, we return zero because no real MaxLeaseIndex is assigned.
// We could also return lastAssignedLeaseIndex, but that invites confusion.
return 0
}
return int64(proposal.command.MaxLeaseIndex)
}
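// makeIDKey generates a random command ID. The ID is attached to each Raft
// proposal and is used to look the proposal back up in the pending proposals
// map when the corresponding command comes out of Raft for application.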
func makeIDKey() storagebase.CmdIDKey {
idKeyBuf := make([]byte, 0, raftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
return storagebase.CmdIDKey(idKeyBuf)
}
// propose prepares the necessary pending command struct and initializes a
// client command ID if one hasn't been assigned yet. A verified lease is supplied
// as a parameter if the command requires a lease; nil otherwise. It
// then proposes the command to Raft if necessary and returns
// - a channel which receives a response or error upon application
// - a closure used to attempt to abandon the command. When called, it tries to
// remove the pending command from the internal commands map. This is
// possible until execution of the command at the local replica has already
// begun, in which case false is returned and the client needs to continue
// waiting for successful execution.
// - a callback to undo quota acquisition if the attempt to propose the batch
// request to raft fails. This also cleans up the command sizes stored for
// the corresponding proposal.
// - the MaxLeaseIndex of the resulting proposal, if any.
// - any error obtained during the creation or proposal of the command, in
// which case the other returned values are zero.
func (r *Replica) propose(
ctx context.Context,
lease roachpb.Lease,
ba roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
) (_ chan proposalResult, _ func() bool, _ int64, pErr *roachpb.Error) {
// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
r.mu.RLock()
if !r.mu.destroyStatus.IsAlive() {
err := r.mu.destroyStatus.err
r.mu.RUnlock()
return nil, nil, 0, roachpb.NewError(err)
}
r.mu.RUnlock()
rSpan, err := keys.Range(ba)
if err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
errStr := fmt.Sprintf("%s before proposing: %s", err, ba.Summary())
log.Warning(ctx, errStr)
return nil, nil, 0, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
}
// Only need to check that the request is in bounds at proposal time, not at
// application time, because the spanlatch manager will synchronize all
// requests (notably EndTransaction with SplitTrigger) that may cause this
// condition to change.
if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, endCmds, spans)
log.Event(proposal.ctx, "evaluated request")
// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
proposalCh := proposal.doneCh
// There are two cases where request evaluation does not lead to a Raft
// proposal:
// 1. proposal.command == nil indicates that the evaluation was a no-op
// and that no Raft command needs to be proposed.
// 2. pErr != nil corresponds to a failed proposal - the command resulted
// in an error.
if proposal.command == nil {
intents := proposal.Local.DetachIntents()
endTxns := proposal.Local.DetachEndTxns(pErr != nil /* alwaysOnly */)
r.handleLocalEvalResult(ctx, *proposal.Local)
pr := proposalResult{
Reply: proposal.Local.Reply,
Err: pErr,
Intents: intents,
EndTxns: endTxns,
}
proposal.finishApplication(pr)
return proposalCh, func() bool { return false }, 0, nil
}
// If the request requested that Raft consensus be performed asynchronously,
// return a proposal result immediately on the proposal's done channel.
// The channel's capacity will be large enough to accommodate this.
if ba.AsyncConsensus {
if ets := proposal.Local.DetachEndTxns(false /* alwaysOnly */); len(ets) != 0 {
// Disallow async consensus for commands with EndTxnIntents because
// any !Always EndTxnIntent can't be cleaned up until after the
// command succeeds.
return nil, nil, 0, roachpb.NewErrorf("cannot perform consensus asynchronously for "+
"proposal with EndTxnIntents=%v; %v", ets, ba)
}
// Fork the proposal's context span so that the proposal's context
// can outlive the original proposer's context.
proposal.ctx, proposal.sp = tracing.ForkCtxSpan(ctx, "async consensus")
// Signal the proposal's response channel immediately.
reply := *proposal.Local.Reply
reply.Responses = append([]roachpb.ResponseUnion(nil), reply.Responses...)
pr := proposalResult{
Reply: &reply,
Intents: proposal.Local.DetachIntents(),
}
proposal.signalProposalResult(pr)
// Continue with proposal...
}
// TODO(irfansharif): This int cast indicates that if someone configures a
// very large max proposal size, there is weird overflow behavior and it
// will not work the way it should.
proposalSize := proposal.command.Size()
if proposalSize > int(MaxCommandSize.Get(&r.store.cfg.Settings.SV)) {
// Once a command is written to the raft log, it must be loaded
// into memory and replayed on all replicas. If a command is
// too big, stop it here.
return nil, nil, 0, roachpb.NewError(errors.Errorf(
"command is too large: %d bytes (max: %d)",
proposalSize, MaxCommandSize.Get(&r.store.cfg.Settings.SV),
))
}
// TODO(tschottdorf): blocking a proposal here will leave it dangling in the
// closed timestamp tracker for an extended period of time, which will in turn
// prevent the node-wide closed timestamp from making progress. This is quite
// unfortunate; we should hoist the quota pool before the reference with the
// closed timestamp tracker is acquired. This is better anyway; right now many
// commands can evaluate but then be blocked on quota, which has worse memory
// behavior.
if err := r.maybeAcquireProposalQuota(ctx, int64(proposalSize)); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// submitProposalLocked calls withRaftGroupLocked which requires that raftMu
// is held, but only if Replica.mu.internalRaftGroup == nil. To avoid
// locking Replica.raftMu in the common case where the raft group is
// non-nil, we lock only Replica.mu at first before checking the status of
// the internal raft group. If it equals nil then we fall back to the slow
// path of locking Replica.raftMu. However, in order to maintain our lock
// ordering we need to lock Replica.raftMu here before locking Replica.mu,
// so we unlock Replica.mu before locking them both again.
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.internalRaftGroup == nil {
// Unlock first before locking in {raft,replica}mu order.
r.mu.Unlock()
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
log.Event(proposal.ctx, "acquired {raft,replica}mu")
} else {
log.Event(proposal.ctx, "acquired replica mu")
}
// Add size of proposal to commandSizes map.
if r.mu.commandSizes != nil {
r.mu.commandSizes[proposal.idKey] = proposalSize
}
// Make sure we clean up the proposal if we fail to submit it successfully.
// This is important both to ensure that the proposals map doesn't
// grow without bound and to ensure that we always release any quota that
// we acquire.
defer func() {
if pErr != nil {
r.cleanupFailedProposalLocked(proposal)
}
}()
// NB: We need to check Replica.mu.destroyStatus again in case the Replica has
// been destroyed between the initial check at the beginning of this method
// and the acquisition of Replica.mu. Failure to do so will leave pending
// proposals that never get cleared.
if !r.mu.destroyStatus.IsAlive() {
return nil, nil, 0, roachpb.NewError(r.mu.destroyStatus.err)
}
repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
maxLeaseIndex := r.insertProposalLocked(proposal, repDesc, lease)
if maxLeaseIndex == 0 && !ba.IsLeaseRequest() {
log.Fatalf(ctx, "no MaxLeaseIndex returned for %s", ba)
}
if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
filterArgs := storagebase.ProposalFilterArgs{
Ctx: ctx,
Cmd: *proposal.command,
CmdID: idKey,
Req: ba,
}
if pErr := filter(filterArgs); pErr != nil {
return nil, nil, 0, pErr
}
}
if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped {
// Silently ignore dropped proposals (they were always silently ignored
// prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
} else if err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// Must not use `proposal` in the closure below as a proposal which is not
// present in r.mu.proposals is no longer protected by the mutex. Abandoning
// a command only abandons the associated context. As soon as we propose a
// command to Raft, ownership passes to the "below Raft" machinery. In
// particular, endCmds will be invoked when the command is applied. There are
// a handful of cases where the command may not be applied (or even
// processed): the process crashes or the local replica is removed from the
// range.
tryAbandon := func() bool {
r.mu.Lock()
p, ok := r.mu.proposals[idKey]
if ok {
// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
// We'd need to make sure the span is finished eventually.
p.ctx = r.AnnotateCtx(context.TODO())
}
r.mu.Unlock()
return ok
}
return proposalCh, tryAbandon, maxLeaseIndex, nil
}
// submitProposalLocked proposes or re-proposes a command in r.mu.proposals.
// The replica lock must be held.
func (r *Replica) submitProposalLocked(p *ProposalData) error {
p.proposedAtTicks = r.mu.ticks
if r.mu.submitProposalFn != nil {
return r.mu.submitProposalFn(p)
}
return defaultSubmitProposalLocked(r, p)
}
func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
data, err := protoutil.Marshal(p.command)
if err != nil {
return err
}
defer r.store.enqueueRaftUpdateCheck(r.RangeID)
// Too verbose even for verbose logging, so manually enable if you want to
// debug proposal sizes.
if false {
log.Infof(p.ctx, `%s: proposal: %d
RaftCommand.ProposerReplica: %d
RaftCommand.ReplicatedEvalResult: %d
RaftCommand.ReplicatedEvalResult.Delta: %d
RaftCommand.WriteBatch: %d
`, p.Request.Summary(), len(data),
p.command.ProposerReplica.Size(),
p.command.ReplicatedEvalResult.Size(),
p.command.ReplicatedEvalResult.Delta.Size(),
p.command.WriteBatch.Size(),
)
}
const largeProposalEventThresholdBytes = 2 << 19 // 1 MB
// Log an event if this is a large proposal. These are more likely to cause
// blips or worse, and it's good to be able to pick them from traces.
//
// TODO(tschottdorf): can we mark them so lightstep can group them?
if size := len(data); size > largeProposalEventThresholdBytes {
log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(size)))
}
if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil {
// EndTransactionRequest with a ChangeReplicasTrigger is special
// because raft needs to understand it; it cannot simply be an
// opaque command.
log.Infof(p.ctx, "proposing %s", crt)
// Ensure that we aren't trying to remove ourselves from the range without
// having previously given up our lease, since the range won't be able
// to make progress while the lease is owned by a removed replica (and
// leases can stay in such a state for a very long time when using epoch-
// based range leases). This shouldn't happen often, but has been seen
// before (#12591).
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
log.Errorf(p.ctx, "received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
return errors.Errorf("%s: received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", r, crt)
}
confChangeCtx := ConfChangeContext{
CommandID: string(p.idKey),
Payload: data,
Replica: crt.Replica,
}
encodedCtx, err := protoutil.Marshal(&confChangeCtx)
if err != nil {
return err
}
return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command here so there is no need to wake the
// leader if we were quiesced.
r.unquiesceLocked()
return false, /* unquiesceAndWakeLeader */
raftGroup.ProposeConfChange(raftpb.ConfChange{
Type: changeTypeInternalToRaft[crt.ChangeType],
NodeID: uint64(crt.Replica.ReplicaID),
Context: encodedCtx,
})
})
}
return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
encode := encodeRaftCommandV1
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return false, errors.New("cannot sideload empty SSTable")
}
encode = encodeRaftCommandV2
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
}
// We're proposing a command so there is no need to wake the leader if
// we're quiesced.
r.unquiesceLocked()
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(encode(p.idKey, data))
})
}
// stepRaftGroup calls Step on the replica's RawNode with the provided request's
// message. Before doing so, it assures that the replica is unquiesced and ready
// to handle the request.
func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
// We're processing an incoming raft message (from a batch that may
// include MsgVotes), so don't campaign if we wake up our raft
// group.
return r.withRaftGroup(false, func(raftGroup *raft.RawNode) (bool, error) {
// We're processing a message from another replica which means that the
// other replica is not quiesced, so we don't need to wake the leader.
// Note that we avoid campaigning when receiving raft messages, because
// we expect the originator to campaign instead.
r.unquiesceWithOptionsLocked(false /* campaignOnWake */)
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, timeutil.Now())
err := raftGroup.Step(req.Message)
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
// Swallow the error since we don't have an effective way of signaling
// this to the sender.
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
err = nil
}
return false /* unquiesceAndWakeLeader */, err
})
}
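// handleRaftReadyStats summarizes the work performed by a single call to
// handleRaftReady; processed counts the committed entries handed to
// processRaftCommand during that call.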
type handleRaftReadyStats struct {
processed int
}
// noSnap can be passed to handleRaftReady when no snapshot should be processed.
var noSnap IncomingSnapshot
// handleRaftReady processes a raft.Ready containing entries and messages that
// are ready to read, be saved to stable storage, committed or sent to other
// peers. It takes a non-empty IncomingSnapshot to indicate that it is
// about to process a snapshot.
//
// The returned string is non-empty whenever an error is returned, to give a
// non-sensitive cue as to what happened.
func (r *Replica) handleRaftReady(
ctx context.Context, inSnap IncomingSnapshot,
) (handleRaftReadyStats, string, error) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return r.handleRaftReadyRaftMuLocked(ctx, inSnap)
}
// handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires that the
// replica's raftMu be held.
//
// The returned string is non-empty whenever an error is returned, to give a
// non-sensitive cue as to what happened.
func (r *Replica) handleRaftReadyRaftMuLocked(
ctx context.Context, inSnap IncomingSnapshot,
) (handleRaftReadyStats, string, error) {
var stats handleRaftReadyStats
var hasReady bool
var rd raft.Ready
r.mu.Lock()
lastIndex := r.mu.lastIndex // used for append below
lastTerm := r.mu.lastTerm
raftLogSize := r.mu.raftLogSize
leaderID := r.mu.leaderID
lastLeaderID := leaderID
// We defer the call to Replica.updateProposalQuotaRaftMuLocked because we need
// it to run in both cases: when hasReady is false and when it is true.
// If hasReady == false:
// Consider the case when our quota is of size 1 and two out of three
// replicas have committed one log entry while the third is lagging
// behind. When the third replica finally does catch up and sends
// along a MsgAppResp, since the entry is already committed on the
// leader replica, no Ready is emitted. But given that the third
// replica has caught up, we can release
// some quota back to the pool.
// Otherwise:
// Consider the case where there are two replicas and we have a quota
// of size 1. We acquire the quota when the write gets proposed on the
// leader and expect it to be released when the follower commits it
// locally. In order to do so we need to have the entry 'come out of
// raft' and in the case of a two node raft group, this only happens if
// hasReady == true.
// If we don't release quota back at the end of
// handleRaftReadyRaftMuLocked, the next write will get blocked.
defer r.updateProposalQuotaRaftMuLocked(ctx, lastLeaderID)
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
if hasReady = raftGroup.HasReady(); hasReady {
rd = raftGroup.Ready()
}
return hasReady /* unquiesceAndWakeLeader */, nil
})
r.mu.Unlock()
if err != nil {
const expl = "while checking raft group for Ready"
return stats, expl, errors.Wrap(err, expl)
}
if !hasReady {
return stats, "", nil
}
logRaftReady(ctx, rd)
refreshReason := noReason
if rd.SoftState != nil && leaderID != roachpb.ReplicaID(rd.SoftState.Lead) {
// Refresh pending commands if the Raft leader has changed. This is usually
// the first indication we have of a new leader on a restarted node.
//
// TODO(peter): Re-proposing commands when SoftState.Lead changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, rd.SoftState.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
}
if !raft.IsEmptySnap(rd.Snapshot) {
snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
if err != nil {
const expl = "invalid snapshot id"
return stats, expl, errors.Wrap(err, expl)
}
if inSnap.SnapUUID == (uuid.UUID{}) {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
}
if snapUUID != inSnap.SnapUUID {
log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID)
}
// Applying this snapshot may require us to subsume one or more of our right
// neighbors. This occurs if this replica is informed about the merges via a
// Raft snapshot instead of a MsgApp containing the merge commits, e.g.,
// because it went offline before the merge commits applied and did not come
// back online until after the merge commits were truncated away.
subsumedRepls, releaseMergeLock := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
defer releaseMergeLock()
if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState, subsumedRepls); err != nil {
const expl = "while applying snapshot"
return stats, expl, errors.Wrap(err, expl)
}
// r.mu.lastIndex, r.mu.lastTerm and r.mu.raftLogSize were updated in
// applySnapshot, but we also want to make sure we reflect these changes in
// the local variables we're tracking here.
r.mu.RLock()
lastIndex = r.mu.lastIndex
lastTerm = r.mu.lastTerm
raftLogSize = r.mu.raftLogSize
r.mu.RUnlock()
// We refresh pending commands after applying a snapshot because this
// replica may have been temporarily partitioned from the Raft group and
// missed leadership changes that occurred. Suppose node A is the leader,
// and then node C gets partitioned away from the others. Leadership passes
// back and forth between A and B during the partition, but when the
// partition is healed node A is leader again.
if !r.store.TestingKnobs().DisableRefreshReasonSnapshotApplied &&
refreshReason == noReason {
refreshReason = reasonSnapshotApplied
}
}
// Separate the MsgApp messages from all other Raft message types so that we
// can take advantage of the optimization discussed in the Raft thesis under
// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
// optimization suggests that instead of a leader writing new log entries to
// disk before replicating them to its followers, the leader can instead
// write the entries to disk in parallel with replicating to its followers
// and them writing to their disks.
//
// Here, we invoke this optimization by:
// 1. sending all MsgApps.
// 2. syncing all entries and Raft state to disk.
// 3. sending all other messages.
//
// Since this is all handled in handleRaftReadyRaftMuLocked, we're assured
// that even though we may sync new entries to disk after sending them in
// MsgApps to followers, we'll always have them synced to disk before we
// process followers' MsgAppResps for the corresponding entries because this
// entire method requires RaftMu to be locked. This is a requirement because
// etcd/raft does not support commit quorums that do not include the leader,
// even though the Raft thesis states that this would technically be safe:
// > The leader may even commit an entry before it has been written to its
// > own disk, if a majority of followers have written it to their disks;
// > this is still safe.
//
// However, MsgApps are also used to inform followers of committed entries
// through the Commit index that they contain. Because the optimization
// sends all MsgApps before syncing to disk, we may send out a commit index
// in a MsgApp that we have not ourselves written in HardState.Commit. This
// is ok, because the Commit index can be treated as volatile state, as is
// supported by raft.MustSync. The Raft thesis corroborates this, stating in
// section: `3.8 Persisted state and server restarts` that:
// > Other state variables are safe to lose on a restart, as they can all be
// > recreated. The most interesting example is the commit index, which can
// > safely be reinitialized to zero on a restart.
msgApps, otherMsgs := splitMsgApps(rd.Messages)
r.sendRaftMessages(ctx, msgApps)
// Use a more efficient write-only batch because we don't need to do any
// reads from the batch. Any reads are performed via the "distinct" batch
// which passes the reads through to the underlying DB.
batch := r.store.Engine().NewWriteOnlyBatch()
defer batch.Close()
// We know that all of the writes from here forward will be to distinct keys.
writer := batch.Distinct()
prevLastIndex := lastIndex
if len(rd.Entries) > 0 {
// All of the entries are appended to distinct keys, returning a new
// last index.
thinEntries, sideLoadedEntriesSize, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries)
if err != nil {
const expl = "during sideloading"
return stats, expl, errors.Wrap(err, expl)
}
raftLogSize += sideLoadedEntriesSize
if lastIndex, lastTerm, raftLogSize, err = r.append(
ctx, writer, lastIndex, lastTerm, raftLogSize, thinEntries,
); err != nil {
const expl = "during append"
return stats, expl, errors.Wrap(err, expl)
}
}
if !raft.IsEmptyHardState(rd.HardState) {
if !r.IsInitialized() && rd.HardState.Commit != 0 {
log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState)
}
if err := r.raftMu.stateLoader.SetHardState(ctx, writer, rd.HardState); err != nil {
const expl = "during setHardState"
return stats, expl, errors.Wrap(err, expl)
}
}
writer.Close()
// Synchronously commit the batch with the Raft log entries and Raft hard
// state as we're promising not to lose this data.
//
// Note that the data is visible to other goroutines before it is synced to
// disk. This is fine. The important constraints are that these syncs happen
// before Raft messages are sent and before the call to RawNode.Advance. Our
// regular locking is sufficient for this and if other goroutines can see the
// data early, that's fine. In particular, snapshots are not a problem (I
// think they're the only thing that might access log entries or HardState
// from other goroutines). Snapshots do not include either the HardState or
// uncommitted log entries, and even if they did include log entries that
// were not persisted to disk, it wouldn't be a problem because raft does not
// infer that the entries are persisted on the node that sends a snapshot.
start := timeutil.Now()
if err := batch.Commit(rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil {
const expl = "while committing batch"
return stats, expl, errors.Wrap(err, expl)
}
elapsed := timeutil.Since(start)
r.store.metrics.RaftLogCommitLatency.RecordValue(elapsed.Nanoseconds())
if len(rd.Entries) > 0 {
// We may have just overwritten parts of the log which contain
// sideloaded SSTables from a previous term (and perhaps discarded some
// entries that we didn't overwrite). Remove any such leftover on-disk
// payloads (we can do that now because we've committed the deletion
// just above).
firstPurge := rd.Entries[0].Index // first new entry written
purgeTerm := rd.Entries[0].Term - 1
lastPurge := prevLastIndex // old end of the log, include in deletion
purgedSize, err := maybePurgeSideloaded(ctx, r.raftMu.sideloaded, firstPurge, lastPurge, purgeTerm)
if err != nil {
const expl = "while purging sideloaded storage"
return stats, expl, err
}
raftLogSize -= purgedSize
if raftLogSize < 0 {
// Might have gone negative if node was recently restarted.
raftLogSize = 0
}
}
// Update protected state - last index, last term, raft log size, and raft
// leader ID.
r.mu.Lock()
r.mu.lastIndex = lastIndex
r.mu.lastTerm = lastTerm
r.mu.raftLogSize = raftLogSize
var becameLeader bool
if r.mu.leaderID != leaderID {
r.mu.leaderID = leaderID
// Clear the remote proposal set. Would have been nil already if not
// previously the leader.
becameLeader = r.mu.leaderID == r.mu.replicaID
}
r.mu.Unlock()
// When becoming the leader, proactively add the replica to the replicate
// queue. We might have been handed leadership by a remote node which wanted
// to remove itself from the range.
if becameLeader && r.store.replicateQueue != nil {
r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now())
}
// Update raft log entry cache. We clear any older, uncommitted log entries
// and cache the latest ones.
r.store.raftEntryCache.Add(r.RangeID, rd.Entries)
r.sendRaftMessages(ctx, otherMsgs)
for _, e := range rd.CommittedEntries {
switch e.Type {
case raftpb.EntryNormal:
// NB: Committed entries are handed to us by Raft. Raft does not
// know about sideloading. Consequently the entries here are all
// already inlined.
var commandID storagebase.CmdIDKey
var command storagepb.RaftCommand
// Process committed entries. etcd raft occasionally adds a nil entry
// (our own commands are never empty). This happens in two situations:
// When a new leader is elected, and when a config change is dropped due
// to the "one at a time" rule. In both cases we may need to resubmit our
// pending proposals (In the former case we resubmit everything because
// we proposed them to a former leader that is no longer able to commit
// them. In the latter case we only need to resubmit pending config
// changes, but it's hard to distinguish so we resubmit everything
// anyway). We delay resubmission until after we have processed the
// entire batch of entries.
if len(e.Data) == 0 {
// Overwrite unconditionally since this is the most aggressive
// reproposal mode.
if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange {
refreshReason = reasonNewLeaderOrConfigChange
}
commandID = "" // special-cased value, command isn't used
} else {
var encodedCommand []byte
commandID, encodedCommand = DecodeRaftCommand(e.Data)
// An empty command is used to unquiesce a range and wake the
// leader. Clear commandID so it's ignored for processing.
if len(encodedCommand) == 0 {
commandID = ""
} else if err := protoutil.Unmarshal(encodedCommand, &command); err != nil {
const expl = "while unmarshalling entry"
return stats, expl, errors.Wrap(err, expl)
}
}
if changedRepl := r.processRaftCommand(ctx, commandID, e.Term, e.Index, command); changedRepl {
log.Fatalf(ctx, "unexpected replication change from command %s", &command)
}
r.store.metrics.RaftCommandsApplied.Inc(1)
stats.processed++
r.mu.Lock()
if r.mu.replicaID == r.mu.leaderID {
// At this point we're not guaranteed to have proposalQuota
// initialized, the same is true for quotaReleaseQueue and
// commandSizes. By checking if the specified commandID is
// present in commandSizes, we'll only queue the cmdSize if
// they're all initialized.
if cmdSize, ok := r.mu.commandSizes[commandID]; ok {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize)
delete(r.mu.commandSizes, commandID)
}
}
r.mu.Unlock()
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(e.Data, &cc); err != nil {
const expl = "while unmarshaling ConfChange"
return stats, expl, errors.Wrap(err, expl)
}
var ccCtx ConfChangeContext
if err := protoutil.Unmarshal(cc.Context, &ccCtx); err != nil {
const expl = "while unmarshaling ConfChangeContext"
return stats, expl, errors.Wrap(err, expl)
}
var command storagepb.RaftCommand
if err := protoutil.Unmarshal(ccCtx.Payload, &command); err != nil {
const expl = "while unmarshaling RaftCommand"
return stats, expl, errors.Wrap(err, expl)
}
commandID := storagebase.CmdIDKey(ccCtx.CommandID)
if changedRepl := r.processRaftCommand(
ctx, commandID, e.Term, e.Index, command,
); !changedRepl {
// If we did not apply the config change, tell raft that the config change was aborted.
cc = raftpb.ConfChange{}
}
stats.processed++
r.mu.Lock()
if r.mu.replicaID == r.mu.leaderID {
if cmdSize, ok := r.mu.commandSizes[commandID]; ok {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize)
delete(r.mu.commandSizes, commandID)
}
}
r.mu.Unlock()
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ApplyConfChange(cc)
return true, nil
}); err != nil {
const expl = "during ApplyConfChange"
return stats, expl, errors.Wrap(err, expl)
}
default:
log.Fatalf(ctx, "unexpected Raft entry: %v", e)
}
}
if refreshReason != noReason {
r.mu.Lock()
r.refreshProposalsLocked(0, refreshReason)
r.mu.Unlock()
}
// TODO(bdarnell): need to check replica id and not Advance if it
// has changed. Or do we need more locking to guarantee that replica
// ID cannot change during handleRaftReady?
const expl = "during advance"
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.Advance(rd)
// If the Raft group still has more to process then we immediately
// re-enqueue it for another round of processing. This is possible if
// the group's committed entries were paginated due to size limitations
// and we didn't apply all of them in this pass.
if raftGroup.HasReady() {
r.store.enqueueRaftUpdateCheck(r.RangeID)
}
return true, nil
}); err != nil {
return stats, expl, errors.Wrap(err, expl)
}
return stats, "", nil
}
// splitMsgApps splits the Raft message slice into two slices, one containing
// MsgApps and one containing all other message types. The MsgApp slice retains
// the relative ordering of the original slice; the ordering of the other
// messages is not necessarily preserved, since the split is done with in-place swaps.
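// For illustration, splitting [MsgHeartbeat, MsgApp, MsgApp, MsgVote] yields
// msgApps = [MsgApp, MsgApp] and otherMsgs = [MsgHeartbeat, MsgVote].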
func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) {
splitIdx := 0
for i, msg := range msgs {
if msg.Type == raftpb.MsgApp {
msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i]
splitIdx++
}
}
return msgs[:splitIdx], msgs[splitIdx:]
}
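// fatalOnRaftReadyErr crashes the process when handleRaftReady returns an
// error, logging the non-sensitive explanation string alongside the error.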
func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {
// Mimic the behavior in processRaft.
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
}
// tick the Raft group, returning true if the raft group exists and is
// unquiesced; false otherwise.
func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
ctx := r.AnnotateCtx(context.TODO())
r.unreachablesMu.Lock()
remotes := r.unreachablesMu.remotes
r.unreachablesMu.remotes = nil
r.unreachablesMu.Unlock()
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// If the raft group is uninitialized, do not initialize on tick.
if r.mu.internalRaftGroup == nil {
return false, nil
}
for remoteReplica := range remotes {
r.mu.internalRaftGroup.ReportUnreachable(uint64(remoteReplica))
}
if r.mu.quiescent {
return false, nil
}
if r.maybeQuiesceLocked(ctx, livenessMap) {
return false, nil
}
r.maybeTransferRaftLeadershipLocked(ctx)
r.mu.ticks++
r.mu.internalRaftGroup.Tick()
refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
refreshAtDelta = knob
}
if !r.store.TestingKnobs().DisableRefreshReasonTicks && r.mu.ticks%refreshAtDelta == 0 {
// RaftElectionTimeoutTicks is a reasonable approximation of how long we
// should wait before deciding that our previous proposal didn't go
// through. Note that the combination of the above condition and passing
// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
// will be refreshed when they have been pending for 1 to 2 election
// cycles.
r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
}
return true, nil
}
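// hasRaftReadyRLocked returns whether the replica's Raft group has a Ready to
// process. Replica.mu must be held (at least for reading).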
func (r *Replica) hasRaftReadyRLocked() bool {
return r.mu.internalRaftGroup.HasReady()
}
//go:generate stringer -type refreshRaftReason
type refreshRaftReason int
const (
noReason refreshRaftReason = iota
reasonNewLeader
reasonNewLeaderOrConfigChange
// A snapshot was just applied and so it may have contained commands that we
// proposed whose proposal we still consider to be inflight. These commands
// will never receive a response through the regular channel.
reasonSnapshotApplied
reasonReplicaIDChanged
reasonTicks
)
// refreshProposalsLocked goes through the pending proposals, notifying
// proposers whose proposals need to be retried, and resubmitting proposals
// which were likely dropped (but may still apply at a legal Lease index) -
// ensuring that the proposer will eventually get a reply on the channel it's
// waiting on.
// mu must be held.
//
// refreshAtDelta only applies for reasonTicks and specifies how old (in ticks)
// a command must be for it to be inspected; the usual value is the number of
// ticks of an election timeout (affecting only proposals that have had ample time
// to apply but didn't).
func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftReason) {
if refreshAtDelta != 0 && reason != reasonTicks {
log.Fatalf(context.TODO(), "refreshAtDelta specified for reason %s != reasonTicks", reason)
}
numShouldRetry := 0
var reproposals pendingCmdSlice
for _, p := range r.mu.proposals {
if p.command.MaxLeaseIndex == 0 {
// Commands without a MaxLeaseIndex cannot be reproposed, as they might
// apply twice. We also don't want to ask the proposer to retry these
// special commands.
r.cleanupFailedProposalLocked(p)
log.VEventf(p.ctx, 2, "refresh (reason: %s) returning AmbiguousResultError for command "+
"without MaxLeaseIndex: %v", reason, p.command)
p.finishApplication(proposalResult{Err: roachpb.NewError(
roachpb.NewAmbiguousResultError(
fmt.Sprintf("unknown status for command without MaxLeaseIndex "+
"at refreshProposalsLocked time (refresh reason: %s)", reason)))})
} else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest &&
p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore {
// The command's designated lease index slot was filled up. We got to