Skip to content

Commit

Permalink
engineccl/mvcc: work around time-bound iterator bug
Browse files Browse the repository at this point in the history
For reasons described in #28358, a time-bound iterator will sometimes
see an unresolved intent where there is none. A normal iterator doesn't
have this problem, so we work around it in MVCCIncrementalIterator by
double checking all non-value keys. If the normal iterator has a
different value for the key, it's used instead. If the normal iterator
doesn't have that key, it's skipped.

This fixes both changefeeds and incremental backup.

Closes #32104
Closes #32799

Release note (bug fix): `CHANGEFEED`s and incremental `BACKUP`s no
longer indefinitely hang under an infrequent condition.
  • Loading branch information
danhhz committed Dec 11, 2018
1 parent 59601df commit 2de146d
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 2 deletions.
54 changes: 53 additions & 1 deletion pkg/ccl/storageccl/engineccl/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package engineccl

import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -49,6 +51,11 @@ type MVCCIncrementalIterator struct {

iter engine.Iterator

// fields used for a workaround for a bug in the time-bound iterator
upperBound roachpb.Key
e engine.Reader
sanityIter engine.Iterator

startTime hlc.Timestamp
endTime hlc.Timestamp
err error
Expand All @@ -72,6 +79,8 @@ type IterOptions struct {
// specified engine and options.
func NewMVCCIncrementalIterator(e engine.Reader, opts IterOptions) *MVCCIncrementalIterator {
return &MVCCIncrementalIterator{
e: e,
upperBound: opts.UpperBound,
iter: e.NewIterator(engine.IterOptions{
// The call to startTime.Next() converts our exclusive start bound into
// the inclusive start bound that MinTimestampHint expects. This is
Expand Down Expand Up @@ -99,6 +108,9 @@ func (i *MVCCIncrementalIterator) Seek(startKey engine.MVCCKey) {
// Close frees up resources held by the iterator.
func (i *MVCCIncrementalIterator) Close() {
i.iter.Close()
if i.sanityIter != nil {
i.sanityIter.Close()
}
}

// Next advances the iterator to the next key/value in the iteration. After this
Expand Down Expand Up @@ -134,7 +146,25 @@ func (i *MVCCIncrementalIterator) advance() {
i.meta.Reset()
i.meta.Timestamp = hlc.LegacyTimestamp(unsafeMetaKey.Timestamp)
} else {
if i.err = i.iter.ValueProto(&i.meta); i.err != nil {
// HACK(dan): Work around a known bug in the time-bound iterator.
// For reasons described in #28358, a time-bound iterator will
// sometimes see an unresolved intent where there is none. A normal
// iterator doesn't have this problem, so we work around it by
// double checking all non-value keys. If sanityCheckMetadataKey
// returns false, then the intent was a phantom and we ignore it.
// NB: this could return a newer intent than the one the time-bound
// iterator saw; this is handled.
unsafeValueBytes, ok, err := i.sanityCheckMetadataKey()
if err != nil {
i.err = err
i.valid = false
return
} else if !ok {
i.iter.Next()
continue
}

if i.err = protoutil.Unmarshal(unsafeValueBytes, &i.meta); i.err != nil {
i.valid = false
return
}
Expand Down Expand Up @@ -179,6 +209,28 @@ func (i *MVCCIncrementalIterator) advance() {
}
}

// sanityCheckMetadataKey looks up the current `i.iter` key using a normal,
// non-time-bound iterator and returns the value if the normal iterator also
// sees that exact key. Otherwise, it returns false. It's used in the workaround
// in `advance` for a time-bound iterator bug.
func (i *MVCCIncrementalIterator) sanityCheckMetadataKey() ([]byte, bool, error) {
if i.sanityIter == nil {
// The common case is that we'll won't need the sanityIter for a given
// MVCCIncrementalIterator, so create it lazily.
i.sanityIter = i.e.NewIterator(engine.IterOptions{UpperBound: i.upperBound})
}
unsafeKey := i.iter.UnsafeKey()
i.sanityIter.Seek(unsafeKey)
if ok, err := i.sanityIter.Valid(); err != nil {
return nil, false, err
} else if !ok {
return nil, false, nil
} else if !i.sanityIter.UnsafeKey().Equal(unsafeKey) {
return nil, false, nil
}
return i.sanityIter.UnsafeValue(), true, nil
}

// Valid must be called after any call to Reset(), Next(), or similar methods.
// It returns (true, nil) if the iterator points to a valid key (it is undefined
// to call Key(), Value(), or similar methods unless Valid() has returned (true,
Expand Down
138 changes: 137 additions & 1 deletion pkg/ccl/storageccl/engineccl/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func iterateExpectErr(
Expand All @@ -34,6 +35,7 @@ func iterateExpectErr(
errString string,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
Expand All @@ -49,7 +51,6 @@ func iterateExpectErr(
if _, err := iter.Valid(); !testutils.IsError(err, errString) {
t.Fatalf("expected error %q but got %v", errString, err)
}

}
}

Expand All @@ -61,6 +62,7 @@ func assertEqualKVs(
expected []engine.MVCCKeyValue,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
Expand Down Expand Up @@ -456,3 +458,137 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
t.Fatalf("expected write intent error, but found success")
}
}

// TestMVCCIncrementalIteratorIntentDeletion checks a workaround in
// MVCCIncrementalIterator for a bug in time-bound iterators, where an intent
// has been deleted, but the time-bound iterator doesn't see the deletion.
func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
defer leaktest.AfterTest(t)()

txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
return &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: key,
ID: uuid.MakeV4(),
Epoch: 1,
Timestamp: ts,
},
}
}
intent := func(txn *roachpb.Transaction) roachpb.Intent {
return roachpb.Intent{
Span: roachpb.Span{Key: txn.Key},
Txn: txn.TxnMeta,
Status: roachpb.COMMITTED,
}
}
slurpKVs := func(
e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp,
) ([]engine.MVCCKeyValue, error) {
endKey := prefix.PrefixEnd()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
UpperBound: endKey,
})
defer iter.Close()
var kvs []engine.MVCCKeyValue
for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 {
break
}
kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
}
return kvs, nil
}

ctx := context.Background()
kA := roachpb.Key("kA")
vA1 := roachpb.MakeValueFromString("vA1")
vA2 := roachpb.MakeValueFromString("vA2")
vA3 := roachpb.MakeValueFromString("vA3")
kB := roachpb.Key("kB")
vB1 := roachpb.MakeValueFromString("vB1")
kC := roachpb.Key("kC")
vC1 := roachpb.MakeValueFromString("vC1")
ts0 := hlc.Timestamp{WallTime: 0}
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
ts3 := hlc.Timestamp{WallTime: 3}
txnA1 := txn(kA, ts1)
txnA3 := txn(kA, ts3)
txnB1 := txn(kB, ts1)
txnC1 := txn(kC, ts1)

db := engine.NewInMem(roachpb.Attributes{}, 10<<20)
defer db.Close()

// Set up two sstables very specifically:
//
// sst1 (time-bound metadata ts1->ts1)
// kA -> (intent)
// kA:1 -> vA1
// kB -> (intent)
// kB:1 -> vB1
// kC -> (intent)
// kC:1 -> vC1
//
// sst2 (time-bound metadata ts2->ts3) the intent deletions are for the
// intents at ts1, but there's no way know that when constructing the
// metadata (hence the time-bound iterator bug)
// kA -> (intent) [NB this overwrites the intent deletion]
// kA:3 -> vA3
// kA _> vA2
// kB -> (intent deletion)
require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA1.Timestamp, vA1, txnA1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kB, txnB1.Timestamp, vB1, txnB1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kC, txnC1.Timestamp, vC1, txnC1))
require.NoError(t, db.Flush())
require.NoError(t, db.Compact())
require.NoError(t, engine.MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1)))
require.NoError(t, engine.MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1)))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, ts2, vA2, nil))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA3.Timestamp, vA3, txnA3))
require.NoError(t, db.Flush())

