Skip to content

Commit

Permalink
Merge pull request facebook#17 from cockroachdb/rdfix
Browse files Browse the repository at this point in the history
Backport facebook#4432
  • Loading branch information
petermattis authored Oct 17, 2018
2 parents fa3a044 + 6f11959 commit 2d5c6d1
Show file tree
Hide file tree
Showing 16 changed files with 998 additions and 309 deletions.
82 changes: 81 additions & 1 deletion db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@

namespace rocksdb {

// Internal-key footer (packed sequence number + value type) used to tag a
// range-tombstone sentinel: the maximum sequence number combined with
// kTypeRangeDeletion. sstableKeyCompare() below uses it to sort sentinel
// keys before other keys sharing the same user key.
const uint64_t kRangeTombstoneSentinel =
PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

// Compares two sstable boundary keys. The user-key portion dominates; when
// the user keys are equal, a range-tombstone sentinel orders before any
// non-sentinel key with that user key, and two sentinels (or two
// non-sentinels) compare equal. This keeps adjacent sstables separated only
// by a sentinel from being treated as overlapping.
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey& b) {
  const int user_key_order = user_cmp->Compare(a.user_key(), b.user_key());
  if (user_key_order != 0) {
    return user_key_order;
  }
  // Same user key: break the tie on sentinel-ness only.
  const bool a_is_sentinel =
      ExtractInternalKeyFooter(a.Encode()) == kRangeTombstoneSentinel;
  const bool b_is_sentinel =
      ExtractInternalKeyFooter(b.Encode()) == kRangeTombstoneSentinel;
  if (a_is_sentinel == b_is_sentinel) {
    return 0;
  }
  return a_is_sentinel ? -1 : 1;
}

// Overload treating a null `a` as a key smaller than any other key: it
// always orders first. Otherwise defers to the reference overload.
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
                      const InternalKey& b) {
  return (a == nullptr) ? -1 : sstableKeyCompare(user_cmp, *a, b);
}

// Overload treating a null `b` as a key greater than any other key: `a`
// always orders first (result -1). Otherwise defers to the reference
// overload.
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey* b) {
  return (b == nullptr) ? -1 : sstableKeyCompare(user_cmp, a, *b);
}

uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
Expand Down Expand Up @@ -81,6 +118,49 @@ void Compaction::GetBoundaryKeys(
}
}

// For every non-L0, non-empty input level, computes the atomic compaction
// unit boundary of each file: a [smallest, largest] key range spanning the
// maximal run of neighbouring files whose boundary keys overlap (per
// sstableKeyCompare, which treats range-tombstone-sentinel ends as
// non-overlapping). Fills each level's atomic_compaction_unit_boundaries
// parallel to its file vector and returns the updated inputs.
// NOTE(review): the stored boundary pointers alias the FileMetaData objects
// reachable from `inputs`; presumably those outlive the compaction — verify
// against the caller.
std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
for (size_t i = 0; i < inputs.size(); i++) {
// Skip L0 and empty levels; no boundaries are recorded for them.
if (inputs[i].level == 0 || inputs[i].files.empty()) {
continue;
}
inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
AtomicCompactionUnitBoundary cur_boundary;
size_t first_atomic_idx = 0;
// Records cur_boundary for every file in [first_atomic_idx, to), i.e.
// closes the current atomic unit, then marks `to` as the start of the next
// unit. No-op when the unit is empty.
auto add_unit_boundary = [&](size_t to) {
if (first_atomic_idx == to) return;
for (size_t k = first_atomic_idx; k < to; k++) {
inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
}
first_atomic_idx = to;
};
for (size_t j = 0; j < inputs[i].files.size(); j++) {
const auto* f = inputs[i].files[j];
if (j == 0) {
// First file in a level.
cur_boundary.smallest = &f->smallest;
cur_boundary.largest = &f->largest;
} else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
0) {
// SSTs overlap but the end key of the previous file was not
// artificially extended by a range tombstone. Extend the current
// boundary.
cur_boundary.largest = &f->largest;
} else {
// Atomic compaction unit has ended.
add_unit_boundary(j);
cur_boundary.smallest = &f->smallest;
cur_boundary.largest = &f->largest;
}
}
// Close the final unit covering the tail of the file list.
add_unit_boundary(inputs[i].files.size());
// Exactly one boundary entry per file.
assert(inputs[i].files.size() ==
inputs[i].atomic_compaction_unit_boundaries.size());
}
return inputs;
}

