-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathpebble_iterator.go
337 lines (293 loc) · 9.47 KB
/
pebble_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package engine
import (
"bytes"
"math"
"sync"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/pebble"
)
// pebbleIterator adapts a *pebble.Iterator to the engine's Iterator
// interface, translating between MVCCKeys and pebble's encoded keys.
type pebbleIterator struct {
	// iter is the wrapped pebble iterator.
	iter *pebble.Iterator
	// options holds the encoded bounds handed to pebble when iter is created.
	options pebble.IterOptions
	// keyBuf is scratch space for encoding MVCC keys, reused across calls.
	keyBuf []byte
	// lowerBoundBuf and upperBoundBuf back options.LowerBound and
	// options.UpperBound. Close deliberately keeps them so a pooled iterator
	// can reuse their memory instead of allocating new buffers.
	lowerBoundBuf []byte
	upperBoundBuf []byte
	// prefix makes Seek use SeekPrefixGE instead of SeekGE.
	prefix bool
}

// Compile-time check that *pebbleIterator satisfies Iterator.
var _ Iterator = (*pebbleIterator)(nil)

// pebbleIterPool recycles pebbleIterator structs: newPebbleIterator takes
// from it and Close puts back into it.
var pebbleIterPool = sync.Pool{
	New: func() interface{} {
		return new(pebbleIterator)
	},
}
// newPebbleIterator returns an Iterator over handle configured by opts. It
// reuses a pooled pebbleIterator when one is available.
func newPebbleIterator(handle pebble.Reader, opts IterOptions) Iterator {
	it := pebbleIterPool.Get().(*pebbleIterator)
	it.init(handle, opts)
	return it
}
// init (re)initializes p for iteration over handle using opts.
//
// p may be a freshly pooled iterator (via newPebbleIterator) or one cached
// elsewhere, e.g. in pebbleBatch. Every field is reset except the two bound
// buffers, which are kept so their capacity can be reused.
//
// init panics if opts requests neither prefix iteration nor any bound, and if
// handle fails to produce an iterator.
func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) {
	lower, upper := p.lowerBoundBuf, p.upperBoundBuf
	*p = pebbleIterator{
		lowerBoundBuf: lower,
		upperBoundBuf: upper,
		prefix:        opts.Prefix,
	}

	noBounds := len(opts.LowerBound) == 0 && len(opts.UpperBound) == 0
	if !opts.Prefix && noBounds {
		panic("iterator must set prefix or upper bound or lower bound")
	}

	// Each bound is encoded as a zero-timestamp MVCC key: the raw key followed
	// by a single NUL byte. This matches
	// EncodeKeyToBuf(buf[:0], MVCCKey{Key: bound}) without the extra call.
	if opts.LowerBound != nil {
		p.lowerBoundBuf = append(append(p.lowerBoundBuf[:0], opts.LowerBound...), 0x00)
		p.options.LowerBound = p.lowerBoundBuf
	}
	if opts.UpperBound != nil {
		p.upperBoundBuf = append(append(p.upperBoundBuf[:0], opts.UpperBound...), 0x00)
		p.options.UpperBound = p.upperBoundBuf
	}

	if p.iter = handle.NewIter(&p.options); p.iter == nil {
		panic("unable to create iterator")
	}
}
// Close implements the Iterator interface.
//
// It closes the wrapped pebble iterator, panicking if that reports an error.
// It then zeroes every field except the bound buffers and returns p to
// pebbleIterPool. Keeping the buffers lets the next user of this pooled struct
// reuse their memory.
func (p *pebbleIterator) Close() {
	if p.iter != nil {
		if err := p.iter.Close(); err != nil {
			panic(err)
		}
		p.iter = nil
	}

	lower, upper := p.lowerBoundBuf, p.upperBoundBuf
	*p = pebbleIterator{lowerBoundBuf: lower, upperBoundBuf: upper}
	pebbleIterPool.Put(p)
}
// Seek implements the Iterator interface.
//
// key is encoded into p.keyBuf. The iterator is then positioned with
// SeekPrefixGE for prefix iterators and with SeekGE otherwise. See pebble's
// documentation for the exact semantics of each.
func (p *pebbleIterator) Seek(key MVCCKey) {
	p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], key)
	if !p.prefix {
		p.iter.SeekGE(p.keyBuf)
		return
	}
	p.iter.SeekPrefixGE(p.keyBuf)
}
// Valid implements the Iterator interface.
//
// It reports whether the wrapped pebble iterator is positioned at an entry,
// together with any error that iterator has recorded.
func (p *pebbleIterator) Valid() (bool, error) {
	ok := p.iter.Valid()
	err := p.iter.Error()
	return ok, err
}
// Next implements the Iterator interface.
//
// It advances to the following entry, which may be another version of the
// same key. The bool pebble returns is discarded; callers use Valid instead.
func (p *pebbleIterator) Next() {
	_ = p.iter.Next()
}
// NextKey implements the Iterator interface.
//
// It skips forward past every remaining entry with the current roachpb.Key.
// It stops on the first entry with a different key, or when the iterator is
// exhausted. It does nothing when the iterator is invalid or errored.
func (p *pebbleIterator) NextKey() {
	valid, err := p.Valid()
	if err != nil || !valid {
		return
	}
	// Copy the current user key aside: the slice UnsafeKey hands out points
	// into iterator-owned memory that changes once the iterator moves.
	p.keyBuf = append(p.keyBuf[:0], p.UnsafeKey().Key...)
	for {
		if !p.iter.Next() || !bytes.Equal(p.keyBuf, p.UnsafeKey().Key) {
			return
		}
	}
}
// UnsafeKey implements the Iterator interface.
//
// It decodes the key at the current position without copying. Callers that
// need to keep the key after moving the iterator should use Key, which copies.
// An empty MVCCKey is returned if the iterator is invalid, has an error, or
// the key fails to decode.
func (p *pebbleIterator) UnsafeKey() MVCCKey {
	if ok, err := p.Valid(); ok && err == nil {
		if decoded, decodeErr := DecodeMVCCKey(p.iter.Key()); decodeErr == nil {
			return decoded
		}
	}
	return MVCCKey{}
}
// unsafeRawKey returns the still-encoded key at the current position, taken
// directly from the pebble iterator without copying. It returns nil if the
// iterator is invalid or has an error.
func (p *pebbleIterator) unsafeRawKey() []byte {
	if ok, err := p.Valid(); ok && err == nil {
		return p.iter.Key()
	}
	return nil
}
// UnsafeValue implements the Iterator interface.
//
// It returns the value at the current position without copying (see Value
// for a copying variant). It returns nil if the iterator is invalid or has an
// error.
func (p *pebbleIterator) UnsafeValue() []byte {
	if ok, err := p.Valid(); ok && err == nil {
		return p.iter.Value()
	}
	return nil
}
// SeekReverse implements the Iterator interface.
//
// It positions the iterator at the greatest entry whose encoded key is <= key.
// This is done with a forward seek (SeekGE, not SeekLT) followed by at most
// one step back:
//   - If the forward seek lands on an entry strictly greater than key, the
//     previous entry is the answer.
//   - If the forward seek finds nothing, every remaining entry is < key, so
//     the answer is the last entry within the iterator's bounds.
//
// If the seek produced an error, the iterator is left alone so that Valid
// reports the error.
func (p *pebbleIterator) SeekReverse(key MVCCKey) {
	// Seek leaves the encoded form of key in p.keyBuf.
	p.Seek(key)

	if !p.iter.Valid() {
		// Nothing >= key: fall back to the last entry. This is skipped for
		// prefix iterators, because Last presumably is not confined to the
		// seek prefix (reverse prefix iteration is not supported here).
		if !p.prefix && p.iter.Error() == nil {
			p.iter.Last()
		}
		return
	}

	// The entry found is either equal to or greater than key; back up one
	// step if it is greater.
	if MVCCKeyCompare(p.keyBuf, p.iter.Key()) < 0 {
		p.Prev()
	}
}
// Prev implements the Iterator interface.
//
// It steps back to the preceding entry. The bool pebble returns is discarded;
// callers use Valid instead.
func (p *pebbleIterator) Prev() {
	_ = p.iter.Prev()
}
// PrevKey implements the Iterator interface.
//
// It moves backwards past every remaining entry with the current roachpb.Key.
// It stops on the first entry with a different key, or when the iterator is
// exhausted. It does nothing when the iterator is invalid or errored.
func (p *pebbleIterator) PrevKey() {
	if valid, err := p.Valid(); err != nil || !valid {
		return
	}
	// Remember the current user key in the reusable p.keyBuf, as NextKey
	// does, instead of allocating a fresh copy through Key() on every call.
	// A copy is still required: the slice from UnsafeKey refers to
	// iterator-owned memory that changes once the iterator moves.
	p.keyBuf = append(p.keyBuf[:0], p.UnsafeKey().Key...)
	for p.iter.Prev() {
		if !bytes.Equal(p.keyBuf, p.UnsafeKey().Key) {
			break
		}
	}
}
// Key implements the Iterator interface.
//
// It returns the current key with its Key bytes copied into a fresh slice, so
// the result stays valid after the iterator moves. If UnsafeKey yields an
// empty key (invalid iterator or undecodable key), the result's Key is an
// empty, non-nil slice.
func (p *pebbleIterator) Key() MVCCKey {
	k := p.UnsafeKey()
	owned := make([]byte, len(k.Key))
	copy(owned, k.Key)
	k.Key = owned
	return k
}
// Value implements the Iterator interface.
//
// It returns a copy of the current value that the caller may keep. When
// UnsafeValue returns nil, the result is an empty, non-nil slice.
func (p *pebbleIterator) Value() []byte {
	raw := p.UnsafeValue()
	return append(make([]byte, 0, len(raw)), raw...)
}
// ValueProto implements the Iterator interface.
//
// It unmarshals the current value (as returned by UnsafeValue) into msg and
// returns any error from protoutil.Unmarshal.
func (p *pebbleIterator) ValueProto(msg protoutil.Message) error {
	return protoutil.Unmarshal(p.UnsafeValue(), msg)
}
// ComputeStats implements the Iterator interface.
//
// It delegates to the Go implementation, ComputeStatsGo, running it over this
// iterator with the given start/end keys and nowNanos.
func (p *pebbleIterator) ComputeStats(
	start, end MVCCKey, nowNanos int64,
) (enginepb.MVCCStats, error) {
	stats, err := ComputeStatsGo(p, start, end, nowNanos)
	return stats, err
}
// isValidSplitKey reports whether key lies outside every span in
// keys.NoSplitSpans. It is the Go-only counterpart of IsValidSplitKey.
func isValidSplitKey(key roachpb.Key) bool {
	for i := range keys.NoSplitSpans {
		if keys.NoSplitSpans[i].ContainsKey(key) {
			return false
		}
	}
	return true
}
// FindSplitKey implements the Iterator interface.
//
// It walks the entries from start up to (not including) end and keeps a
// running estimate of the bytes seen so far. It returns the key whose
// preceding byte estimate is closest to targetSize, together with the
// timestamp of the version at which it was seen.
//
// Only keys that are >= minSplitKey and accepted by isValidSplitKey are
// candidates. Once a candidate is recorded, the scan stops as soon as the
// distance from targetSize starts growing again. An empty MVCCKey means no
// candidate was found.
//
// An error is returned if a key fails to decode, or if the underlying pebble
// iterator stopped because of an error rather than exhaustion.
func (p *pebbleIterator) FindSplitKey(
	start, end, minSplitKey MVCCKey, targetSize int64,
) (MVCCKey, error) {
	// Bytes charged per version timestamp in the size estimate.
	const timestampLen = int64(12)

	p.Seek(start)
	// Seek used p.keyBuf for start; from here on it holds the encoded end bound.
	p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], end)
	minSplitKeyBuf := EncodeKey(minSplitKey)

	var prevKey []byte
	sizeSoFar := int64(0)
	bestDiff := int64(math.MaxInt64)
	bestSplitKey := MVCCKey{}

	for ; p.iter.Valid() && MVCCKeyCompare(p.iter.Key(), p.keyBuf) < 0; p.iter.Next() {
		mvccKey, err := DecodeMVCCKey(p.iter.Key())
		if err != nil {
			return MVCCKey{}, err
		}

		diff := targetSize - sizeSoFar
		if diff < 0 {
			diff = -diff
		}
		if diff < bestDiff && MVCCKeyCompare(p.iter.Key(), minSplitKeyBuf) >= 0 && isValidSplitKey(mvccKey.Key) {
			// bestSplitKey must be copied: by the time we know it is the best
			// split key, the underlying slice will have been changed by the
			// iter.Next() call.
			//
			// TODO(itsbilal): Instead of copying into bestSplitKey each time,
			// consider just calling iter.Prev() at the end to get to the same key.
			bestDiff = diff
			bestSplitKey.Key = append(bestSplitKey.Key[:0], mvccKey.Key...)
			bestSplitKey.Timestamp = mvccKey.Timestamp
		}
		if diff > bestDiff && bestSplitKey.Key != nil {
			// Moving further away from the target than the best candidate;
			// continuing cannot improve on it.
			break
		}

		sizeSoFar += int64(len(p.iter.Value()))
		if mvccKey.IsValue() && bytes.Equal(prevKey, mvccKey.Key) {
			// Another version of the same user key: only the timestamp is new.
			sizeSoFar += timestampLen
		} else {
			// A new user key. The +1 presumably accounts for the encoding's
			// trailing byte after the key.
			sizeSoFar += int64(len(mvccKey.Key) + 1)
			if mvccKey.IsValue() {
				sizeSoFar += timestampLen
			}
		}
		prevKey = append(prevKey[:0], mvccKey.Key...)
	}

	// The loop also ends when the iterator becomes invalid. Distinguish an
	// iteration error from exhaustion rather than returning a split key
	// computed from a truncated scan.
	if err := p.iter.Error(); err != nil {
		return MVCCKey{}, err
	}
	return bestSplitKey, nil
}
// MVCCGet implements the Iterator interface.
//
// Not yet implemented for pebble: every call panics. See #39674.
func (p *pebbleIterator) MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
}
// MVCCScan implements the Iterator interface.
//
// Not yet implemented for pebble: every call panics. See #39674.
func (p *pebbleIterator) MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData []byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
// TODO(itsbilal): Implement in a separate PR. See #39674.
panic("unimplemented for now, see #39674")
}
// SetUpperBound implements the Iterator interface.
//
// It replaces the iterator's upper bound with upperBound, encoded as a
// zero-timestamp MVCC key (upperBound followed by a NUL byte) in the reusable
// p.upperBoundBuf. The current lower bound is kept. Both bounds are then
// pushed to pebble via SetBounds. NOTE(review): pebble documents that
// SetBounds invalidates the iterator, so callers presumably must re-seek;
// confirm against the pebble version in use.
func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) {
	buf := append(p.upperBoundBuf[:0], upperBound...)
	buf = append(buf, 0x00)
	p.upperBoundBuf = buf
	p.options.UpperBound = buf
	p.iter.SetBounds(p.options.LowerBound, buf)
}
// Stats implements the Iterator interface.
//
// Not yet implemented for pebble: every call panics.
func (p *pebbleIterator) Stats() IteratorStats {
// TODO(itsbilal): Implement this.
panic("implement me")
}