-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
log_unstable.go
248 lines (228 loc) · 8.99 KB
/
log_unstable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// This code has been modified from its original form by Cockroach Labs, Inc.
// All modifications are Copyright 2024 Cockroach Labs, Inc.
//
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
// unstable contains "unstable" log entries and snapshot state that has
// not yet been written to Storage. The type serves two roles. First, it
// holds on to new log entries and an optional snapshot until they are
// handed to a Ready struct for persistence. Second, it continues to
// hold on to this state after it has been handed off to provide raftLog
// with a view of the in-progress log entries and snapshot until their
// writes have been stabilized and are guaranteed to be reflected in
// queries of Storage. After this point, the corresponding log entries
// and/or snapshot can be cleared from unstable.
//
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
// entries[i] has raft log position i+offset.
offset uint64
// if true, snapshot is being written to storage.
snapshotInProgress bool
// entries[:offsetInProgress-offset] are being written to storage.
// Like offset, offsetInProgress is exclusive, meaning that it
// contains the index following the largest in-progress entry.
// Invariant: offset <= offsetInProgress
offsetInProgress uint64
logger Logger
}
// maybeFirstIndex returns the index of the first possible entry in entries
// if it has a snapshot.
func (u *unstable) maybeFirstIndex() (uint64, bool) {
if u.snapshot != nil {
return u.snapshot.Metadata.Index + 1, true
}
return 0, false
}
// maybeLastIndex returns the last index if it has at least one
// unstable entry or snapshot.
func (u *unstable) maybeLastIndex() (uint64, bool) {
if l := len(u.entries); l != 0 {
return u.offset + uint64(l) - 1, true
}
if u.snapshot != nil {
return u.snapshot.Metadata.Index, true
}
return 0, false
}
// maybeTerm returns the term of the entry at index i, if there
// is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.offset {
if u.snapshot != nil && u.snapshot.Metadata.Index == i {
return u.snapshot.Metadata.Term, true
}
return 0, false
}
last, ok := u.maybeLastIndex()
if !ok {
return 0, false
}
if i > last {
return 0, false
}
return u.entries[i-u.offset].Term, true
}
// nextEntries returns the unstable entries that are not already in the process
// of being written to storage.
func (u *unstable) nextEntries() []pb.Entry {
inProgress := int(u.offsetInProgress - u.offset)
if len(u.entries) == inProgress {
return nil
}
return u.entries[inProgress:]
}
// nextSnapshot returns the unstable snapshot, if one exists that is not already
// in the process of being written to storage.
func (u *unstable) nextSnapshot() *pb.Snapshot {
if u.snapshot == nil || u.snapshotInProgress {
return nil
}
return u.snapshot
}
// acceptInProgress marks all entries and the snapshot, if any, in the unstable
// as having begun the process of being written to storage. The entries/snapshot
// will no longer be returned from nextEntries/nextSnapshot. However, new
// entries/snapshots added after a call to acceptInProgress will be returned
// from those methods, until the next call to acceptInProgress.
func (u *unstable) acceptInProgress() {
if len(u.entries) > 0 {
// NOTE: +1 because offsetInProgress is exclusive, like offset.
u.offsetInProgress = u.entries[len(u.entries)-1].Index + 1
}
if u.snapshot != nil {
u.snapshotInProgress = true
}
}
// stableTo marks entries up to the entry with the specified (index, term) as
// being successfully written to stable storage.
//
// The method should only be called when the caller can attest that the entries
// can not be overwritten by an in-progress log append. See the related comment
// in newStorageAppendRespMsg.
func (u *unstable) stableTo(id entryID) {
gt, ok := u.maybeTerm(id.index)
if !ok {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index)
return
}
if id.index < u.offset {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index)
return
}
if gt != id.term {
// Term mismatch between unstable entry and specified entry. Ignore.
// This is possible if part or all of the unstable log was replaced
// between that time that a set of entries started to be written to
// stable storage and when they finished.
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
"entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt)
return
}
num := int(id.index + 1 - u.offset)
u.entries = u.entries[num:]
u.offset = id.index + 1
u.offsetInProgress = max(u.offsetInProgress, u.offset)
u.shrinkEntriesArray()
}
// shrinkEntriesArray discards the underlying array used by the entries slice
// if most of it isn't being used. This avoids holding references to a bunch of
// potentially large entries that aren't needed anymore. Simply clearing the
// entries wouldn't be safe because clients might still be using them.
func (u *unstable) shrinkEntriesArray() {
// We replace the array if we're using less than half of the space in
// it. This number is fairly arbitrary, chosen as an attempt to balance
// memory usage vs number of allocations. It could probably be improved
// with some focused tuning.
const lenMultiple = 2
if len(u.entries) == 0 {
u.entries = nil
} else if len(u.entries)*lenMultiple < cap(u.entries) {
newEntries := make([]pb.Entry, len(u.entries))
copy(newEntries, u.entries)
u.entries = newEntries
}
}
func (u *unstable) stableSnapTo(i uint64) {
if u.snapshot != nil && u.snapshot.Metadata.Index == i {
u.snapshot = nil
u.snapshotInProgress = false
}
}
func (u *unstable) restore(s pb.Snapshot) {
u.offset = s.Metadata.Index + 1
u.offsetInProgress = u.offset
u.entries = nil
u.snapshot = &s
u.snapshotInProgress = false
}
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
fromIndex := ents[0].Index
switch {
case fromIndex == u.offset+uint64(len(u.entries)):
// fromIndex is the next index in the u.entries, so append directly.
u.entries = append(u.entries, ents...)
case fromIndex <= u.offset:
u.logger.Infof("replace the unstable entries from index %d", fromIndex)
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries.
u.entries = ents
u.offset = fromIndex
u.offsetInProgress = u.offset
default:
// Truncate to fromIndex (exclusive), and append the new entries.
u.logger.Infof("truncate the unstable entries before index %d", fromIndex)
keep := u.slice(u.offset, fromIndex) // NB: appending to this slice is safe,
u.entries = append(keep, ents...) // and will reallocate/copy it
// Only in-progress entries before fromIndex are still considered to be
// in-progress.
u.offsetInProgress = min(u.offsetInProgress, fromIndex)
}
}
// slice returns the entries from the unstable log with indexes in the range
// [lo, hi). The entire range must be stored in the unstable log or the method
// will panic. The returned slice can be appended to, but the entries in it must
// not be changed because they are still shared with unstable.
//
// TODO(pavelkalinnikov): this, and similar []pb.Entry slices, may bubble up all
// the way to the application code through Ready struct. Protect other slices
// similarly, and document how the client can use them.
func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
u.mustCheckOutOfBounds(lo, hi)
// NB: use the full slice expression to limit what the caller can do with the
// returned slice. For example, an append will reallocate and copy this slice
// instead of corrupting the neighbouring u.entries.
return u.entries[lo-u.offset : hi-u.offset : hi-u.offset]
}
// u.offset <= lo <= hi <= u.offset+len(u.entries)
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
if lo > hi {
u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
}
upper := u.offset + uint64(len(u.entries))
if lo < u.offset || hi > upper {
u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
}
}