Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-2.1: ccl/storageccl/engineccl: properly handle intents which straddle sstables #31316

Merged
merged 1 commit into from
Oct 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions c-deps/libroach/timebound.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ class TimeBoundTblPropCollector : public rocksdb::TablePropertiesCollector {
const char* Name() const override { return "TimeBoundTblPropCollector"; }

rocksdb::Status Finish(rocksdb::UserCollectedProperties* properties) override {
if (!last_value_.empty()) {
// Check to see if an intent was the last key in the SSTable. If
// it was, we need to extract the timestamp from the intent and
// update the bounds to include that timestamp.
cockroach::storage::engine::enginepb::MVCCMetadata meta;
if (!meta.ParseFromArray(last_value_.data(), last_value_.size())) {
// We're unable to parse the MVCCMetadata. Fail open by not
// setting the min/max timestamp properties.
return rocksdb::Status::OK();
}
if (meta.has_txn()) {
// We have an intent, use the intent's timestamp to update the
// timestamp bounds.
std::string ts;
EncodeTimestamp(ts, meta.timestamp().wall_time(), meta.timestamp().logical());
UpdateBounds(ts);
}
}

*properties = rocksdb::UserCollectedProperties{
{"crdb.ts.min", ts_min_},
{"crdb.ts.max", ts_max_},
Expand All @@ -40,25 +59,39 @@ class TimeBoundTblPropCollector : public rocksdb::TablePropertiesCollector {
uint64_t file_size) override {
rocksdb::Slice unused;
rocksdb::Slice ts;
if (SplitKey(user_key, &unused, &ts) && !ts.empty()) {
if (!SplitKey(user_key, &unused, &ts)) {
return rocksdb::Status::OK();
}

if (!ts.empty()) {
last_value_.clear();
ts.remove_prefix(1); // The NUL prefix.
if (ts_max_.empty() || ts.compare(ts_max_) > 0) {
ts_max_.assign(ts.data(), ts.size());
}
if (ts_min_.empty() || ts.compare(ts_min_) < 0) {
ts_min_.assign(ts.data(), ts.size());
}
UpdateBounds(ts);
return rocksdb::Status::OK();
}

last_value_.assign(value.data(), value.size());
return rocksdb::Status::OK();
}

virtual rocksdb::UserCollectedProperties GetReadableProperties() const override {
return rocksdb::UserCollectedProperties{};
}

private:
void UpdateBounds(rocksdb::Slice ts) {
if (ts_max_.empty() || ts.compare(ts_max_) > 0) {
ts_max_.assign(ts.data(), ts.size());
}
if (ts_min_.empty() || ts.compare(ts_min_) < 0) {
ts_min_.assign(ts.data(), ts.size());
}
}

private:
std::string ts_min_;
std::string ts_max_;
std::string last_value_;
};

class TimeBoundTblPropCollectorFactory : public rocksdb::TablePropertiesCollectorFactory {
Expand Down
116 changes: 116 additions & 0 deletions pkg/ccl/storageccl/engineccl/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,119 @@ func TestMVCCIterateTimeBound(t *testing.T) {
})
}
}

func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
defer leaktest.AfterTest(t)()

// Create a DB containing 2 keys, a and b, where b has an intent. We use the
// regular MVCCPut operation to generate these keys, which we'll later be
// copying into manually created sstables.
ctx := context.Background()
db1 := engine.NewInMem(roachpb.Attributes{}, 10<<20 /* 10 MB */)
defer db1.Close()

put := func(key, value string, ts int64, txn *roachpb.Transaction) {
v := roachpb.MakeValueFromString(value)
if err := engine.MVCCPut(
ctx, db1, nil, roachpb.Key(key), hlc.Timestamp{WallTime: ts}, v, txn,
); err != nil {
t.Fatal(err)
}
}

put("a", "a value", 1, nil)
put("b", "b value", 2, &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: roachpb.Key("b"),
ID: uuid.MakeV4(),
Epoch: 1,
Timestamp: hlc.Timestamp{WallTime: 2},
},
})

// Create a second DB in which we'll create a specific SSTable structure: the
// first SSTable contains 2 KVs where the first is a regular versioned key
// and the second is the MVCC metadata entry (i.e. an intent). The next
// SSTable contains the provisional value for the intent. The effect is that
// the metadata entry is separated from the entry it is metadata for.
//
// SSTable 1:
// a@1
// b@<meta>
//
// SSTable 2:
// b@2
db2 := engine.NewInMem(roachpb.Attributes{}, 10<<20 /* 10 MB */)
defer db2.Close()

ingest := func(it engine.Iterator, count int) {
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
t.Fatal(sst)
}
defer sst.Close()

for i := 0; i < count; i++ {
ok, err := it.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("expected key")
}
if err := sst.Add(engine.MVCCKeyValue{Key: it.Key(), Value: it.Value()}); err != nil {
t.Fatal(err)
}
it.Next()
}
sstContents, err := sst.Finish()
if err != nil {
t.Fatal(err)
}
if err := db2.WriteFile(`ingest`, sstContents); err != nil {
t.Fatal(err)
}
if err := db2.IngestExternalFiles(ctx, []string{`ingest`}, true); err != nil {
t.Fatal(err)
}
}

{
// Iterate over the entries in the first DB, ingesting them into SSTables
// in the second DB.
it := db1.NewIterator(engine.IterOptions{
UpperBound: keys.MaxKey,
})
it.Seek(engine.MVCCKey{Key: keys.MinKey})
ingest(it, 2)
ingest(it, 1)
it.Close()
}

{
// Use an incremental iterator to simulate an incremental backup from (1,
// 2]. Note that incremental iterators are exclusive on the start time and
// inclusive on the end time. The expectation is that we'll see a write
// intent error.
it := NewMVCCIncrementalIterator(db2, IterOptions{
StartTime: hlc.Timestamp{WallTime: 1},
EndTime: hlc.Timestamp{WallTime: 2},
UpperBound: keys.MaxKey,
})
defer it.Close()
for it.Seek(engine.MVCCKey{Key: keys.MinKey}); ; it.Next() {
ok, err := it.Valid()
if err != nil {
if _, ok = err.(*roachpb.WriteIntentError); ok {
// This is the write intent error we were expecting.
return
}
t.Fatalf("%T: %s", err, err)
}
if !ok {
break
}
}
t.Fatalf("expected write intent error, but found success")
}
}