Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fix seek bug in pebbleMVCCScanner #60460

Merged
merged 1 commit into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ func (r *registration) maybeRunCatchupScan() error {
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
//
// Make a copy since should not pass an unsafe key from the iterator
// that provided it, when asking it to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
catchupIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ go_test(
"mvcc_stats_test.go",
"mvcc_test.go",
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"slice_test.go",
"sst_info_test.go",
Expand Down
88 changes: 53 additions & 35 deletions pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ type pebbleMVCCScanner struct {
// cur* variables store the "current" record we're pointing to. Updated in
// updateCurrent. Note that the timestamp can be clobbered in the case of
// adding an intent from the intent history but is otherwise meaningful.
curKey MVCCKey
curRawKey []byte
curValue []byte
results pebbleResults
intents pebble.Batch
curUnsafeKey MVCCKey
curRawKey []byte
curValue []byte
results pebbleResults
intents pebble.Batch
// mostRecentTS stores the largest timestamp observed that is equal to or
// above the scan timestamp. Only applicable if failOnMoreRecent is true. If
// set and no other error is hit, a WriteToOld error will be returned from
Expand Down Expand Up @@ -208,7 +208,7 @@ func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) {
// NB: this is equivalent to:
// append(roachpb.Key(nil), p.curKey.Key...).Next()
// but with half the allocations.
curKey := p.curKey.Key
curKey := p.curUnsafeKey.Key
curKeyCopy := make(roachpb.Key, len(curKey), len(curKey)+1)
copy(curKeyCopy, curKey)
resume = &roachpb.Span{
Expand All @@ -217,7 +217,7 @@ func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) {
}
} else {
resume = &roachpb.Span{
Key: append(roachpb.Key(nil), p.curKey.Key...),
Key: append(roachpb.Key(nil), p.curUnsafeKey.Key...),
EndKey: p.end,
}
}
Expand Down Expand Up @@ -304,24 +304,24 @@ func (p *pebbleMVCCScanner) uncertaintyError(ts hlc.Timestamp) bool {
// Emit a tuple and return true if we have reason to believe iteration can
// continue.
func (p *pebbleMVCCScanner) getAndAdvance() bool {
if !p.curKey.Timestamp.IsEmpty() {
if !p.curUnsafeKey.Timestamp.IsEmpty() {
// ts < read_ts
if p.curKey.Timestamp.Less(p.ts) {
if p.curUnsafeKey.Timestamp.Less(p.ts) {
// 1. Fast path: there is no intent and our read timestamp is newer
// than the most recent version's timestamp.
return p.addAndAdvance(p.curRawKey, p.curValue)
}

// ts == read_ts
if p.curKey.Timestamp.EqOrdering(p.ts) {
if p.curUnsafeKey.Timestamp.EqOrdering(p.ts) {
if p.failOnMoreRecent {
// 2. Our txn's read timestamp is equal to the most recent
// version's timestamp and the scanner has been configured to
// throw a write too old error on equal or more recent versions.
// Merge the current timestamp with the maximum timestamp we've
// seen so we know to return an error, but then keep scanning so
// that we can return the largest possible time.
p.mostRecentTS.Forward(p.curKey.Timestamp)
p.mostRecentTS.Forward(p.curUnsafeKey.Timestamp)
return p.advanceKey()
}

Expand All @@ -338,16 +338,16 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {
// Merge the current timestamp with the maximum timestamp we've
// seen so we know to return an error, but then keep scanning so
// that we can return the largest possible time.
p.mostRecentTS.Forward(p.curKey.Timestamp)
p.mostRecentTS.Forward(p.curUnsafeKey.Timestamp)
return p.advanceKey()
}

if p.checkUncertainty {
// 5. Our txn's read timestamp is less than the max timestamp
// seen by the txn. We need to check for clock uncertainty
// errors.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}

// This value is not within the reader's uncertainty window, but
Expand Down Expand Up @@ -484,8 +484,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {
// Note: this assumes that it is safe to corrupt curKey here because we're
// about to advance. If this proves to be a problem later, we can extend
// addAndAdvance to take an MVCCKey explicitly.
p.curKey.Timestamp = metaTS
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curKey)
p.curUnsafeKey.Timestamp = metaTS
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curUnsafeKey)
return p.addAndAdvance(p.keyBuf, value)
}
// 13. If no value in the intent history has a sequence number equal to
Expand Down Expand Up @@ -516,13 +516,13 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {

// nextKey advances to the next user key.
func (p *pebbleMVCCScanner) nextKey() bool {
p.keyBuf = append(p.keyBuf[:0], p.curKey.Key...)
p.keyBuf = append(p.keyBuf[:0], p.curUnsafeKey.Key...)

for i := 0; i < p.itersBeforeSeek; i++ {
if !p.iterNext() {
return false
}
if !bytes.Equal(p.curKey.Key, p.keyBuf) {
if !bytes.Equal(p.curUnsafeKey.Key, p.keyBuf) {
p.incrementItersBeforeSeek()
return true
}
Expand All @@ -531,6 +531,10 @@ func (p *pebbleMVCCScanner) nextKey() bool {
p.decrementItersBeforeSeek()
// We're pointed at a different version of the same key. Fall back to
// seeking to the next key. We append a NUL to account for the "next-key".
// Note that we cannot rely on curUnsafeKey.Key being unchanged even though
// we are at a different version of the same key -- the underlying
// MVCCIterator is free to mutate the backing for p.curUnsafeKey.Key
// arbitrarily. Therefore we use p.keyBuf here which we have handy.
p.keyBuf = append(p.keyBuf, 0)
return p.iterSeek(MVCCKey{Key: p.keyBuf})
}
Expand Down Expand Up @@ -558,6 +562,11 @@ func (p *pebbleMVCCScanner) backwardLatestVersion(key []byte, i int) bool {
}
}

// We're still not pointed to the latest version of the key. Fall back to
// seeking to the latest version. Note that we cannot rely on key being
// unchanged even though we are at a different version of the same key --
// the underlying MVCCIterator is free to mutate the backing for key
// arbitrarily. Therefore we use p.keyBuf here which we have handy.
p.decrementItersBeforeSeek()
return p.iterSeek(MVCCKey{Key: p.keyBuf})
}
Expand Down Expand Up @@ -591,7 +600,7 @@ func (p *pebbleMVCCScanner) advanceKey() bool {
return false
}
if p.reverse {
return p.prevKey(p.curKey.Key)
return p.prevKey(p.curUnsafeKey.Key)
}
return p.nextKey()
}
Expand Down Expand Up @@ -652,21 +661,26 @@ func (p *pebbleMVCCScanner) addAndAdvance(rawKey []byte, val []byte) bool {
// equal to the specified timestamp, adds it to the result set, then moves onto
// the next user key.
func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck bool) bool {
seekKey := MVCCKey{Key: p.curKey.Key, Timestamp: seekTS}
seekKey := MVCCKey{Key: p.curUnsafeKey.Key, Timestamp: seekTS}
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], seekKey)
origKey := p.keyBuf[:len(p.curKey.Key)]
origKey := p.keyBuf[:len(p.curUnsafeKey.Key)]
// We will need seekKey below, if the next's don't suffice. Even though the
// MVCCIterator will be at a different version of the same key, it is free
// to mutate the backing for p.curUnsafeKey.Key in an arbitrary manner. So
// assign to this copy, to make it stable.
seekKey.Key = origKey

for i := 0; i < p.itersBeforeSeek; i++ {
if !p.iterNext() {
return p.advanceKeyAtEnd()
}
if !bytes.Equal(p.curKey.Key, origKey) {
if !bytes.Equal(p.curUnsafeKey.Key, origKey) {
p.incrementItersBeforeSeek()
return p.advanceKeyAtNewKey(origKey)
}
if p.curKey.Timestamp.LessEq(seekTS) {
if p.curUnsafeKey.Timestamp.LessEq(seekTS) {
p.incrementItersBeforeSeek()
if !uncertaintyCheck || p.curKey.Timestamp.LessEq(p.ts) {
if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) {
return p.addAndAdvance(p.curRawKey, p.curValue)
}
// Iterate through uncertainty interval. Though we found a value in
Expand All @@ -677,8 +691,8 @@ func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck b
// are only uncertain if their timestamps are synthetic. Meanwhile,
// any value with a time in the range (ts, localUncertaintyLimit]
// is uncertain.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}
}
}
Expand All @@ -688,17 +702,17 @@ func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck b
return p.advanceKeyAtEnd()
}
for {
if !bytes.Equal(p.curKey.Key, origKey) {
if !bytes.Equal(p.curUnsafeKey.Key, origKey) {
return p.advanceKeyAtNewKey(origKey)
}
if !uncertaintyCheck || p.curKey.Timestamp.LessEq(p.ts) {
if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) {
return p.addAndAdvance(p.curRawKey, p.curValue)
}
// Iterate through uncertainty interval. See the comment above about why
// a value in this interval is not necessarily cause for an uncertainty
// error.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}
if !p.iterNext() {
return p.advanceKeyAtEnd()
Expand All @@ -715,7 +729,7 @@ func (p *pebbleMVCCScanner) updateCurrent() bool {
p.curRawKey = p.parent.UnsafeRawMVCCKey()

var err error
p.curKey, err = DecodeMVCCKey(p.curRawKey)
p.curUnsafeKey, err = DecodeMVCCKey(p.curRawKey)
if err != nil {
panic(err)
}
Expand All @@ -725,7 +739,11 @@ func (p *pebbleMVCCScanner) updateCurrent() bool {

func (p *pebbleMVCCScanner) iterValid() bool {
if valid, err := p.parent.Valid(); !valid {
p.err = err
// Defensive: unclear if p.err can already be non-nil here, but
// regardless, don't overwrite unless we have a non-nil err.
if err != nil {
p.err = err
}
return false
}
return true
Expand All @@ -747,13 +765,13 @@ func (p *pebbleMVCCScanner) iterSeekReverse(key MVCCKey) bool {
return false
}

if p.curKey.Timestamp.IsEmpty() {
if p.curUnsafeKey.Timestamp.IsEmpty() {
// We landed on an intent or inline value.
return true
}
// We landed on a versioned value, we need to back up to find the
// latest version.
return p.backwardLatestVersion(p.curKey.Key, 0)
return p.backwardLatestVersion(p.curUnsafeKey.Key, 0)
}

// iterNext advances to the next MVCC key.
Expand Down Expand Up @@ -806,7 +824,7 @@ func (p *pebbleMVCCScanner) iterPeekPrev() ([]byte, bool) {
p.curValue = p.savedBuf[len(p.curRawKey):]
// The raw key is always a prefix of the encoded MVCC key. Take advantage of this to
// sub-slice the raw key directly, instead of calling SplitMVCCKey.
p.curKey.Key = p.curRawKey[:len(p.curKey.Key)]
p.curUnsafeKey.Key = p.curRawKey[:len(p.curUnsafeKey.Key)]

// With the current iterator state saved we can move the iterator to the
// previous entry.
Expand Down
115 changes: 115 additions & 0 deletions pkg/storage/pebble_mvcc_scanner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func TestMVCCScanWithManyVersionsAndSeparatedIntents(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := createTestPebbleEngine().(*Pebble)
defer eng.Close()
// Force separated intents for writing.
eng.wrappedIntentWriter, eng.useWrappedIntentWriter =
intentDemuxWriter{w: eng, enabledSeparatedIntents: true}, true

keys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")}
// Many versions of each key.
for i := 1; i < 10; i++ {
for _, k := range keys {
require.NoError(t, eng.PutMVCC(MVCCKey{Key: k, Timestamp: hlc.Timestamp{WallTime: int64(i)}},
[]byte(fmt.Sprintf("%d", i))))
}
}
// Write a separated lock for the latest version of each key, to make it provisional.
uuid := uuid.FromUint128(uint128.FromInts(1, 1))
meta := enginepb.MVCCMetadata{
Txn: &enginepb.TxnMeta{
ID: uuid,
WriteTimestamp: hlc.Timestamp{WallTime: 9},
},
Timestamp: hlc.LegacyTimestamp{WallTime: 9},
Deleted: false,
KeyBytes: 2, // arbitrary
ValBytes: 2, // arbitrary
RawBytes: nil,
IntentHistory: nil,
MergeTimestamp: nil,
TxnDidNotUpdateMeta: nil,
}

metaBytes, err := protoutil.Marshal(&meta)
require.NoError(t, err)

for _, k := range keys {
_, err = eng.PutIntent(k, metaBytes, NoExistingIntent, true, uuid)
require.NoError(t, err)
}

reader := eng.NewReadOnly().(*pebbleReadOnly)
defer reader.Close()
iter := intentInterleavingReader{wrappableReader: reader}.NewMVCCIterator(
MVCCKeyAndIntentsIterKind,
IterOptions{LowerBound: keys[0], UpperBound: roachpb.Key("d")})
defer iter.Close()

// Look for older versions that come after the scanner has exhausted its
// next optimization and does a seek. The seek key had a bug that caused the
// scanner to skip keys that it desired to see.
ts := hlc.Timestamp{WallTime: 2}
mvccScanner := pebbleMVCCScanner{
parent: iter,
reverse: false,
start: keys[0],
end: roachpb.Key("d"),
ts: ts,
inconsistent: false,
tombstones: false,
failOnMoreRecent: false,
}
mvccScanner.init(nil /* txn */, hlc.Timestamp{})
_, err = mvccScanner.scan()
require.NoError(t, err)

kvData := mvccScanner.results.finish()
numKeys := mvccScanner.results.count
require.Equal(t, 3, int(numKeys))
type kv struct {
k MVCCKey
v []byte
}
kvs := make([]kv, numKeys)
var i int
require.NoError(t, MVCCScanDecodeKeyValues(kvData, func(k MVCCKey, v []byte) error {
kvs[i].k = k
kvs[i].v = v
i++
return nil
}))
expectedKVs := make([]kv, len(keys))
for i := range expectedKVs {
expectedKVs[i].k = MVCCKey{Key: keys[i], Timestamp: hlc.Timestamp{WallTime: 2}}
expectedKVs[i].v = []byte("2")
}
require.Equal(t, expectedKVs, kvs)
}