Skip to content

Commit

Permalink
engineccl/mvcc: work around time-bound iterator bug
Browse files Browse the repository at this point in the history
For reasons described in cockroachdb#28358, a time-bound iterator will sometimes
see an unresolved intent where there is none. A normal iterator doesn't
have this problem, so we work around it in MVCCIncrementalIterator by
double checking all non-value keys. If the normal iterator has a
different value for the key, that value is used instead. If the normal
iterator doesn't have the key at all, the key is skipped.

This fixes both changefeeds and incremental backup.

Closes cockroachdb#32104
Closes cockroachdb#32799

Release note (bug fix): `CHANGEFEED`s and incremental `BACKUP`s no
longer indefinitely hang under an infrequent condition.
  • Loading branch information
danhhz committed Dec 13, 2018
1 parent d5975fb commit 27fcd1a
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 5 deletions.
54 changes: 53 additions & 1 deletion pkg/ccl/storageccl/engineccl/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -49,6 +50,12 @@ type MVCCIncrementalIterator struct {

iter engine.Iterator

// fields used for a workaround for a bug in the time-bound iterator
// (#28358)
upperBound roachpb.Key
e engine.Reader
sanityIter engine.Iterator

startTime hlc.Timestamp
endTime hlc.Timestamp
err error
Expand All @@ -72,6 +79,8 @@ type IterOptions struct {
// specified engine and options.
func NewMVCCIncrementalIterator(e engine.Reader, opts IterOptions) *MVCCIncrementalIterator {
return &MVCCIncrementalIterator{
e: e,
upperBound: opts.UpperBound,
iter: e.NewIterator(engine.IterOptions{
// The call to startTime.Next() converts our exclusive start bound into
// the inclusive start bound that MinTimestampHint expects. This is
Expand Down Expand Up @@ -99,6 +108,9 @@ func (i *MVCCIncrementalIterator) Seek(startKey engine.MVCCKey) {
// Close releases the underlying iterator and, if one was ever created, the
// sanity iterator used by the time-bound iterator workaround.
func (i *MVCCIncrementalIterator) Close() {
	i.iter.Close()
	// The sanity iterator is only populated on demand, so there may be
	// nothing further to release.
	if i.sanityIter == nil {
		return
	}
	i.sanityIter.Close()
}

// Next advances the iterator to the next key/value in the iteration. After this
Expand Down Expand Up @@ -134,7 +146,25 @@ func (i *MVCCIncrementalIterator) advance() {
i.meta.Reset()
i.meta.Timestamp = hlc.LegacyTimestamp(unsafeMetaKey.Timestamp)
} else {
if i.err = i.iter.ValueProto(&i.meta); i.err != nil {
// HACK(dan): Work around a known bug in the time-bound iterator.
// For reasons described in #28358, a time-bound iterator will
// sometimes see an unresolved intent where there is none. A normal
// iterator doesn't have this problem, so we work around it by
// double checking all non-value keys. If sanityCheckMetadataKey
// returns false, then the intent was a phantom and we ignore it.
// NB: this could return a newer intent than the one the time-bound
// iterator saw; this is handled.
unsafeValueBytes, ok, err := i.sanityCheckMetadataKey()
if err != nil {
i.err = err
i.valid = false
return
} else if !ok {
i.iter.Next()
continue
}

if i.err = protoutil.Unmarshal(unsafeValueBytes, &i.meta); i.err != nil {
i.valid = false
return
}
Expand Down Expand Up @@ -179,6 +209,28 @@ func (i *MVCCIncrementalIterator) advance() {
}
}

// sanityCheckMetadataKey re-reads the key that `i.iter` is currently
// positioned at through a normal, non-time-bound iterator. If that iterator
// lands on exactly the same key, its value is returned along with true. If it
// is exhausted or lands on a different key, (nil, false, nil) is returned. Any
// error reported by the normal iterator is passed through. It exists to support
// the workaround in `advance` for a time-bound iterator bug.
func (i *MVCCIncrementalIterator) sanityCheckMetadataKey() ([]byte, bool, error) {
	// The common case is that a given MVCCIncrementalIterator never needs
	// the sanityIter, so it is only constructed on first use.
	if i.sanityIter == nil {
		i.sanityIter = i.e.NewIterator(engine.IterOptions{UpperBound: i.upperBound})
	}

	target := i.iter.UnsafeKey()
	i.sanityIter.Seek(target)

	valid, err := i.sanityIter.Valid()
	if err != nil {
		return nil, false, err
	}
	// Either the normal iterator ran out of keys or it landed on some other
	// key; in both cases it does not see the key that `i.iter` reported.
	if !valid || !i.sanityIter.UnsafeKey().Equal(target) {
		return nil, false, nil
	}
	return i.sanityIter.UnsafeValue(), true, nil
}

// Valid must be called after any call to Reset(), Next(), or similar methods.
// It returns (true, nil) if the iterator points to a valid key (it is undefined
// to call Key(), Value(), or similar methods unless Valid() has returned (true,
Expand Down
139 changes: 138 additions & 1 deletion pkg/ccl/storageccl/engineccl/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func iterateExpectErr(
Expand All @@ -34,6 +35,7 @@ func iterateExpectErr(
errString string,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
Expand All @@ -49,7 +51,6 @@ func iterateExpectErr(
if _, err := iter.Valid(); !testutils.IsError(err, errString) {
t.Fatalf("expected error %q but got %v", errString, err)
}

}
}

Expand All @@ -61,6 +62,7 @@ func assertEqualKVs(
expected []engine.MVCCKeyValue,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
Expand Down Expand Up @@ -456,3 +458,138 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
t.Fatalf("expected write intent error, but found success")
}
}

// TestMVCCIncrementalIteratorIntentDeletion checks a workaround in
// MVCCIncrementalIterator for a bug in time-bound iterators, where an intent
// has been deleted, but the time-bound iterator doesn't see the deletion.
func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// txn builds a transaction anchored at key with the given timestamp and a
	// fresh random ID.
	txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
		return &roachpb.Transaction{
			TxnMeta: enginepb.TxnMeta{
				Key:       key,
				ID:        uuid.MakeV4(),
				Epoch:     1,
				Timestamp: ts,
			},
		}
	}
	// intent builds a COMMITTED intent on the transaction's key, suitable for
	// passing to MVCCResolveWriteIntent.
	intent := func(txn *roachpb.Transaction) roachpb.Intent {
		return roachpb.Intent{
			Span:   roachpb.Span{Key: txn.Key},
			Txn:    txn.TxnMeta,
			Status: roachpb.COMMITTED,
		}
	}
	// slurpKVs runs an MVCCIncrementalIterator over the keys starting at
	// prefix and before prefix.PrefixEnd(), within (startTime, endTime] as
	// interpreted by the iterator, and returns every key/value it yields. The
	// first iterator error aborts the scan and is returned.
	slurpKVs := func(
		e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp,
	) ([]engine.MVCCKeyValue, error) {
		endKey := prefix.PrefixEnd()
		iter := NewMVCCIncrementalIterator(e, IterOptions{
			StartTime:  startTime,
			EndTime:    endTime,
			UpperBound: endKey,
		})
		defer iter.Close()
		var kvs []engine.MVCCKeyValue
		for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() {
			if ok, err := iter.Valid(); err != nil {
				return nil, err
			} else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 {
				break
			}
			kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
		}
		return kvs, nil
	}

	ctx := context.Background()
	kA := roachpb.Key("kA")
	vA1 := roachpb.MakeValueFromString("vA1")
	vA2 := roachpb.MakeValueFromString("vA2")
	vA3 := roachpb.MakeValueFromString("vA3")
	kB := roachpb.Key("kB")
	vB1 := roachpb.MakeValueFromString("vB1")
	kC := roachpb.Key("kC")
	vC1 := roachpb.MakeValueFromString("vC1")
	ts0 := hlc.Timestamp{WallTime: 0}
	ts1 := hlc.Timestamp{WallTime: 1}
	ts2 := hlc.Timestamp{WallTime: 2}
	ts3 := hlc.Timestamp{WallTime: 3}
	txnA1 := txn(kA, ts1)
	txnA3 := txn(kA, ts3)
	txnB1 := txn(kB, ts1)
	txnC1 := txn(kC, ts1)

	db := engine.NewInMem(roachpb.Attributes{}, 10<<20)
	defer db.Close()

	// Set up two sstables very specifically:
	//
	// sst1 (time-bound metadata ts1->ts1)
	//    kA -> (intent)
	//    kA:1 -> vA1
	//    kB -> (intent)
	//    kB:1 -> vB1
	//    kC -> (intent)
	//    kC:1 -> vC1
	//
	// sst2 (time-bound metadata ts2->ts3) the intent deletions are for the
	// intents at ts1, but there's no way to know that when constructing the
	// metadata (hence the time-bound iterator bug)
	//    kA -> (intent) [NB this overwrites the intent deletion]
	//    kA:3 -> vA3
	//    kA:2 -> vA2
	//    kB -> (intent deletion)
	require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA1.Timestamp, vA1, txnA1))
	require.NoError(t, engine.MVCCPut(ctx, db, nil, kB, txnB1.Timestamp, vB1, txnB1))
	require.NoError(t, engine.MVCCPut(ctx, db, nil, kC, txnC1.Timestamp, vC1, txnC1))
	require.NoError(t, db.Flush())
	require.NoError(t, db.Compact())
	require.NoError(t, engine.MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1)))
	require.NoError(t, engine.MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1)))
	require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, ts2, vA2, nil))
	require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA3.Timestamp, vA3, txnA3))
	require.NoError(t, db.Flush())

	// Double-check that we've created the SSTs we intended to. The expected
	// value goes first so that a failure labels expected/actual correctly.
	userProps, err := db.GetUserProperties()
	require.NoError(t, err)
	require.Len(t, userProps.Sst, 2)
	require.Equal(t, &ts1, userProps.Sst[0].TsMin)
	require.Equal(t, &ts1, userProps.Sst[0].TsMax)
	require.Equal(t, &ts2, userProps.Sst[1].TsMin)
	require.Equal(t, &ts3, userProps.Sst[1].TsMax)

	// The kA ts1 intent has been resolved. There's now a new intent on kA, but
	// the timestamp (ts3) is too new so it should be ignored.
	kvs, err := slurpKVs(db, kA, ts0, ts1)
	require.NoError(t, err)
	require.Equal(t, []engine.MVCCKeyValue{
		{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
	}, kvs)
	// kA has a value at ts2. Again the intent is too new (ts3), so ignore.
	kvs, err = slurpKVs(db, kA, ts0, ts2)
	require.NoError(t, err)
	require.Equal(t, []engine.MVCCKeyValue{
		{Key: engine.MVCCKey{Key: kA, Timestamp: ts2}, Value: vA2.RawBytes},
		{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
	}, kvs)
	// At ts3, we should see the new intent
	_, err = slurpKVs(db, kA, ts0, ts3)
	require.EqualError(t, err, `conflicting intents on "kA"`)

	// Similar to the kA ts1 check, but there is no newer intent. We expect to
	// pick up the intent deletion and it should cancel out the intent, leaving
	// only the value at ts1.
	kvs, err = slurpKVs(db, kB, ts0, ts1)
	require.NoError(t, err)
	require.Equal(t, []engine.MVCCKeyValue{
		{Key: engine.MVCCKey{Key: kB, Timestamp: ts1}, Value: vB1.RawBytes},
	}, kvs)

	// Sanity check that we see the still unresolved intent for kC ts1.
	_, err = slurpKVs(db, kC, ts0, ts1)
	require.EqualError(t, err, `conflicting intents on "kC"`)
}
4 changes: 2 additions & 2 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,9 +1138,9 @@ func (r *RocksDB) GetSortedWALFiles() ([]WALFileInfo, error) {
return res, nil
}

// getUserProperties fetches the user properties stored in each sstable's
// GetUserProperties fetches the user properties stored in each sstable's
// metadata.
func (r *RocksDB) getUserProperties() (enginepb.SSTUserPropertiesCollection, error) {
func (r *RocksDB) GetUserProperties() (enginepb.SSTUserPropertiesCollection, error) {
buf := cStringToGoBytes(C.DBGetUserProperties(r.rdb))
var ssts enginepb.SSTUserPropertiesCollection
if err := protoutil.Unmarshal(buf, &ssts); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func TestRocksDBTimeBound(t *testing.T) {
t.Fatal(err)
}

ssts, err := rocksdb.getUserProperties()
ssts, err := rocksdb.GetUserProperties()
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 27fcd1a

Please sign in to comment.