Skip to content

Commit

Permalink
Merge #60460
Browse files Browse the repository at this point in the history
60460: storage: fix seek bug in pebbleMVCCScanner r=sumeerbhola a=sumeerbhola

The seek key in pebbleMVCCScanner.seekVersion was an
unsafe key that preceded a number of calls to Next.
This used to work because the byte slice in the
MVCCKey stayed the same as we were at a different
version of the key. With the intentInterleavingIter
this is not always true: the unsafe byte slice may
have been constructed using the intentIter, which may
no longer be in the same position.

This was responsible for failures in the cdc/bank
roachtest and occasional failures in
TestChangefeedNemeses/sinkless, when separated
intents were enabled.

Some testing with mangling of unsafe keys revealed
another bug, in rangefeed.registration, that is also
fixed here.

Informs #41720

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Feb 12, 2021
2 parents 5db2e58 + b0c70b6 commit 2ff1de1
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 35 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ func (r *registration) maybeRunCatchupScan() error {
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
//
// Make a copy since should not pass an unsafe key from the iterator
// that provided it, when asking it to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
catchupIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ go_test(
"mvcc_stats_test.go",
"mvcc_test.go",
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"slice_test.go",
"sst_info_test.go",
Expand Down
88 changes: 53 additions & 35 deletions pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ type pebbleMVCCScanner struct {
// cur* variables store the "current" record we're pointing to. Updated in
// updateCurrent. Note that the timestamp can be clobbered in the case of
// adding an intent from the intent history but is otherwise meaningful.
curKey MVCCKey
curRawKey []byte
curValue []byte
results pebbleResults
intents pebble.Batch
curUnsafeKey MVCCKey
curRawKey []byte
curValue []byte
results pebbleResults
intents pebble.Batch
// mostRecentTS stores the largest timestamp observed that is equal to or
// above the scan timestamp. Only applicable if failOnMoreRecent is true. If
// set and no other error is hit, a WriteToOld error will be returned from
Expand Down Expand Up @@ -208,7 +208,7 @@ func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) {
// NB: this is equivalent to:
// append(roachpb.Key(nil), p.curKey.Key...).Next()
// but with half the allocations.
curKey := p.curKey.Key
curKey := p.curUnsafeKey.Key
curKeyCopy := make(roachpb.Key, len(curKey), len(curKey)+1)
copy(curKeyCopy, curKey)
resume = &roachpb.Span{
Expand All @@ -217,7 +217,7 @@ func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) {
}
} else {
resume = &roachpb.Span{
Key: append(roachpb.Key(nil), p.curKey.Key...),
Key: append(roachpb.Key(nil), p.curUnsafeKey.Key...),
EndKey: p.end,
}
}
Expand Down Expand Up @@ -304,24 +304,24 @@ func (p *pebbleMVCCScanner) uncertaintyError(ts hlc.Timestamp) bool {
// Emit a tuple and return true if we have reason to believe iteration can
// continue.
func (p *pebbleMVCCScanner) getAndAdvance() bool {
if !p.curKey.Timestamp.IsEmpty() {
if !p.curUnsafeKey.Timestamp.IsEmpty() {
// ts < read_ts
if p.curKey.Timestamp.Less(p.ts) {
if p.curUnsafeKey.Timestamp.Less(p.ts) {
// 1. Fast path: there is no intent and our read timestamp is newer
// than the most recent version's timestamp.
return p.addAndAdvance(p.curRawKey, p.curValue)
}

// ts == read_ts
if p.curKey.Timestamp.EqOrdering(p.ts) {
if p.curUnsafeKey.Timestamp.EqOrdering(p.ts) {
if p.failOnMoreRecent {
// 2. Our txn's read timestamp is equal to the most recent
// version's timestamp and the scanner has been configured to
// throw a write too old error on equal or more recent versions.
// Merge the current timestamp with the maximum timestamp we've
// seen so we know to return an error, but then keep scanning so
// that we can return the largest possible time.
p.mostRecentTS.Forward(p.curKey.Timestamp)
p.mostRecentTS.Forward(p.curUnsafeKey.Timestamp)
return p.advanceKey()
}

Expand All @@ -338,16 +338,16 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {
// Merge the current timestamp with the maximum timestamp we've
// seen so we know to return an error, but then keep scanning so
// that we can return the largest possible time.
p.mostRecentTS.Forward(p.curKey.Timestamp)
p.mostRecentTS.Forward(p.curUnsafeKey.Timestamp)
return p.advanceKey()
}

if p.checkUncertainty {
// 5. Our txn's read timestamp is less than the max timestamp
// seen by the txn. We need to check for clock uncertainty
// errors.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}

// This value is not within the reader's uncertainty window, but
Expand Down Expand Up @@ -484,8 +484,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {
// Note: this assumes that it is safe to corrupt curKey here because we're
// about to advance. If this proves to be a problem later, we can extend
// addAndAdvance to take an MVCCKey explicitly.
p.curKey.Timestamp = metaTS
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curKey)
p.curUnsafeKey.Timestamp = metaTS
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curUnsafeKey)
return p.addAndAdvance(p.keyBuf, value)
}
// 13. If no value in the intent history has a sequence number equal to
Expand Down Expand Up @@ -516,13 +516,13 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {

// nextKey advances to the next user key.
func (p *pebbleMVCCScanner) nextKey() bool {
p.keyBuf = append(p.keyBuf[:0], p.curKey.Key...)
p.keyBuf = append(p.keyBuf[:0], p.curUnsafeKey.Key...)

for i := 0; i < p.itersBeforeSeek; i++ {
if !p.iterNext() {
return false
}
if !bytes.Equal(p.curKey.Key, p.keyBuf) {
if !bytes.Equal(p.curUnsafeKey.Key, p.keyBuf) {
p.incrementItersBeforeSeek()
return true
}
Expand All @@ -531,6 +531,10 @@ func (p *pebbleMVCCScanner) nextKey() bool {
p.decrementItersBeforeSeek()
// We're pointed at a different version of the same key. Fall back to
// seeking to the next key. We append a NUL to account for the "next-key".
// Note that we cannot rely on curUnsafeKey.Key being unchanged even though
// we are at a different version of the same key -- the underlying
// MVCCIterator is free to mutate the backing for p.curUnsafeKey.Key
// arbitrarily. Therefore we use p.keyBuf here which we have handy.
p.keyBuf = append(p.keyBuf, 0)
return p.iterSeek(MVCCKey{Key: p.keyBuf})
}
Expand Down Expand Up @@ -558,6 +562,11 @@ func (p *pebbleMVCCScanner) backwardLatestVersion(key []byte, i int) bool {
}
}

// We're still not pointed to the latest version of the key. Fall back to
// seeking to the latest version. Note that we cannot rely on key being
// unchanged even though we are at a different version of the same key --
// the underlying MVCCIterator is free to mutate the backing for key
// arbitrarily. Therefore we use p.keyBuf here which we have handy.
p.decrementItersBeforeSeek()
return p.iterSeek(MVCCKey{Key: p.keyBuf})
}
Expand Down Expand Up @@ -591,7 +600,7 @@ func (p *pebbleMVCCScanner) advanceKey() bool {
return false
}
if p.reverse {
return p.prevKey(p.curKey.Key)
return p.prevKey(p.curUnsafeKey.Key)
}
return p.nextKey()
}
Expand Down Expand Up @@ -652,21 +661,26 @@ func (p *pebbleMVCCScanner) addAndAdvance(rawKey []byte, val []byte) bool {
// equal to the specified timestamp, adds it to the result set, then moves onto
// the next user key.
func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck bool) bool {
seekKey := MVCCKey{Key: p.curKey.Key, Timestamp: seekTS}
seekKey := MVCCKey{Key: p.curUnsafeKey.Key, Timestamp: seekTS}
p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], seekKey)
origKey := p.keyBuf[:len(p.curKey.Key)]
origKey := p.keyBuf[:len(p.curUnsafeKey.Key)]
// We will need seekKey below, if the next's don't suffice. Even though the
// MVCCIterator will be at a different version of the same key, it is free
// to mutate the backing for p.curUnsafeKey.Key in an arbitrary manner. So
// assign to this copy, to make it stable.
seekKey.Key = origKey

for i := 0; i < p.itersBeforeSeek; i++ {
if !p.iterNext() {
return p.advanceKeyAtEnd()
}
if !bytes.Equal(p.curKey.Key, origKey) {
if !bytes.Equal(p.curUnsafeKey.Key, origKey) {
p.incrementItersBeforeSeek()
return p.advanceKeyAtNewKey(origKey)
}
if p.curKey.Timestamp.LessEq(seekTS) {
if p.curUnsafeKey.Timestamp.LessEq(seekTS) {
p.incrementItersBeforeSeek()
if !uncertaintyCheck || p.curKey.Timestamp.LessEq(p.ts) {
if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) {
return p.addAndAdvance(p.curRawKey, p.curValue)
}
// Iterate through uncertainty interval. Though we found a value in
Expand All @@ -677,8 +691,8 @@ func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck b
// are only uncertain if their timestamps are synthetic. Meanwhile,
// any value with a time in the range (ts, localUncertaintyLimit]
// is uncertain.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}
}
}
Expand All @@ -688,17 +702,17 @@ func (p *pebbleMVCCScanner) seekVersion(seekTS hlc.Timestamp, uncertaintyCheck b
return p.advanceKeyAtEnd()
}
for {
if !bytes.Equal(p.curKey.Key, origKey) {
if !bytes.Equal(p.curUnsafeKey.Key, origKey) {
return p.advanceKeyAtNewKey(origKey)
}
if !uncertaintyCheck || p.curKey.Timestamp.LessEq(p.ts) {
if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) {
return p.addAndAdvance(p.curRawKey, p.curValue)
}
// Iterate through uncertainty interval. See the comment above about why
// a value in this interval is not necessarily cause for an uncertainty
// error.
if p.isUncertainValue(p.curKey.Timestamp) {
return p.uncertaintyError(p.curKey.Timestamp)
if p.isUncertainValue(p.curUnsafeKey.Timestamp) {
return p.uncertaintyError(p.curUnsafeKey.Timestamp)
}
if !p.iterNext() {
return p.advanceKeyAtEnd()
Expand All @@ -715,7 +729,7 @@ func (p *pebbleMVCCScanner) updateCurrent() bool {
p.curRawKey = p.parent.UnsafeRawMVCCKey()

var err error
p.curKey, err = DecodeMVCCKey(p.curRawKey)
p.curUnsafeKey, err = DecodeMVCCKey(p.curRawKey)
if err != nil {
panic(err)
}
Expand All @@ -725,7 +739,11 @@ func (p *pebbleMVCCScanner) updateCurrent() bool {

func (p *pebbleMVCCScanner) iterValid() bool {
if valid, err := p.parent.Valid(); !valid {
p.err = err
// Defensive: unclear if p.err can already be non-nil here, but
// regardless, don't overwrite unless we have a non-nil err.
if err != nil {
p.err = err
}
return false
}
return true
Expand All @@ -747,13 +765,13 @@ func (p *pebbleMVCCScanner) iterSeekReverse(key MVCCKey) bool {
return false
}

if p.curKey.Timestamp.IsEmpty() {
if p.curUnsafeKey.Timestamp.IsEmpty() {
// We landed on an intent or inline value.
return true
}
// We landed on a versioned value, we need to back up to find the
// latest version.
return p.backwardLatestVersion(p.curKey.Key, 0)
return p.backwardLatestVersion(p.curUnsafeKey.Key, 0)
}

// iterNext advances to the next MVCC key.
Expand Down Expand Up @@ -806,7 +824,7 @@ func (p *pebbleMVCCScanner) iterPeekPrev() ([]byte, bool) {
p.curValue = p.savedBuf[len(p.curRawKey):]
// The raw key is always a prefix of the encoded MVCC key. Take advantage of this to
// sub-slice the raw key directly, instead of calling SplitMVCCKey.
p.curKey.Key = p.curRawKey[:len(p.curKey.Key)]
p.curUnsafeKey.Key = p.curRawKey[:len(p.curUnsafeKey.Key)]

// With the current iterator state saved we can move the iterator to the
// previous entry.
Expand Down
115 changes: 115 additions & 0 deletions pkg/storage/pebble_mvcc_scanner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func TestMVCCScanWithManyVersionsAndSeparatedIntents(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := createTestPebbleEngine().(*Pebble)
defer eng.Close()
// Force separated intents for writing.
eng.wrappedIntentWriter, eng.useWrappedIntentWriter =
intentDemuxWriter{w: eng, enabledSeparatedIntents: true}, true

keys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")}
// Many versions of each key.
for i := 1; i < 10; i++ {
for _, k := range keys {
require.NoError(t, eng.PutMVCC(MVCCKey{Key: k, Timestamp: hlc.Timestamp{WallTime: int64(i)}},
[]byte(fmt.Sprintf("%d", i))))
}
}
// Write a separated lock for the latest version of each key, to make it provisional.
uuid := uuid.FromUint128(uint128.FromInts(1, 1))
meta := enginepb.MVCCMetadata{
Txn: &enginepb.TxnMeta{
ID: uuid,
WriteTimestamp: hlc.Timestamp{WallTime: 9},
},
Timestamp: hlc.LegacyTimestamp{WallTime: 9},
Deleted: false,
KeyBytes: 2, // arbitrary
ValBytes: 2, // arbitrary
RawBytes: nil,
IntentHistory: nil,
MergeTimestamp: nil,
TxnDidNotUpdateMeta: nil,
}

metaBytes, err := protoutil.Marshal(&meta)
require.NoError(t, err)

for _, k := range keys {
_, err = eng.PutIntent(k, metaBytes, NoExistingIntent, true, uuid)
require.NoError(t, err)
}

reader := eng.NewReadOnly().(*pebbleReadOnly)
defer reader.Close()
iter := intentInterleavingReader{wrappableReader: reader}.NewMVCCIterator(
MVCCKeyAndIntentsIterKind,
IterOptions{LowerBound: keys[0], UpperBound: roachpb.Key("d")})
defer iter.Close()

// Look for older versions that come after the scanner has exhausted its
// next optimization and does a seek. The seek key had a bug that caused the
// scanner to skip keys that it desired to see.
ts := hlc.Timestamp{WallTime: 2}
mvccScanner := pebbleMVCCScanner{
parent: iter,
reverse: false,
start: keys[0],
end: roachpb.Key("d"),
ts: ts,
inconsistent: false,
tombstones: false,
failOnMoreRecent: false,
}
mvccScanner.init(nil /* txn */, hlc.Timestamp{})
_, err = mvccScanner.scan()
require.NoError(t, err)

kvData := mvccScanner.results.finish()
numKeys := mvccScanner.results.count
require.Equal(t, 3, int(numKeys))
type kv struct {
k MVCCKey
v []byte
}
kvs := make([]kv, numKeys)
var i int
require.NoError(t, MVCCScanDecodeKeyValues(kvData, func(k MVCCKey, v []byte) error {
kvs[i].k = k
kvs[i].v = v
i++
return nil
}))
expectedKVs := make([]kv, len(keys))
for i := range expectedKVs {
expectedKVs[i].k = MVCCKey{Key: keys[i], Timestamp: hlc.Timestamp{WallTime: 2}}
expectedKVs[i].v = []byte("2")
}
require.Equal(t, expectedKVs, kvs)
}

0 comments on commit 2ff1de1

Please sign in to comment.