-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathraft_snapshot_queue.go
159 lines (142 loc) · 5.43 KB
/
raft_snapshot_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
)
const (
// raftSnapshotQueueTimerDuration is the duration between Raft snapshots of
// queued replicas. It is the value returned by raftSnapshotQueue.timer.
raftSnapshotQueueTimerDuration = 0 // zero duration to process Raft snapshots greedily
// raftSnapshotPriority is the priority that shouldQueue reports for a replica
// with a follower that needs a Raft snapshot.
raftSnapshotPriority float64 = 0
)
// raftSnapshotQueue manages a queue of replicas which may need to send a Raft
// snapshot in order to catch up a follower of their range. It embeds
// *baseQueue and is passed to newBaseQueue as the queue implementation (see
// newRaftSnapshotQueue).
type raftSnapshotQueue struct {
*baseQueue
}
// newRaftSnapshotQueue constructs a raftSnapshotQueue whose embedded base
// queue is named "raftsnapshot" and is bound to the given store and gossip
// instance.
func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue {
	cfg := queueConfig{
		maxSize: defaultQueueMaxSize,
		// Raft snapshots are sent by the Raft leader, which is not
		// necessarily the leaseholder. Because this queue has to operate on
		// replicas that do not hold the lease, Raft snapshots cannot be
		// handled by the replicateQueue.
		needsLease:           false,
		needsSystemConfig:    false,
		acceptsUnsplitRanges: true,
		successes:            store.metrics.RaftSnapshotQueueSuccesses,
		failures:             store.metrics.RaftSnapshotQueueFailures,
		pending:              store.metrics.RaftSnapshotQueuePending,
		processingNanos:      store.metrics.RaftSnapshotQueueProcessingNanos,
	}

	// The queue is handed to newBaseQueue as its own implementation, so it
	// has to exist before the base queue can be created.
	queue := &raftSnapshotQueue{}
	queue.baseQueue = newBaseQueue("raftsnapshot", queue, store, g, cfg)
	return queue
}
// shouldQueue reports whether repl has at least one follower that needs a
// Raft snapshot. If so it returns (true, raftSnapshotPriority); otherwise it
// returns (false, 0).
//
// Followers that are learners according to the replica's range descriptor
// are skipped: processRaftSnapshot refuses to send a snapshot of type RAFT to
// a learner, so queueing on their behalf would be wasted work. The now and
// system config arguments are not used.
func (rq *raftSnapshotQueue) shouldQueue(
	ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig,
) (shouldQ bool, priority float64) {
	status := repl.RaftStatus()
	if status == nil {
		return false, 0
	}
	// raft.Status.Progress is only populated on the Raft group leader.
nextFollower:
	for id, p := range status.Progress {
		if p.State != raft.ProgressStateSnapshot {
			continue
		}
		// We refuse to send a snapshot of type RAFT to a learner for reasons
		// described in processRaftSnapshot, so don't bother queueing. The
		// labeled continue is required here: a bare continue would only
		// advance the inner loop over learners and the follower would be
		// enqueued anyway.
		for _, r := range repl.Desc().Replicas().Learners() {
			if r.ReplicaID == roachpb.ReplicaID(id) {
				continue nextFollower
			}
		}
		if log.V(2) {
			log.Infof(ctx, "raft snapshot needed, enqueuing")
		}
		return true, raftSnapshotPriority
	}
	return false, 0
}
// process walks the followers listed in repl's Raft status and calls
// processRaftSnapshot for each one that is in the snapshot state. It returns
// the first error encountered, without visiting the remaining followers, and
// nil if there is no Raft status or every snapshot attempt succeeds. The
// system config argument is not used.
func (rq *raftSnapshotQueue) process(
	ctx context.Context, repl *Replica, _ *config.SystemConfig,
) error {
	raftStatus := repl.RaftStatus()
	if raftStatus == nil {
		return nil
	}
	// Only the Raft group leader has a populated raft.Status.Progress, so on
	// any other replica this loop has nothing to iterate over.
	for followerID, progress := range raftStatus.Progress {
		if progress.State != raft.ProgressStateSnapshot {
			continue
		}
		if log.V(1) {
			log.Infof(ctx, "sending raft snapshot")
		}
		err := rq.processRaftSnapshot(ctx, repl, roachpb.ReplicaID(followerID))
		if err != nil {
			return err
		}
	}
	return nil
}
// processRaftSnapshot sends a snapshot of type RAFT from repl to the replica
// with the given ID and reports the outcome to Raft.
//
// It returns an error if the ID is not found in repl's range descriptor, or
// if sending the snapshot fails. It returns nil without sending anything if
// the target replica is a learner.
func (rq *raftSnapshotQueue) processRaftSnapshot(
	ctx context.Context, repl *Replica, id roachpb.ReplicaID,
) error {
	rangeDesc := repl.Desc()
	target, found := rangeDesc.GetReplicaDescriptorByID(id)
	if !found {
		return errors.Errorf("%s: replica %d not present in %v", repl, id, rangeDesc.Replicas())
	}

	// A learner is either about to receive a snapshot of type LEARNER from
	// the node that is adding it, or it has been orphaned and will be cleaned
	// up shortly. In both cases a snapshot of type RAFT would be redundant.
	if target.GetType() == roachpb.ReplicaType_LEARNER {
		log.Eventf(ctx, "not sending snapshot type RAFT to learner: %s", target)
		return nil
	}

	sendErr := repl.sendSnapshot(ctx, target, SnapshotRequest_RAFT, SnapshotRequest_RECOVERY)

	// NB: the snapshot can fail because the recipient has an overlapping
	// replica that is itself waiting for a snapshot. The "smart" reaction
	// would be to send that other snapshot at a higher priority, but the
	// leader of the overlapping range may not be this node. This situation
	// typically arises during splits and merges when log truncations are
	// overly aggressive.
	//
	// In the split case, the overlapping range is a replica of the pre-split
	// range which needs a snapshot to catch it up across the split trigger.
	//
	// In the merge case, the overlapping replicas belong to ranges that this
	// range has since subsumed. There can be many of them when merges apply
	// in rapid succession. The leftmost replica matters most, because once it
	// is caught up past all of the merges it absorbs every overlapping
	// replica.
	//
	// None of this is handled at present. Instead we rely on the quota pool
	// to ensure that log truncations don't force snapshots on healthy
	// followers.

	// Raft expects to be told the snapshot status once we are done sending,
	// whether or not the send succeeded.
	repl.reportSnapshotStatus(ctx, id, sendErr)
	return sendErr
}
// timer returns raftSnapshotQueueTimerDuration, the duration between Raft
// snapshots of queued replicas. The duration passed in is ignored.
func (rq *raftSnapshotQueue) timer(time.Duration) time.Duration {
	return raftSnapshotQueueTimerDuration
}
// purgatoryChan always returns a nil channel.
func (*raftSnapshotQueue) purgatoryChan() <-chan time.Time {
	return nil
}