-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathmvcc_incremental_iterator.go
328 lines (302 loc) · 11.1 KB
/
mvcc_incremental_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package engine
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)
// MVCCIncrementalIterator iterates over the diff of the key range
// [startKey,endKey) and time range (startTime,endTime]. If a key was added or
// modified between startTime and endTime, the iterator will position at the
// most recent version (before or at endTime) of that key. If the key was most
// recently deleted, this is signaled with an empty value.
//
// MVCCIncrementalIterator will return an error if either of the following are
// encountered:
// 1. An inline value (non-user data)
// 2. An intent whose timestamp lies within the time bounds
//
// Note: The endTime is inclusive to be consistent with the non-incremental
// iterator, where reads at a given timestamp return writes at that
// timestamp. The startTime is then made exclusive so that iterating time 1 to
// 2 and then 2 to 3 will only return values with time 2 once. An exclusive
// start time would normally make it difficult to scan timestamp 0, but
// CockroachDB uses that as a sentinel for key metadata anyway.
//
// Expected usage:
// iter := NewMVCCIncrementalIterator(e, IterOptions{
// StartTime: startTime,
// EndTime: endTime,
// UpperBound: endKey,
// })
// defer iter.Close()
// for iter.SeekGE(startKey); ; iter.Next() {
// ok, err := iter.Valid()
// if !ok { ... }
// [code using iter.Key() and iter.Value()]
// }
// if err := iter.Error(); err != nil {
// ...
// }
//
// Note regarding the correctness of the time-bound iterator optimization:
//
// When using (t_s, t_e], say there is a version (committed or provisional)
// k@t where t is in that interval, that is visible to iter. All sstables
// containing k@t will be included in timeBoundIter. Note that there may be
// multiple sequence numbers for the key k@t at the storage layer, say k@t#n1,
// k@t#n2, where n1 > n2, some of which may be deleted, but the latest
// sequence number will be visible using iter (since not being visible would be
// a contradiction of the initial assumption that k@t is visible to iter).
// Since there is no delete across all sstables that deletes k@t#n1, there is
// no delete in the subset of sstables used by timeBoundIter that deletes
// k@t#n1, so the timeBoundIter will see k@t.
//
// NOTE: This is not used by CockroachDB and has been preserved to serve as an
// oracle to prove the correctness of the new export logic.
type MVCCIncrementalIterator struct {
iter Iterator
// A time-bound iterator cannot be used by itself due to a bug in the time-
// bound iterator (#28358). This was historically augmented with an iterator
// without the time-bound optimization to act as a sanity iterator, but
// issues remained (#43799), so now the iterator above is the main iterator
// the timeBoundIter is used to check if any keys can be skipped by the main
// iterator.
timeBoundIter Iterator
startTime hlc.Timestamp
endTime hlc.Timestamp
err error
valid bool
// For allocation avoidance.
meta enginepb.MVCCMetadata
}
var _ SimpleIterator = &MVCCIncrementalIterator{}
// MVCCIncrementalIterOptions bundles options for NewMVCCIncrementalIterator.
type MVCCIncrementalIterOptions struct {
IterOptions IterOptions
StartTime hlc.Timestamp
EndTime hlc.Timestamp
}
// NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the
// specified reader and options.
func NewMVCCIncrementalIterator(
reader Reader, opts MVCCIncrementalIterOptions,
) *MVCCIncrementalIterator {
var iter Iterator
var timeBoundIter Iterator
if !opts.IterOptions.MinTimestampHint.IsEmpty() && !opts.IterOptions.MaxTimestampHint.IsEmpty() {
// An iterator without the timestamp hints is created to ensure that the
// iterator visits every required version of every key that has changed.
iter = reader.NewIterator(IterOptions{
UpperBound: opts.IterOptions.UpperBound,
})
timeBoundIter = reader.NewIterator(opts.IterOptions)
} else {
iter = reader.NewIterator(opts.IterOptions)
}
return &MVCCIncrementalIterator{
iter: iter,
startTime: opts.StartTime,
endTime: opts.EndTime,
timeBoundIter: timeBoundIter,
}
}
// SeekGE advances the iterator to the first key in the engine which is >= the
// provided key.
func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
if i.timeBoundIter != nil {
i.timeBoundIter.SeekGE(startKey)
if ok, err := i.timeBoundIter.Valid(); !ok {
i.err = err
i.valid = false
return
}
tbiKey := i.timeBoundIter.Key().Key
// The main iterator should jump to further of the start key and the first
// key seen by the TBI.
if tbiKey.Compare(startKey.Key) > 0 {
// Seek to the first version of the key that TBI provided.
startKey = MakeMVCCMetadataKey(tbiKey)
}
}
i.iter.SeekGE(startKey)
i.err = nil
i.valid = true
i.advance()
}
// Close frees up resources held by the iterator.
func (i *MVCCIncrementalIterator) Close() {
i.iter.Close()
if i.timeBoundIter != nil {
i.timeBoundIter.Close()
}
}
// Next advances the iterator to the next key/value in the iteration. After this
// call, Valid() will be true if the iterator was not positioned at the last
// key.
func (i *MVCCIncrementalIterator) Next() {
i.iter.Next()
i.advance()
}
// NextKey advances the iterator to the next key. This operation is
// distinct from Next which advances to the next version of the current key or
// the next key if the iterator is currently located at the last version for a
// key.
func (i *MVCCIncrementalIterator) NextKey() {
i.iter.NextKey()
i.advance()
}
// maybeSkipKeys checks if any keys can be skipped by using a time-bound
// iterator. If it can, the main iterator is advanced to the next key
// that should be considered. The TBI will point to a key with the same
// MVCC key.
//
// Pre: The TBI should be pointing to either the same key or the key prior
// to the same key as the main iterator. This is enforced by a combination
// of a) only updating the time-bound iterator from this method, and seeking
// the main iterator to match and b) only calling Next/NextKey on the main
// iterator once between invocations of this method.
//
// Post: If the main iterator was ahead of the TBI, the TBI will be advanced
// and the main iterator will point to the same MVCC key as the TBI, but
// possibly different timestamp.
func (i *MVCCIncrementalIterator) maybeSkipKeys() {
if i.timeBoundIter != nil {
// If we have a time bound iterator, we should check if we should skip
// keys.
tbiKey := i.timeBoundIter.Key().Key
iterKey := i.iter.Key().Key
if iterKey.Compare(tbiKey) > 0 {
// If the iterKey got ahead of the TBI key, advance the TBI Key.
i.timeBoundIter.NextKey()
if ok, err := i.timeBoundIter.Valid(); !ok {
i.err = err
i.valid = false
return
}
tbiKey = i.timeBoundIter.Key().Key
// The fast-path is when the TBI and the main iterator are in lockstep.
// In this case, the main iterator was referencing the next key that
// would be visited by the TBI. Now they should be referencing the same
// MVCC key, so the if statement below will not be entered.
// This means that for the incremental iterator to perform a Next or
// NextKey in this case, it will require only 1 extra NextKey call to the
// TBI.
if iterKey.Compare(tbiKey) < 0 {
// In the case that the next MVCC key that the TBI observes is not the
// same as the main iterator, we may be able to skip over a large group
// of keys. The main iterator is seeked to the TBI in hopes that many
// keys were skipped. Note that a Seek is an order of magnitude more
// expensive than a Next call.
seekKey := MakeMVCCMetadataKey(tbiKey)
i.iter.SeekGE(seekKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
return
}
}
}
}
}
// advance advances the main iterator until it is referencing a key within
// (start_time, end_time]. It may return an error if it encounters an
// unexpected key such as a key with an inline value.
func (i *MVCCIncrementalIterator) advance() {
for {
if !i.valid {
return
}
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
return
}
i.maybeSkipKeys()
if !i.valid {
return
}
unsafeMetaKey := i.iter.UnsafeKey()
if unsafeMetaKey.IsValue() {
i.meta.Reset()
i.meta.Timestamp = hlc.LegacyTimestamp(unsafeMetaKey.Timestamp)
} else {
if i.err = protoutil.Unmarshal(i.iter.UnsafeValue(), &i.meta); i.err != nil {
i.valid = false
return
}
}
if i.meta.IsInline() {
// Inline values are only used in non-user data. They're not needed
// for backup, so they're not handled by this method. If one shows
// up, throw an error so it's obvious something is wrong.
i.valid = false
i.err = errors.Errorf("inline values are unsupported by MVCCIncrementalIterator: %s",
unsafeMetaKey.Key)
return
}
metaTimestamp := hlc.Timestamp(i.meta.Timestamp)
if i.meta.Txn != nil {
if i.startTime.Less(metaTimestamp) && metaTimestamp.LessEq(i.endTime) {
i.err = &roachpb.WriteIntentError{
Intents: []roachpb.Intent{
roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key),
},
}
i.valid = false
return
}
i.iter.Next()
continue
}
// Note that MVCC keys are sorted by key, then by _descending_ timestamp
// order with the exception of the metakey (timestamp 0) being sorted
// first. See mvcc.h for more information.
if i.endTime.Less(metaTimestamp) {
i.iter.Next()
continue
}
if metaTimestamp.LessEq(i.startTime) {
i.iter.NextKey()
continue
}
break
}
}
// Valid must be called after any call to Reset(), Next(), or similar methods.
// It returns (true, nil) if the iterator points to a valid key (it is undefined
// to call Key(), Value(), or similar methods unless Valid() has returned (true,
// nil)). It returns (false, nil) if the iterator has moved past the end of the
// valid range, or (false, err) if an error has occurred. Valid() will never
// return true with a non-nil error.
func (i *MVCCIncrementalIterator) Valid() (bool, error) {
return i.valid, i.err
}
// Key returns the current key.
func (i *MVCCIncrementalIterator) Key() MVCCKey {
return i.iter.Key()
}
// Value returns the current value as a byte slice.
func (i *MVCCIncrementalIterator) Value() []byte {
return i.iter.Value()
}
// UnsafeKey returns the same key as Key, but the memory is invalidated on the
// next call to {Next,Reset,Close}.
func (i *MVCCIncrementalIterator) UnsafeKey() MVCCKey {
return i.iter.UnsafeKey()
}
// UnsafeValue returns the same value as Value, but the memory is invalidated on
// the next call to {Next,Reset,Close}.
func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
return i.iter.UnsafeValue()
}