// helper function to determine if compaction is creating files at the
// bottommost level
bool Compaction::IsBottommostLevel(
Expand Down Expand Up @@ -151,7 +231,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
output_path_id_(_output_path_id),
output_compression_(_compression),
deletion_compaction_(_deletion_compaction),
inputs_(std::move(_inputs)),
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
grandparents_(std::move(_grandparents)),
score_(_score),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
Expand Down
45 changes: 45 additions & 0 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,43 @@

namespace rocksdb {

// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
// null which provides the property that a==null indicates a key that is less
// than any key and b==null indicates a key that is greater than any key. Note
// that the comparison is performed primarily on the user-key portion of the
// key. If the user-keys compare equal, an additional test is made to sort
// range tombstone sentinel keys before other keys with the same user-key. The
// result is that 2 user-keys will compare equal if they differ purely on
// their sequence number and value, but the range tombstone sentinel for that
// user-key will compare not equal. This is necessary because the range
// tombstone sentinel key is set as the largest key for an sstable even though
// that key never appears in the database. We don't want adjacent sstables to
// be considered overlapping if they are separated by the range tombstone
// sentinel.
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey& b);
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
const InternalKey& b);
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey* b);

// An AtomicCompactionUnitBoundary represents a range of keys [smallest,
// largest] that exactly spans one or more neighbouring SSTs on the same
// level. Every pair of SSTs in this range "overlap" (i.e., the largest
// user key of one file is the smallest user key of the next file). These
// boundaries are propagated down to RangeDelAggregator during compaction
// to provide safe truncation boundaries for range tombstones.
// The pointers are non-owning; they alias InternalKeys stored in the
// compaction's FileMetaData (see PopulateWithAtomicBoundaries).
struct AtomicCompactionUnitBoundary {
const InternalKey* smallest = nullptr;
const InternalKey* largest = nullptr;
};

// The structure that manages compaction input files associated
// with the same physical level.
struct CompactionInputFiles {
int level;
std::vector<FileMetaData*> files;
std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries;
inline bool empty() const { return files.empty(); }
inline size_t size() const { return files.size(); }
inline void clear() { files.clear(); }
Expand Down Expand Up @@ -95,6 +127,12 @@ class Compaction {
return inputs_[compaction_input_level][i];
}

const std::vector<AtomicCompactionUnitBoundary>* boundaries(
size_t compaction_input_level) const {
assert(compaction_input_level < inputs_.size());
return &inputs_[compaction_input_level].atomic_compaction_unit_boundaries;
}

// Returns the list of file meta data of the specified compaction
// input level.
// REQUIREMENT: "compaction_input_level" must be >= 0 and
Expand Down Expand Up @@ -252,6 +290,13 @@ class Compaction {
const std::vector<CompactionInputFiles>& inputs,
Slice* smallest_key, Slice* largest_key);

// Get the atomic file boundaries for all files in the compaction. Necessary
// in order to avoid the scenario described in
// https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb
// down appropriate key boundaries to RangeDelAggregator during compaction.
static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);

// helper function to determine if compaction with inputs and storage is
// bottommost
static bool IsBottommostLevel(
Expand Down
9 changes: 5 additions & 4 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,10 +1092,11 @@ Status CompactionJob::FinishCompactionOutputFile(
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
if (upper_bound != nullptr &&
ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
// Tombstones starting at upper_bound or later only need to be included
// in the next table. Break because subsequent tombstones will start
// even later.
ucmp->Compare(*upper_bound, tombstone.start_key_) < 0) {
// Tombstones starting after upper_bound only need to be included in the
// next table (if the SSTs overlap, then upper_bound is contained in
// this SST and hence must be covered). Break because subsequent
// tombstones will start even later.
break;
}

Expand Down
16 changes: 9 additions & 7 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,15 @@ class PartialRangeTombstone {
PartialRangeTombstone()
: start_key_valid_(false), end_key_valid_(false), seq_(0) {}

PartialRangeTombstone(const Slice* sk, const Slice* ek, SequenceNumber sq)
PartialRangeTombstone(const ParsedInternalKey* sk,
const ParsedInternalKey* ek,
SequenceNumber sq)
: seq_(sq) {
SetStartKey(sk);
SetEndKey(ek);
}

void SetStartKey(const Slice* sk) {
void SetStartKey(const ParsedInternalKey* sk) {
if (sk != nullptr) {
start_key_ = *sk;
start_key_valid_ = true;
Expand All @@ -650,7 +652,7 @@ class PartialRangeTombstone {
}
}

void SetEndKey(const Slice* ek) {
void SetEndKey(const ParsedInternalKey* ek) {
if (ek != nullptr) {
end_key_ = *ek;
end_key_valid_ = true;
Expand All @@ -659,15 +661,15 @@ class PartialRangeTombstone {
}
}

const Slice* start_key() const {
const ParsedInternalKey* start_key() const {
return start_key_valid_ ? &start_key_ : nullptr;
}
const Slice* end_key() const { return end_key_valid_ ? &end_key_ : nullptr; }
const ParsedInternalKey* end_key() const { return end_key_valid_ ? &end_key_ : nullptr; }
SequenceNumber seq() const { return seq_; }

private:
Slice start_key_;
Slice end_key_;
ParsedInternalKey start_key_;
ParsedInternalKey end_key_;
bool start_key_valid_;
bool end_key_valid_;
SequenceNumber seq_;
Expand Down
Loading

0 comments on commit 2d5c6d1

Please sign in to comment.