// Double-check that we've created the SSTs we intended to.
userProps, err := db.GetUserProperties()
require.NoError(t, err)
require.Len(t, userProps.Sst, 2)
require.Equal(t, userProps.Sst[0].TsMin, &ts1)
require.Equal(t, userProps.Sst[0].TsMax, &ts1)
require.Equal(t, userProps.Sst[1].TsMin, &ts2)
require.Equal(t, userProps.Sst[1].TsMax, &ts3)

// The kA ts1 intent has been resolved. There's now a new intent on kA, but
// the timestamp (ts3) is too new so it should be ignored.
kvs, err := slurpKVs(db, kA, ts0, ts1)
require.NoError(t, err)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
}, kvs)
// kA has a value at ts2. Again the intent is too new (ts3), so ignore.
kvs, err = slurpKVs(db, kA, ts0, ts2)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kA, Timestamp: ts2}, Value: vA2.RawBytes},
{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
}, kvs)
// At ts3, we should see the new intent
_, err = slurpKVs(db, kA, ts0, ts3)
require.EqualError(t, err, `conflicting intents on "kA"`)

// Similar to the kA ts1 check, but there is no newer intent. We expect to
// pick up the intent deletion and it should cancel out the intent, leaving
// only the value at ts1.
kvs, err = slurpKVs(db, kB, ts0, ts1)
require.NoError(t, err)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kB, Timestamp: ts1}, Value: vB1.RawBytes},
}, kvs)

// Sanity check that we see the still unresolved intent for kC ts1.
_, err = slurpKVs(db, kC, ts0, ts1)
require.EqualError(t, err, `conflicting intents on "kC"`)
}

0 comments on commit 2de146d

Please sign in to comment.