diff --git a/db/compaction.cc b/db/compaction.cc index 9db41139b51..a6f1a09d441 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -23,6 +23,43 @@ namespace rocksdb { +const uint64_t kRangeTombstoneSentinel = + PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); + +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, + const InternalKey& b) { + auto c = user_cmp->Compare(a.user_key(), b.user_key()); + if (c != 0) { + return c; + } + auto a_footer = ExtractInternalKeyFooter(a.Encode()); + auto b_footer = ExtractInternalKeyFooter(b.Encode()); + if (a_footer == kRangeTombstoneSentinel) { + if (b_footer != kRangeTombstoneSentinel) { + return -1; + } + } else if (b_footer == kRangeTombstoneSentinel) { + return 1; + } + return 0; +} + +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a, + const InternalKey& b) { + if (a == nullptr) { + return -1; + } + return sstableKeyCompare(user_cmp, *a, b); +} + +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, + const InternalKey* b) { + if (b == nullptr) { + return -1; + } + return sstableKeyCompare(user_cmp, a, *b); +} + uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) { uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { @@ -81,6 +118,49 @@ void Compaction::GetBoundaryKeys( } } +std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries( + VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) { + const Comparator* ucmp = vstorage->InternalComparator()->user_comparator(); + for (size_t i = 0; i < inputs.size(); i++) { + if (inputs[i].level == 0 || inputs[i].files.empty()) { + continue; + } + inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size()); + AtomicCompactionUnitBoundary cur_boundary; + size_t first_atomic_idx = 0; + auto add_unit_boundary = [&](size_t to) { + if (first_atomic_idx == to) return; + for (size_t k = first_atomic_idx; k < to; k++) { + inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary); + } + first_atomic_idx = to; + }; + for (size_t j = 0; j < inputs[i].files.size(); j++) { + const auto* f = inputs[i].files[j]; + if (j == 0) { + // First file in a level. + cur_boundary.smallest = &f->smallest; + cur_boundary.largest = &f->largest; + } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) == + 0) { + // SSTs overlap but the end key of the previous file was not + // artificially extended by a range tombstone. Extend the current + // boundary. + cur_boundary.largest = &f->largest; + } else { + // Atomic compaction unit has ended.
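+        // Illustrative example (hypothetical keys): with f1=[a, c@4], + // f2=[c@2, e#sentinel] and f3=[e@7, g@1] on this level, f1 and f2 share + // the user key "c" and form one atomic unit, while f2's largest key is + // only the range tombstone sentinel for "e", which sstableKeyCompare + // orders strictly before f3's smallest key, so f3 starts a new unit here.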
+ add_unit_boundary(j); + cur_boundary.smallest = &f->smallest; + cur_boundary.largest = &f->largest; + } + } + add_unit_boundary(inputs[i].files.size()); + assert(inputs[i].files.size() == + inputs[i].atomic_compaction_unit_boundaries.size()); + } + return inputs; +} + // helper function to determine if compaction is creating files at the // bottommost level bool Compaction::IsBottommostLevel( @@ -151,7 +231,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, output_path_id_(_output_path_id), output_compression_(_compression), deletion_compaction_(_deletion_compaction), - inputs_(std::move(_inputs)), + inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))), grandparents_(std::move(_grandparents)), score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), diff --git a/db/compaction.h b/db/compaction.h index 7be6df2c1e8..580ee2e4aea 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -15,11 +15,43 @@ namespace rocksdb { +// Utility for comparing sstable boundary keys. Returns -1 if either a or b is +// null which provides the property that a==null indicates a key that is less +// than any key and b==null indicates a key that is greater than any key. Note +// that the comparison is performed primarily on the user-key portion of the +// key. If the user-keys compare equal, an additional test is made to sort +// range tombstone sentinel keys before other keys with the same user-key. The +// result is that 2 user-keys will compare equal if they differ purely on +// their sequence number and value, but the range tombstone sentinel for that +// user-key will compare not equal. This is necessary because the range +// tombstone sentinel key is set as the largest key for an sstable even though +// that key never appears in the database. We don't want adjacent sstables to +// be considered overlapping if they are separated by the range tombstone +// sentinel. +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, + const InternalKey& b); +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a, + const InternalKey& b); +int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, + const InternalKey* b); + +// An AtomicCompactionUnitBoundary represents a range of keys [smallest, +// largest] that exactly spans one or more neighbouring SSTs on the same +// level. Every pair of SSTs in this range "overlap" (i.e., the largest +// user key of one file is the smallest user key of the next file). These +// boundaries are propagated down to RangeDelAggregator during compaction +// to provide safe truncation boundaries for range tombstones. +struct AtomicCompactionUnitBoundary { + const InternalKey* smallest = nullptr; + const InternalKey* largest = nullptr; +}; + // The structure that manages compaction input files associated // with the same physical level. struct CompactionInputFiles { int level; std::vector<FileMetaData*> files; + std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries; inline bool empty() const { return files.empty(); } inline size_t size() const { return files.size(); } inline void clear() { files.clear(); } @@ -95,6 +127,12 @@ class Compaction { return inputs_[compaction_input_level][i]; } + const std::vector<AtomicCompactionUnitBoundary>* boundaries( + size_t compaction_input_level) const { + assert(compaction_input_level < inputs_.size()); + return &inputs_[compaction_input_level].atomic_compaction_unit_boundaries; + } + // Returns the list of file meta data of the specified compaction // input level.
// REQUIREMENT: "compaction_input_level" must be >= 0 and @@ -252,6 +290,13 @@ class Compaction { const std::vector<CompactionInputFiles>& inputs, Slice* smallest_key, Slice* largest_key); + // Get the atomic file boundaries for all files in the compaction. Necessary + // in order to avoid the scenario described in + // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb + // down appropriate key boundaries to RangeDelAggregator during compaction. + static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries( + VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs); + // helper function to determine if compaction with inputs and storage is // bottommost static bool IsBottommostLevel( diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 390d4333836..d03d203c554 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1092,10 +1092,11 @@ Status CompactionJob::FinishCompactionOutputFile( for (; it->Valid(); it->Next()) { auto tombstone = it->Tombstone(); if (upper_bound != nullptr && - ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) { - // Tombstones starting at upper_bound or later only need to be included - // in the next table. Break because subsequent tombstones will start - // even later. + ucmp->Compare(*upper_bound, tombstone.start_key_) < 0) { + // Tombstones starting after upper_bound only need to be included in the + // next table (if the SSTs overlap, then upper_bound is contained in + // this SST and hence must be covered). Break because subsequent + // tombstones will start even later. break; } diff --git a/db/dbformat.h b/db/dbformat.h index 72b5ac48833..083c0b10ea1 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -635,13 +635,15 @@ class PartialRangeTombstone { PartialRangeTombstone() : start_key_valid_(false), end_key_valid_(false), seq_(0) {} - PartialRangeTombstone(const Slice* sk, const Slice* ek, SequenceNumber sq) + PartialRangeTombstone(const ParsedInternalKey* sk, + const ParsedInternalKey* ek, + SequenceNumber sq) : seq_(sq) { SetStartKey(sk); SetEndKey(ek); } - void SetStartKey(const Slice* sk) { + void SetStartKey(const ParsedInternalKey* sk) { if (sk != nullptr) { start_key_ = *sk; start_key_valid_ = true; @@ -650,7 +652,7 @@ class PartialRangeTombstone { } } - void SetEndKey(const Slice* ek) { + void SetEndKey(const ParsedInternalKey* ek) { if (ek != nullptr) { end_key_ = *ek; end_key_valid_ = true; @@ -659,15 +661,15 @@ class PartialRangeTombstone { } } - const Slice* start_key() const { + const ParsedInternalKey* start_key() const { return start_key_valid_ ? &start_key_ : nullptr; } - const Slice* end_key() const { return end_key_valid_ ? &end_key_ : nullptr; } + const ParsedInternalKey* end_key() const { return end_key_valid_ ?
&end_key_ : nullptr; } SequenceNumber seq() const { return seq_; } private: - Slice start_key_; - Slice end_key_; + ParsedInternalKey start_key_; + ParsedInternalKey end_key_; bool start_key_valid_; bool end_key_valid_; SequenceNumber seq_; diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index f9aa910ffd8..832c763e8f6 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -10,20 +10,39 @@ namespace rocksdb { +namespace { + struct TombstoneStartKeyComparator { - TombstoneStartKeyComparator(const Comparator* c) : cmp(c) {} + explicit TombstoneStartKeyComparator(const InternalKeyComparator* c) + : cmp(c) {} - bool operator()(const RangeTombstone& a, const RangeTombstone& b) const { + bool operator()(const TruncatedRangeTombstone& a, + const TruncatedRangeTombstone& b) const { return cmp->Compare(a.start_key_, b.start_key_) < 0; } - const Comparator* cmp; + const InternalKeyComparator* cmp; +}; + +struct ParsedInternalKeyComparator { + explicit ParsedInternalKeyComparator(const InternalKeyComparator* c) + : cmp(c) {} + + bool operator()(const ParsedInternalKey& a, + const ParsedInternalKey& b) const { + return cmp->Compare(a, b) < 0; + } + + const InternalKeyComparator* cmp; }; +} // namespace + // An UncollapsedRangeDelMap is quick to create but slow to answer ShouldDelete // queries. class UncollapsedRangeDelMap : public RangeDelMap { - typedef std::multiset<RangeTombstone, TombstoneStartKeyComparator> Rep; + typedef std::multiset<TruncatedRangeTombstone, TombstoneStartKeyComparator> + Rep; class Iterator : public RangeDelIterator { const Rep& rep_; @@ -35,30 +54,38 @@ void Next() override { iter_++; } void Seek(const Slice&) override { - fprintf(stderr, "UncollapsedRangeDelMap::Iterator::Seek unimplemented\n"); + fprintf(stderr, + "UncollapsedRangeDelMap::Iterator::Seek(Slice&) unimplemented\n"); + abort(); + } + + void Seek(const ParsedInternalKey&) override { + fprintf(stderr, + "UncollapsedRangeDelMap::Iterator::Seek(ParsedInternalKey&) " + "unimplemented\n"); abort(); } - RangeTombstone Tombstone() const override { return *iter_; } + RangeTombstone Tombstone() const override { return iter_->Tombstone(); } }; Rep rep_; - const Comparator* ucmp_; + const InternalKeyComparator* icmp_; public: - UncollapsedRangeDelMap(const Comparator* ucmp) - : rep_(TombstoneStartKeyComparator(ucmp)), ucmp_(ucmp) {} + explicit UncollapsedRangeDelMap(const InternalKeyComparator* icmp) + : rep_(TombstoneStartKeyComparator(icmp)), icmp_(icmp) {} bool ShouldDelete(const ParsedInternalKey& parsed, - RangeDelPositioningMode mode) { + RangeDelPositioningMode mode) override { (void)mode; assert(mode == RangeDelPositioningMode::kFullScan); for (const auto& tombstone : rep_) { - if (ucmp_->Compare(parsed.user_key, tombstone.start_key_) < 0) { + if (icmp_->Compare(parsed, tombstone.start_key_) < 0) { break; } if (parsed.sequence < tombstone.seq_ && - ucmp_->Compare(parsed.user_key, tombstone.end_key_) < 0) { + icmp_->Compare(parsed, tombstone.end_key_) < 0) { return true; } } @@ -66,7 +93,7 @@ class UncollapsedRangeDelMap : public RangeDelMap { } bool ShouldDeleteRange(const Slice& start, const Slice& end, - SequenceNumber seqno) { + SequenceNumber seqno) override { // Unimplemented, though the lack of implementation only affects // performance (not correctness) for sstable ingestion. Normal read // operations use a CollapsedRangeDelMap.
@@ -78,7 +105,7 @@ class UncollapsedRangeDelMap : public RangeDelMap { } PartialRangeTombstone GetTombstone(const Slice& user_key, - SequenceNumber seqno) { + SequenceNumber seqno) override { // Unimplemented, though the lack of implementation only affects // performance (not correctness) for sstable ingestion. Normal // read operations use a CollapsedRangeDelMap. @@ -87,24 +114,27 @@ return PartialRangeTombstone(); } - bool IsRangeOverlapped(const Slice& start, const Slice& end) { + bool IsRangeOverlapped(const ParsedInternalKey& start, + const ParsedInternalKey& end) override { for (const auto& tombstone : rep_) { - if (ucmp_->Compare(start, tombstone.end_key_) < 0 && - ucmp_->Compare(tombstone.start_key_, end) <= 0 && - ucmp_->Compare(tombstone.start_key_, tombstone.end_key_) < 0) { + if (icmp_->Compare(start, tombstone.end_key_) < 0 && + icmp_->Compare(tombstone.start_key_, end) <= 0 && + icmp_->Compare(tombstone.start_key_, tombstone.end_key_) < 0) { return true; } } return false; } - void AddTombstone(RangeTombstone tombstone) { rep_.emplace(tombstone); } + void AddTombstone(TruncatedRangeTombstone tombstone) override { + rep_.emplace(tombstone); + } - size_t Size() const { return rep_.size(); } + size_t Size() const override { return rep_.size(); } - void InvalidatePosition() {} // no-op + void InvalidatePosition() override {} // no-op - std::unique_ptr<RangeDelIterator> NewIterator() { + std::unique_ptr<RangeDelIterator> NewIterator() override { return std::unique_ptr<RangeDelIterator>(new Iterator(this->rep_)); } }; @@ -146,7 +176,9 @@ // compared against the map entry g → 3 and determined to be uncovered. By // contrast, the key h @ 2 would be determined to be covered. class CollapsedRangeDelMap : public RangeDelMap { - typedef std::map<Slice, SequenceNumber, stl_wrappers::LessOfComparator> Rep; + typedef std::map<ParsedInternalKey, SequenceNumber, ParsedInternalKeyComparator> + Rep; class Iterator : public RangeDelIterator { void MaybeSeekPastSentinel() { @@ -168,7 +200,12 @@ MaybeSeekPastSentinel(); } - void Seek(const Slice& target) override { + void Seek(const Slice&) override { + fprintf(stderr, "CollapsedRangeDelMap::Iterator::Seek(Slice&) unimplemented\n"); + abort(); + } + + void Seek(const ParsedInternalKey& target) override { iter_ = rep_.upper_bound(target); if (iter_ != rep_.begin()) { iter_--; @@ -177,9 +214,12 @@ } RangeTombstone Tombstone() const override { + assert(Valid()); + assert(std::next(iter_) != rep_.end()); + assert(iter_->second != 0); RangeTombstone tombstone; - tombstone.start_key_ = iter_->first; - tombstone.end_key_ = std::next(iter_)->first; + tombstone.start_key_ = iter_->first.user_key; + tombstone.end_key_ = std::next(iter_)->first.user_key; tombstone.seq_ = iter_->second; return tombstone; } @@ -187,16 +227,17 @@ Rep rep_; Rep::iterator iter_; - const Comparator* ucmp_; + const InternalKeyComparator* icmp_; public: - CollapsedRangeDelMap(const Comparator* ucmp) - : rep_(stl_wrappers::LessOfComparator(ucmp)), ucmp_(ucmp) { + explicit CollapsedRangeDelMap(const InternalKeyComparator* icmp) + : rep_(ParsedInternalKeyComparator(icmp)), + icmp_(icmp) { InvalidatePosition(); } bool ShouldDelete(const ParsedInternalKey& parsed, - RangeDelPositioningMode mode) { + RangeDelPositioningMode mode) override { if (iter_ == rep_.end() && (mode == RangeDelPositioningMode::kForwardTraversal || mode == RangeDelPositioningMode::kBackwardTraversal)) { @@ -210,29 +251,29 @@ class
CollapsedRangeDelMap : public RangeDelMap { case RangeDelPositioningMode::kForwardTraversal: assert(iter_ != rep_.end()); if (iter_ == rep_.begin() && - ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + icmp_->Compare(parsed, iter_->first) < 0) { // before start of deletion intervals return false; } while (std::next(iter_) != rep_.end() && - ucmp_->Compare(std::next(iter_)->first, parsed.user_key) <= 0) { + icmp_->Compare(std::next(iter_)->first, parsed) <= 0) { ++iter_; } break; case RangeDelPositioningMode::kBackwardTraversal: assert(iter_ != rep_.end()); while (iter_ != rep_.begin() && - ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + icmp_->Compare(parsed, iter_->first) < 0) { --iter_; } if (iter_ == rep_.begin() && - ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + icmp_->Compare(parsed, iter_->first) < 0) { // before start of deletion intervals return false; } break; case RangeDelPositioningMode::kBinarySearch: - iter_ = rep_.upper_bound(parsed.user_key); + iter_ = rep_.upper_bound(parsed); if (iter_ == rep_.begin()) { // before start of deletion intervals return false; @@ -241,14 +282,14 @@ class CollapsedRangeDelMap : public RangeDelMap { break; } assert(iter_ != rep_.end() && - ucmp_->Compare(iter_->first, parsed.user_key) <= 0); + icmp_->Compare(iter_->first, parsed) <= 0); assert(std::next(iter_) == rep_.end() || - ucmp_->Compare(parsed.user_key, std::next(iter_)->first) < 0); + icmp_->Compare(parsed, std::next(iter_)->first) < 0); return parsed.sequence < iter_->second; } bool ShouldDeleteRange(const Slice& start, const Slice& end, - SequenceNumber seqno) { + SequenceNumber seqno) override { ParsedInternalKey parsed_start; if (!ParseInternalKey(start, &parsed_start)) { assert(false); @@ -257,17 +298,17 @@ class CollapsedRangeDelMap : public RangeDelMap { if (!ParseInternalKey(end, &parsed_end)) { assert(false); } - if (ucmp_->Compare(parsed_start.user_key, parsed_end.user_key) > 0) { + if (icmp_->Compare(parsed_start, parsed_end) > 0) { return false; } - auto iter = rep_.upper_bound(parsed_start.user_key); + auto iter = rep_.upper_bound(parsed_start); if (iter == rep_.begin()) { // before start of deletion intervals return false; } --iter; - if (ucmp_->Compare(parsed_start.user_key, iter->first) < 0) { + if (icmp_->Compare(parsed_start, iter->first) < 0) { assert(false); return false; } @@ -275,7 +316,7 @@ class CollapsedRangeDelMap : public RangeDelMap { // number, or we determine that our range is completely covered by newer // tombstones. for (; iter != rep_.end(); ++iter) { - if (ucmp_->Compare(parsed_end.user_key, iter->first) < 0) { + if (icmp_->Compare(parsed_end, iter->first) < 0) { return true; } if (seqno >= iter->second) { @@ -286,9 +327,16 @@ class CollapsedRangeDelMap : public RangeDelMap { return false; } - PartialRangeTombstone GetTombstone(const Slice& user_key, - SequenceNumber seqno) { - auto iter = rep_.upper_bound(user_key); + PartialRangeTombstone GetTombstone(const Slice& key, + SequenceNumber seqno) override { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(key, &parsed_key)) { + assert(false); + // Fail open. + return PartialRangeTombstone(); + } + + auto iter = rep_.upper_bound(parsed_key); if (iter == rep_.begin()) { // before start of deletion intervals return PartialRangeTombstone(nullptr, &iter->first, 0); @@ -306,15 +354,16 @@ class CollapsedRangeDelMap : public RangeDelMap { prev->second > seqno ? 
prev->second : 0); } - bool IsRangeOverlapped(const Slice&, const Slice&) { + bool IsRangeOverlapped(const ParsedInternalKey&, + const ParsedInternalKey&) override { // Unimplemented because the only client of this method, file ingestion, // uses uncollapsed maps. fprintf(stderr, "CollapsedRangeDelMap::IsRangeOverlapped unimplemented"); abort(); } - void AddTombstone(RangeTombstone t) { - if (ucmp_->Compare(t.start_key_, t.end_key_) >= 0) { + void AddTombstone(TruncatedRangeTombstone t) override { + if (icmp_->Compare(t.start_key_, t.end_key_) >= 0 || t.seq_ == 0) { // The tombstone covers no keys. Nothing to do. return; } @@ -347,7 +396,8 @@ class CollapsedRangeDelMap : public RangeDelMap { end_seq = prev_seq(); Rep::iterator pit; if (it != rep_.begin() && (pit = std::prev(it)) != rep_.begin() && - ucmp_->Compare(pit->first, t.start_key_) == 0 && std::prev(pit)->second == t.seq_) { + icmp_->Compare(pit->first, t.start_key_) == 0 && + std::prev(pit)->second == t.seq_) { // The new tombstone starts at the end of an existing tombstone with an // identical seqno: // @@ -372,7 +422,7 @@ class CollapsedRangeDelMap : public RangeDelMap { } // Look at all the existing transitions that overlap the new tombstone. - while (it != rep_.end() && ucmp_->Compare(it->first, t.end_key_) < 0) { + while (it != rep_.end() && icmp_->Compare(it->first, t.end_key_) < 0) { if (t.seq_ >= it->second) { // The transition is to an existing tombstone that the new tombstone // covers. Save the covered tombstone's seqno. We'll need to return to @@ -418,12 +468,20 @@ class CollapsedRangeDelMap : public RangeDelMap { if (t.seq_ == prev_seq()) { // The new tombstone is unterminated in the map. - if (it != rep_.end() && t.seq_ == it->second && ucmp_->Compare(it->first, t.end_key_) == 0) { + if (it != rep_.end() && t.seq_ == it->second && + icmp_->Compare(it->first, t.end_key_) == 0) { // The new tombstone ends at the start of another tombstone with an // identical seqno. Merge the tombstones by removing the existing // tombstone's start key. rep_.erase(it); - } else if ((it == rep_.end() || end_seq != it->second) && end_seq != prev_seq()) { + } else if (end_seq == prev_seq() || + (it != rep_.end() && end_seq == it->second)) { + // The new tombstone is implicitly ended because its end point is + // contained within an existing tombstone with the same seqno: + // + // 2: ---k--N + // ^ + } else { // The new tombstone needs an explicit end point. // // 3: OR 3: --G OR 3: --G K-- @@ -432,13 +490,8 @@ class CollapsedRangeDelMap : public RangeDelMap { // Install one that returns to the last seqno we covered. Because end // keys are exclusive, if there's an existing transition at t.end_key_, // it takes precedence over the transition that we install here. 
- rep_.emplace(t.end_key_, end_seq); // emplace is a noop if existing entry - } else { - // The new tombstone is implicitly ended because its end point is - // contained within an existing tombstone with the same seqno: - // - // 2: ---k--N - // ^ + rep_.emplace(t.end_key_, + end_seq); // emplace is a noop if existing entry } } else { // The new tombstone is implicitly ended because its end point is covered @@ -451,16 +504,16 @@ } } - size_t Size() const { + size_t Size() const override { if (rep_.empty()) { return 0; } return rep_.size() - 1; } - void InvalidatePosition() { iter_ = rep_.end(); } + void InvalidatePosition() override { iter_ = rep_.end(); } - std::unique_ptr<RangeDelIterator> NewIterator() { + std::unique_ptr<RangeDelIterator> NewIterator() override { return std::unique_ptr<RangeDelIterator>(new Iterator(this->rep_)); } }; @@ -496,9 +549,9 @@ void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) { std::unique_ptr<RangeDelMap> RangeDelAggregator::NewRangeDelMap() { RangeDelMap* tombstone_map; if (collapse_deletions_) { - tombstone_map = new CollapsedRangeDelMap(icmp_.user_comparator()); + tombstone_map = new CollapsedRangeDelMap(&icmp_); } else { - tombstone_map = new UncollapsedRangeDelMap(icmp_.user_comparator()); + tombstone_map = new UncollapsedRangeDelMap(&icmp_); } return std::unique_ptr<RangeDelMap>(tombstone_map); } @@ -509,8 +562,9 @@ bool RangeDelAggregator::ShouldDeleteImpl(const Slice& internal_key, ParsedInternalKey parsed; if (!ParseInternalKey(internal_key, &parsed)) { assert(false); + return false; } - return ShouldDelete(parsed, mode); + return ShouldDeleteImpl(parsed, mode); } bool RangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed, @@ -536,7 +590,7 @@ bool RangeDelAggregator::ShouldDeleteRange( return tombstone_map.ShouldDeleteRange(start, end, seqno); } -PartialRangeTombstone RangeDelAggregator::GetTombstone(const Slice& user_key, +PartialRangeTombstone RangeDelAggregator::GetTombstone(const Slice& key, SequenceNumber seqno) { if (rep_ == nullptr) { return PartialRangeTombstone(); } @@ -545,7 +599,7 @@ PartialRangeTombstone RangeDelAggregator::GetTombstone(const Slice& user_key, if (tombstone_map.IsEmpty()) { return PartialRangeTombstone(); } - return tombstone_map.GetTombstone(user_key, seqno); + return tombstone_map.GetTombstone(key, seqno); } bool RangeDelAggregator::IsRangeOverlapped(const Slice& start, @@ -556,8 +610,10 @@ bool RangeDelAggregator::IsRangeOverlapped(const Slice& start, if (rep_ == nullptr) { return false; } + ParsedInternalKey start_ikey(start, kMaxSequenceNumber, kMaxValue); + ParsedInternalKey end_ikey(end, 0, static_cast<ValueType>(0)); for (const auto& stripe : rep_->stripe_map_) { - if (stripe.second->IsRangeOverlapped(start, end)) { + if (stripe.second->IsRangeOverlapped(start_ikey, end_ikey)) { return true; } } @@ -583,33 +639,62 @@ Status RangeDelAggregator::AddTombstones( first_iter = false; } ParsedInternalKey parsed_key; - if (!ParseInternalKey(input->key(), &parsed_key)) { + bool parsed; + if (input->IsKeyPinned()) { + parsed = ParseInternalKey(input->key(), &parsed_key); + } else { + // The tombstone map holds slices into the iterator's memory. Make a + // copy of the key if it is not pinned.
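+      // (The copy lives in rep_->pinned_slices_ below and stays valid for + // the lifetime of this aggregator.)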
+ rep_->pinned_slices_.emplace_back(input->key().data(), + input->key().size()); + parsed = ParseInternalKey(rep_->pinned_slices_.back(), &parsed_key); + } + if (!parsed) { return Status::Corruption("Unable to parse range tombstone InternalKey"); } - RangeTombstone tombstone(parsed_key, input->value()); + Slice end_user_key; + if (input->IsValuePinned()) { + end_user_key = input->value(); + } else { + // The tombstone map holds slices into the iterator's memory. Make a + // copy of the value if it is not pinned. + rep_->pinned_slices_.emplace_back(input->value().data(), + input->value().size()); + end_user_key = rep_->pinned_slices_.back(); + } + ParsedInternalKey start_key(parsed_key.user_key, kMaxSequenceNumber, + kMaxValue); + ParsedInternalKey end_key(end_user_key, kMaxSequenceNumber, kMaxValue); // Truncate the tombstone to the range [smallest, largest]. if (smallest != nullptr) { - if (icmp_.user_comparator()->Compare( - tombstone.start_key_, smallest->user_key()) < 0) { - tombstone.start_key_ = smallest->user_key(); + ParsedInternalKey parsed_smallest; + if (ParseInternalKey(smallest->Encode(), &parsed_smallest) && + icmp_.Compare(start_key, parsed_smallest) < 0) { + start_key.user_key = parsed_smallest.user_key; + start_key.sequence = parsed_smallest.sequence; } } if (largest != nullptr) { - // To safely truncate the range tombstone's end key, it must extend past - // the largest key in the sstable (which may have been extended to the - // smallest key in the next sstable), and largest must be a tombstone - // sentinel key. A range tombstone may straddle two sstables and not be - // the tombstone sentinel key in the first sstable if a user-key also - // straddles the sstables (possible if there is a snapshot between the - // two versions of the user-key), in which case we cannot truncate the - // range tombstone. - if (icmp_.user_comparator()->Compare(tombstone.end_key_, - largest->user_key()) > 0 && - GetInternalKeySeqno(largest->Encode()) == kMaxSequenceNumber) { - tombstone.end_key_ = largest->user_key(); + ParsedInternalKey parsed_largest; + if (ParseInternalKey(largest->Encode(), &parsed_largest) && + icmp_.Compare(end_key, parsed_largest) > 0) { + end_key.user_key = parsed_largest.user_key; + if (parsed_largest.sequence != kMaxSequenceNumber) { + // The same user key straddles two adjacent sstables. To make sure we + // can truncate to a range that includes the largest point key in the + // first sstable, set the tombstone end key's sequence number to 1 + // less than the largest key. + assert(parsed_largest.sequence != 0); + end_key.sequence = parsed_largest.sequence - 1; + } else { + // The SST file boundary was artificially extended by a range tombstone. + // We will not see any entries in this SST with this user key, so we + // can leave the seqnum at kMaxSequenceNumber. 
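+          // For example (hypothetical keys): with largest == e#sentinel the + // truncated end stays at e@kMaxSequenceNumber, which sorts before + // every real "e" entry, so no version of "e" is treated as covered + // by this instance of the tombstone.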
+ } } } - GetRangeDelMap(tombstone.seq_).AddTombstone(std::move(tombstone)); + TruncatedRangeTombstone tombstone(start_key, end_key, parsed_key.sequence); + GetRangeDelMap(parsed_key.sequence).AddTombstone(std::move(tombstone)); input->Next(); } if (!first_iter) { @@ -688,6 +773,11 @@ class MergingRangeDelIter : public RangeDelIterator { } void Seek(const Slice& target) override { + ParsedInternalKey ikey(target, kMaxSequenceNumber, kMaxValue); + Seek(ikey); + } + + void Seek(const ParsedInternalKey& target) override { heap_.clear(); for (auto& iter : iters_) { iter->Seek(target); diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 5225428d176..30f6731b499 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -5,6 +5,7 @@ #pragma once +#include <list> #include <map> #include <set> #include <string> @@ -39,6 +40,35 @@ enum class RangeDelPositioningMode { kBinarySearch, }; +// TruncatedRangeTombstones are a slight generalization of regular +// RangeTombstones that can represent truncations caused by SST boundaries. +// Instead of using user keys to represent the start and end keys, they instead +// use internal keys, whose sequence number indicates the sequence number of +// the smallest/largest SST key (in the case where a tombstone is untruncated, +// the sequence numbers will be kMaxSequenceNumber for both start and end +// keys). Like RangeTombstones, TruncatedRangeTombstones are also +// end-key-exclusive. +struct TruncatedRangeTombstone { + TruncatedRangeTombstone(const ParsedInternalKey& sk, + const ParsedInternalKey& ek, SequenceNumber s) + : start_key_(sk), end_key_(ek), seq_(s) {} + + RangeTombstone Tombstone() const { + // The RangeTombstone returned here can cover less than the + // TruncatedRangeTombstone when its end key has a seqnum that is not + // kMaxSequenceNumber. Since this method is only used by RangeDelIterators + // (which in turn are only used during flush/compaction), we avoid this + // problem by using truncation boundaries spanning multiple SSTs, which + // are selected in a way that guarantees a clean break at the end key. + assert(end_key_.sequence == kMaxSequenceNumber); + return RangeTombstone(start_key_.user_key, end_key_.user_key, seq_); + } + + ParsedInternalKey start_key_; + ParsedInternalKey end_key_; + SequenceNumber seq_; +}; + // A RangeDelIterator iterates over range deletion tombstones. class RangeDelIterator { public: @@ -46,7 +76,9 @@ class RangeDelIterator { virtual bool Valid() const = 0; virtual void Next() = 0; + // NOTE: the Slice passed to this method must be a user key.
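+ // (MergingRangeDelIter converts it to an internal key at + // kMaxSequenceNumber before seeking; the underlying map iterators do not + // implement this overload and abort if it is called.)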
virtual void Seek(const Slice& target) = 0; + virtual void Seek(const ParsedInternalKey& target) = 0; virtual RangeTombstone Tombstone() const = 0; }; @@ -63,15 +95,16 @@ RangeDelPositioningMode mode) = 0; virtual bool ShouldDeleteRange(const Slice& start, const Slice& end, SequenceNumber seqno) = 0; - virtual PartialRangeTombstone GetTombstone(const Slice& user_key, + virtual PartialRangeTombstone GetTombstone(const Slice& key, SequenceNumber seqno) = 0; - virtual bool IsRangeOverlapped(const Slice& start, const Slice& end) = 0; + virtual bool IsRangeOverlapped(const ParsedInternalKey& start, + const ParsedInternalKey& end) = 0; virtual void InvalidatePosition() = 0; virtual size_t Size() const = 0; bool IsEmpty() const { return Size() == 0; } - virtual void AddTombstone(RangeTombstone tombstone) = 0; + virtual void AddTombstone(TruncatedRangeTombstone tombstone) = 0; virtual std::unique_ptr<RangeDelIterator> NewIterator() = 0; }; @@ -109,7 +142,7 @@ class RangeDelAggregator { // covered by a range tombstone residing in the same snapshot stripe. // @param mode If collapse_deletions_ is true, this dictates how we will find // the deletion whose interval contains this key. Otherwise, its - // value must be kFullScan indicating linear scan from beginning.. + // value must be kFullScan indicating linear scan from beginning. bool ShouldDelete( const ParsedInternalKey& parsed, RangeDelPositioningMode mode = RangeDelPositioningMode::kFullScan) { @@ -138,11 +171,11 @@ bool ShouldDeleteRange(const Slice& start, const Slice& end, SequenceNumber seqno); - // Get the range tombstone at the specified user_key and sequence number. A - // valid tombstone is always returned, though it may cover an empty range of - // keys or the sequence number may be 0 to indicate that no tombstone covers - // the specified key. - PartialRangeTombstone GetTombstone(const Slice& user_key, + // Get the range tombstone at the specified internal key and sequence + // number. A valid tombstone is always returned, though it may cover an + // empty range of keys or the sequence number may be 0 to indicate that no + // tombstone covers the specified key. + PartialRangeTombstone GetTombstone(const Slice& key, SequenceNumber seqno); // Checks whether range deletions cover any keys between `start` and `end`, @@ -191,6 +224,7 @@ struct Rep { StripeMap stripe_map_; PinnedIteratorsManager pinned_iters_mgr_; + std::list<std::string> pinned_slices_; std::set<uint64_t> added_files_; }; // Initializes rep_ lazily.
This aggregator object is constructed for every diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc index 50753f4cde4..dc865332b32 100644 --- a/db/range_del_aggregator_test.cc +++ b/db/range_del_aggregator_test.cc @@ -33,7 +33,13 @@ enum Direction { kReverse, }; -static auto icmp = InternalKeyComparator(BytewiseComparator()); +struct AddTombstonesArgs { + const std::vector<RangeTombstone> tombstones; + const InternalKey* smallest; + const InternalKey* largest; +}; + +static auto bytewise_icmp = InternalKeyComparator(BytewiseComparator()); void AddTombstones(RangeDelAggregator* range_del_agg, const std::vector<RangeTombstone>& range_dels, @@ -60,12 +66,16 @@ void VerifyPartialTombstonesEq(const PartialRangeTombstone& a, const PartialRangeTombstone& b) { ASSERT_EQ(a.seq(), b.seq()); if (a.start_key() != nullptr) { - ASSERT_EQ(*a.start_key(), *b.start_key()); + ASSERT_EQ(a.start_key()->user_key, b.start_key()->user_key); + ASSERT_EQ(a.start_key()->sequence, b.start_key()->sequence); + ASSERT_EQ(a.start_key()->type, b.start_key()->type); } else { ASSERT_EQ(b.start_key(), nullptr); } if (a.end_key() != nullptr) { - ASSERT_EQ(*a.end_key(), *b.end_key()); + ASSERT_EQ(a.end_key()->user_key, b.end_key()->user_key); + ASSERT_EQ(a.end_key()->sequence, b.end_key()->sequence); + ASSERT_EQ(a.end_key()->type, b.end_key()->type); } else { ASSERT_EQ(b.end_key(), nullptr); } @@ -75,8 +85,7 @@ void VerifyRangeDelIter( RangeDelIterator* range_del_iter, const std::vector<RangeTombstone>& expected_range_dels) { size_t i = 0; - for (; range_del_iter->Valid() && i < expected_range_dels.size(); - range_del_iter->Next(), i++) { + for (; range_del_iter->Valid(); range_del_iter->Next(), i++) { VerifyTombstonesEq(expected_range_dels[i], range_del_iter->Tombstone()); } ASSERT_EQ(expected_range_dels.size(), i); @@ -84,22 +93,26 @@ } void VerifyRangeDels( - const std::vector<RangeTombstone>& range_dels_in, + const std::vector<AddTombstonesArgs>& all_args, const std::vector<ExpectedPoint>& expected_points, const std::vector<RangeTombstone>& expected_collapsed_range_dels, - const InternalKey* smallest = nullptr, - const InternalKey* largest = nullptr) { + const InternalKeyComparator& icmp = bytewise_icmp) { // Test same result regardless of which order the range deletions are added // and regardless of collapsed mode.
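+  // Each AddTombstonesArgs entry below is added through its own + // AddTombstones() call, so every batch of tombstones carries its own + // smallest/largest truncation boundaries.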
for (bool collapsed : {false, true}) { for (Direction dir : {kForward, kReverse}) { RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, collapsed); + std::vector<RangeTombstone> all_range_dels; - std::vector<RangeTombstone> range_dels = range_dels_in; - if (dir == kReverse) { - std::reverse(range_dels.begin(), range_dels.end()); + for (const auto& args : all_args) { + std::vector<RangeTombstone> range_dels = args.tombstones; + if (dir == kReverse) { + std::reverse(range_dels.begin(), range_dels.end()); + } + all_range_dels.insert(all_range_dels.end(), range_dels.begin(), + range_dels.end()); + AddTombstones(&range_del_agg, range_dels, args.smallest, args.largest); } - AddTombstones(&range_del_agg, range_dels, smallest, largest); auto mode = RangeDelPositioningMode::kFullScan; if (collapsed) { @@ -111,38 +124,45 @@ parsed_key.user_key = expected_point.begin; parsed_key.sequence = expected_point.seq; parsed_key.type = kTypeValue; - ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key, mode)); + std::string ikey; + AppendInternalKey(&ikey, parsed_key); + ASSERT_FALSE(range_del_agg.ShouldDelete(ikey, mode)); if (parsed_key.sequence > 0) { --parsed_key.sequence; + ikey.clear(); + AppendInternalKey(&ikey, parsed_key); if (expected_point.expectAlive) { - ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key, mode)); + ASSERT_FALSE(range_del_agg.ShouldDelete(ikey, mode)); } else { - ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key, mode)); + ASSERT_TRUE(range_del_agg.ShouldDelete(ikey, mode)); } } } if (collapsed) { - range_dels = expected_collapsed_range_dels; - VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels); - } else if (smallest == nullptr && largest == nullptr) { + all_range_dels = expected_collapsed_range_dels; + VerifyRangeDelIter(range_del_agg.NewIterator().get(), all_range_dels); + } else if (all_args.size() == 1 && all_args[0].smallest == nullptr && + all_args[0].largest == nullptr) { // Tombstones in an uncollapsed map are presented in start key // order. Tombstones with the same start key are presented in // insertion order. We don't handle tombstone truncation here, so the // verification is only performed if no truncation was requested.
- std::stable_sort(range_dels.begin(), range_dels.end(), + std::stable_sort(all_range_dels.begin(), all_range_dels.end(), [&](const RangeTombstone& a, const RangeTombstone& b) { return icmp.user_comparator()->Compare( a.start_key_, b.start_key_) < 0; }); - VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels); + VerifyRangeDelIter(range_del_agg.NewIterator().get(), all_range_dels); } } } RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, false /* collapse_deletions */); - AddTombstones(&range_del_agg, range_dels_in); + for (const auto& args : all_args) { + AddTombstones(&range_del_agg, args.tombstones, args.smallest, args.largest); + } for (size_t i = 1; i < expected_points.size(); ++i) { bool overlapped = range_del_agg.IsRangeOverlapped( expected_points[i - 1].begin, expected_points[i].begin); @@ -154,18 +174,11 @@ } } -bool ShouldDeleteRange(const std::vector<RangeTombstone>& range_dels, +bool ShouldDeleteRange(const AddTombstonesArgs& range_dels, const ExpectedRange& expected_range) { - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, true); - std::vector<std::string> keys, values; - for (const auto& range_del : range_dels) { - auto key_and_value = range_del.Serialize(); - keys.push_back(key_and_value.first.Encode().ToString()); - values.push_back(key_and_value.second.ToString()); - } - std::unique_ptr<test::VectorIterator> range_del_iter( - new test::VectorIterator(keys, values)); - range_del_agg.AddTombstones(std::move(range_del_iter)); + RangeDelAggregator range_del_agg(bytewise_icmp, {} /* snapshots */, true); + AddTombstones(&range_del_agg, range_dels.tombstones, + range_dels.smallest, range_dels.largest); std::string begin, end; AppendInternalKey(&begin, {expected_range.begin, expected_range.seq, kTypeValue}); @@ -173,22 +186,16 @@ return range_del_agg.ShouldDeleteRange(begin, end, expected_range.seq); } -void VerifyGetTombstone(const std::vector<RangeTombstone>& range_dels, +void VerifyGetTombstone(const AddTombstonesArgs& range_dels, const ExpectedPoint& expected_point, const PartialRangeTombstone& expected_tombstone) { - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, true); + RangeDelAggregator range_del_agg(bytewise_icmp, {} /* snapshots */, true); ASSERT_TRUE(range_del_agg.IsEmpty()); - std::vector<std::string> keys, values; - for (const auto& range_del : range_dels) { - auto key_and_value = range_del.Serialize(); - keys.push_back(key_and_value.first.Encode().ToString()); - values.push_back(key_and_value.second.ToString()); - } - std::unique_ptr<test::VectorIterator> range_del_iter( - new test::VectorIterator(keys, values)); - range_del_agg.AddTombstones(std::move(range_del_iter)); + AddTombstones(&range_del_agg, range_dels.tombstones, + range_dels.smallest, range_dels.largest); - auto tombstone = range_del_agg.GetTombstone(expected_point.begin, expected_point.seq); + auto key = InternalKey(expected_point.begin, kMaxSequenceNumber, kTypeValue); + auto tombstone = range_del_agg.GetTombstone(key.Encode(), expected_point.seq); VerifyPartialTombstonesEq(expected_tombstone, tombstone); } @@ -197,57 +204,64 @@ TEST_F(RangeDelAggregatorTest, Empty) { VerifyRangeDels({}, {{"a", 0}}, {}); } TEST_F(RangeDelAggregatorTest, SameStartAndEnd) { - VerifyRangeDels({{"a", "a", 5}}, {{" ", 0}, {"a", 0}, {"b", 0}}, {}); + VerifyRangeDels({{{{"a", "a", 5}}}}, {{" ", 0}, {"a", 0}, {"b", 0}}, {}); } TEST_F(RangeDelAggregatorTest, Single) { - VerifyRangeDels({{"a", "b", 10}}, {{" ", 0}, {"a", 10}, {"b", 0}}, +
VerifyRangeDels({{{{"a", "b", 10}}}}, {{" ", 0}, {"a", 10}, {"b", 0}}, {{"a", "b", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveLeft) { - VerifyRangeDels({{"a", "c", 10}, {"b", "d", 5}}, + VerifyRangeDels({{{{"a", "c", 10}, {"b", "d", 5}}}}, {{" ", 0}, {"a", 10}, {"c", 5}, {"d", 0}}, {{"a", "c", 10}, {"c", "d", 5}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveRight) { - VerifyRangeDels({{"a", "c", 5}, {"b", "d", 10}}, + VerifyRangeDels({{{{"a", "c", 5}, {"b", "d", 10}}}}, {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}, {{"a", "b", 5}, {"b", "d", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveMiddle) { - VerifyRangeDels({{"a", "d", 5}, {"b", "c", 10}}, + VerifyRangeDels({{{{"a", "d", 5}, {"b", "c", 10}}}}, {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 5}, {"d", 0}}, {{"a", "b", 5}, {"b", "c", 10}, {"c", "d", 5}}); } +TEST_F(RangeDelAggregatorTest, OverlapAboveMiddleReverse) { + VerifyRangeDels({{{{"d", "a", 5}, {"c", "b", 10}}}}, + {{"z", 0}, {"d", 5}, {"c", 10}, {"b", 5}, {"a", 0}}, + {{"d", "c", 5}, {"c", "b", 10}, {"b", "a", 5}}, + InternalKeyComparator(ReverseBytewiseComparator())); +} + TEST_F(RangeDelAggregatorTest, OverlapFully) { - VerifyRangeDels({{"a", "d", 10}, {"b", "c", 5}}, + VerifyRangeDels({{{{"a", "d", 10}, {"b", "c", 5}}}}, {{" ", 0}, {"a", 10}, {"d", 0}}, {{"a", "d", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapPoint) { - VerifyRangeDels({{"a", "b", 5}, {"b", "c", 10}}, + VerifyRangeDels({{{{"a", "b", 5}, {"b", "c", 10}}}}, {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 0}}, {{"a", "b", 5}, {"b", "c", 10}}); } TEST_F(RangeDelAggregatorTest, SameStartKey) { - VerifyRangeDels({{"a", "c", 5}, {"a", "b", 10}}, + VerifyRangeDels({{{{"a", "c", 5}, {"a", "b", 10}}}}, {{" ", 0}, {"a", 10}, {"b", 5}, {"c", 0}}, {{"a", "b", 10}, {"b", "c", 5}}); } TEST_F(RangeDelAggregatorTest, SameEndKey) { - VerifyRangeDels({{"a", "d", 5}, {"b", "d", 10}}, + VerifyRangeDels({{{{"a", "d", 5}, {"b", "d", 10}}}}, {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}, {{"a", "b", 5}, {"b", "d", 10}}); } TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) { - VerifyRangeDels({{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}, + VerifyRangeDels({{{{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}}}, {{" ", 0}, {"a", 5}, {"b", 0}, @@ -260,25 +274,25 @@ TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) { } TEST_F(RangeDelAggregatorTest, IdenticalSameSeqNo) { - VerifyRangeDels({{"a", "b", 5}, {"a", "b", 5}}, + VerifyRangeDels({{{{"a", "b", 5}, {"a", "b", 5}}}}, {{" ", 0}, {"a", 5}, {"b", 0}}, {{"a", "b", 5}}); } TEST_F(RangeDelAggregatorTest, ContiguousSameSeqNo) { - VerifyRangeDels({{"a", "b", 5}, {"b", "c", 5}}, + VerifyRangeDels({{{{"a", "b", 5}, {"b", "c", 5}}}}, {{" ", 0}, {"a", 5}, {"b", 5}, {"c", 0}}, {{"a", "c", 5}}); } TEST_F(RangeDelAggregatorTest, OverlappingSameSeqNo) { - VerifyRangeDels({{"a", "c", 5}, {"b", "d", 5}}, + VerifyRangeDels({{{{"a", "c", 5}, {"b", "d", 5}}}}, {{" ", 0}, {"a", 5}, {"b", 5}, {"c", 5}, {"d", 0}}, {{"a", "d", 5}}); } TEST_F(RangeDelAggregatorTest, CoverSameSeqNo) { - VerifyRangeDels({{"a", "d", 5}, {"b", "c", 5}}, + VerifyRangeDels({{{{"a", "d", 5}, {"b", "c", 5}}}}, {{" ", 0}, {"a", 5}, {"b", 5}, {"c", 5}, {"d", 0}}, {{"a", "d", 5}}); } @@ -287,27 +301,27 @@ TEST_F(RangeDelAggregatorTest, CoverSameSeqNo) { // larger one when VerifyRangeDels() runs them in reverse TEST_F(RangeDelAggregatorTest, CoverMultipleFromLeft) { VerifyRangeDels( - {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "f", 20}}, + {{{{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "f", 20}}}}, {{" ", 0}, {"a", 20}, 
{"f", 15}, {"g", 0}}, {{"a", "f", 20}, {"f", "g", 15}}); } TEST_F(RangeDelAggregatorTest, CoverMultipleFromRight) { VerifyRangeDels( - {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"c", "h", 20}}, + {{{{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"c", "h", 20}}}}, {{" ", 0}, {"b", 5}, {"c", 20}, {"h", 0}}, {{"b", "c", 5}, {"c", "h", 20}}); } TEST_F(RangeDelAggregatorTest, CoverMultipleFully) { VerifyRangeDels( - {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "h", 20}}, + {{{{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "h", 20}}}}, {{" ", 0}, {"a", 20}, {"h", 0}}, {{"a", "h", 20}}); } TEST_F(RangeDelAggregatorTest, AlternateMultipleAboveBelow) { VerifyRangeDels( - {{"b", "d", 15}, {"c", "f", 10}, {"e", "g", 20}, {"a", "h", 5}}, + {{{{"b", "d", 15}, {"c", "f", 10}, {"e", "g", 20}, {"a", "h", 5}}}}, {{" ", 0}, {"a", 5}, {"b", 15}, {"d", 10}, {"e", 20}, {"g", 5}, {"h", 0}}, {{"a", "b", 5}, {"b", "d", 15}, @@ -318,14 +332,14 @@ TEST_F(RangeDelAggregatorTest, AlternateMultipleAboveBelow) { TEST_F(RangeDelAggregatorTest, MergingIteratorAllEmptyStripes) { for (bool collapsed : {true, false}) { - RangeDelAggregator range_del_agg(icmp, {1, 2}, collapsed); + RangeDelAggregator range_del_agg(bytewise_icmp, {1, 2}, collapsed); VerifyRangeDelIter(range_del_agg.NewIterator().get(), {}); } } TEST_F(RangeDelAggregatorTest, MergingIteratorOverlappingStripes) { for (bool collapsed : {true, false}) { - RangeDelAggregator range_del_agg(icmp, {5, 15, 25, 35}, collapsed); + RangeDelAggregator range_del_agg(bytewise_icmp, {5, 15, 25, 35}, collapsed); AddTombstones( &range_del_agg, {{"d", "e", 10}, {"aa", "b", 20}, {"c", "d", 30}, {"a", "b", 10}}); @@ -336,7 +350,8 @@ TEST_F(RangeDelAggregatorTest, MergingIteratorOverlappingStripes) { } TEST_F(RangeDelAggregatorTest, MergingIteratorSeek) { - RangeDelAggregator range_del_agg(icmp, {5, 15}, true /* collapsed */); + RangeDelAggregator range_del_agg(bytewise_icmp, {5, 15}, + true /* collapsed */); AddTombstones(&range_del_agg, {{"a", "c", 10}, {"b", "c", 11}, {"f", "g", 10}, @@ -371,120 +386,260 @@ TEST_F(RangeDelAggregatorTest, MergingIteratorSeek) { } TEST_F(RangeDelAggregatorTest, ShouldDeleteRange) { + const InternalKey b8("b", 8, kTypeValue); + const InternalKey b9("b", 9, kTypeValue); + const InternalKey b10("b", 10, kTypeValue); + const InternalKey c9("c", 9, kTypeValue); + const InternalKey c10("c", 10, kTypeValue); + ASSERT_TRUE(ShouldDeleteRange( - {{"a", "c", 10}}, + {{{"a", "c", 10}}}, {"a", "b", 9})); ASSERT_TRUE(ShouldDeleteRange( - {{"a", "c", 10}}, + {{{"a", "c", 10}}, nullptr, &b9}, + {"a", "b", 9})); + ASSERT_FALSE(ShouldDeleteRange( + {{{"a", "c", 10}}, nullptr, &b10}, + {"a", "b", 9})); + ASSERT_TRUE(ShouldDeleteRange( + {{{"a", "c", 10}}}, {"a", "a", 9})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "c", 10}}, + {{{"a", "c", 10}}}, {"b", "a", 9})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "c", 10}}, + {{{"a", "c", 10}}}, {"a", "b", 10})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "c", 10}}, + {{{"a", "c", 10}}}, {"a", "c", 9})); ASSERT_FALSE(ShouldDeleteRange( - {{"b", "c", 10}}, + {{{"b", "c", 10}}}, {"a", "b", 9})); ASSERT_TRUE(ShouldDeleteRange( - {{"a", "b", 10}, {"b", "d", 20}}, + {{{"a", "b", 10}, {"b", "d", 20}}}, + {"a", "c", 9})); + ASSERT_TRUE(ShouldDeleteRange( + {{{"a", "b", 10}, {"b", "d", 20}}, nullptr, &c9}, {"a", "c", 9})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "b", 10}, {"b", "d", 20}}, + {{{"a", "b", 10}, {"b", "d", 20}}, nullptr, &c10}, + {"a", "c", 9})); + ASSERT_TRUE(ShouldDeleteRange( + {{{"a", "b", 10}, 
{"b", "d", 20}}, &b9, nullptr}, + {"b", "c", 9})); + ASSERT_FALSE(ShouldDeleteRange( + {{{"a", "b", 10}, {"b", "d", 20}}, &b8, nullptr}, + {"b", "c", 9})); + ASSERT_FALSE(ShouldDeleteRange( + {{{"a", "b", 10}, {"b", "d", 20}}}, {"a", "c", 15})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "b", 10}, {"c", "e", 20}}, + {{{"a", "b", 10}, {"c", "e", 20}}}, {"a", "d", 9})); ASSERT_TRUE(ShouldDeleteRange( - {{"a", "b", 10}, {"c", "e", 20}}, + {{{"a", "b", 10}, {"c", "e", 20}}}, {"c", "d", 15})); ASSERT_FALSE(ShouldDeleteRange( - {{"a", "b", 10}, {"c", "e", 20}}, + {{{"a", "b", 10}, {"c", "e", 20}}}, {"c", "d", 20})); } TEST_F(RangeDelAggregatorTest, GetTombstone) { - Slice a = "a", b = "b", c = "c", d = "d", e = "e", h = "h"; - VerifyGetTombstone({{"b", "d", 10}}, {"b", 9}, + const ParsedInternalKey a = {"a", kMaxSequenceNumber, kMaxValue}; + const ParsedInternalKey b = {"b", kMaxSequenceNumber, kMaxValue}; + const ParsedInternalKey b10 = {"b", 10, kMaxValue}; + const InternalKey ib10("b", 10, kTypeValue); + const ParsedInternalKey c = {"c", kMaxSequenceNumber, kMaxValue}; + const ParsedInternalKey c8 = {"c", 8, kMaxValue}; + const InternalKey ic9("c", 9, kTypeValue); + const ParsedInternalKey d = {"d", kMaxSequenceNumber, kMaxValue}; + const ParsedInternalKey e = {"e", kMaxSequenceNumber, kMaxValue}; + const ParsedInternalKey h = {"h", kMaxSequenceNumber, kMaxValue}; + VerifyGetTombstone({{{"b", "d", 10}}}, {"b", 9}, PartialRangeTombstone(&b, &d, 10)); - VerifyGetTombstone({{"b", "d", 10}}, {"b", 10}, + VerifyGetTombstone({{{"b", "d", 10}}, nullptr, &ic9}, {"b", 9}, + PartialRangeTombstone(&b, &c8, 10)); + VerifyGetTombstone({{{"a", "d", 10}}, &ib10, nullptr}, {"c", 9}, + PartialRangeTombstone(&b10, &d, 10)); + VerifyGetTombstone({{{"b", "d", 10}}}, {"b", 10}, PartialRangeTombstone(&b, &d, 0)); - VerifyGetTombstone({{"b", "d", 10}}, {"b", 20}, + VerifyGetTombstone({{{"b", "d", 10}}}, {"b", 20}, PartialRangeTombstone(&b, &d, 0)); - VerifyGetTombstone({{"b", "d", 10}}, {"a", 9}, + VerifyGetTombstone({{{"b", "d", 10}}}, {"a", 9}, PartialRangeTombstone(nullptr, &b, 0)); - VerifyGetTombstone({{"b", "d", 10}}, {"d", 9}, + VerifyGetTombstone({{{"b", "d", 10}}}, {"d", 9}, PartialRangeTombstone(&d, nullptr, 0)); - VerifyGetTombstone({{"a", "c", 10}, {"e", "h", 20}}, {"d", 9}, + VerifyGetTombstone({{{"a", "c", 10}, {"e", "h", 20}}}, {"d", 9}, PartialRangeTombstone(&c, &e, 0)); - VerifyGetTombstone({{"a", "c", 10}, {"e", "h", 20}}, {"b", 9}, + VerifyGetTombstone({{{"a", "c", 10}, {"e", "h", 20}}}, {"b", 9}, PartialRangeTombstone(&a, &c, 10)); - VerifyGetTombstone({{"a", "c", 10}, {"e", "h", 20}}, {"b", 10}, + VerifyGetTombstone({{{"a", "c", 10}, {"e", "h", 20}}}, {"b", 10}, PartialRangeTombstone(&a, &c, 0)); - VerifyGetTombstone({{"a", "c", 10}, {"e", "h", 20}}, {"e", 19}, + VerifyGetTombstone({{{"a", "c", 10}, {"e", "h", 20}}}, {"e", 19}, PartialRangeTombstone(&e, &h, 20)); - VerifyGetTombstone({{"a", "c", 10}, {"e", "h", 20}}, {"e", 20}, + VerifyGetTombstone({{{"a", "c", 10}, {"e", "h", 20}}}, {"e", 20}, PartialRangeTombstone(&e, &h, 0)); } TEST_F(RangeDelAggregatorTest, AddGetTombstoneInterleaved) { - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, + RangeDelAggregator range_del_agg(bytewise_icmp, {} /* snapshots */, true /* collapsed */); AddTombstones(&range_del_agg, {{"b", "c", 10}}); - auto tombstone = range_del_agg.GetTombstone("b", 5); + auto key = InternalKey("b", kMaxSequenceNumber, kTypeValue); + auto tombstone = range_del_agg.GetTombstone(key.Encode(), 5); 
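+  // Adding more tombstones below must not invalidate or change the + // tombstone we just retrieved.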
AddTombstones(&range_del_agg, {{"a", "d", 20}}); - Slice b = "b", c = "c"; + ParsedInternalKey b = {"b", kMaxSequenceNumber, kMaxValue}; + ParsedInternalKey c = {"c", kMaxSequenceNumber, kMaxValue}; VerifyPartialTombstonesEq(PartialRangeTombstone(&b, &c, 10), tombstone); } TEST_F(RangeDelAggregatorTest, TruncateTombstones) { - const InternalKey smallest("b", 1, kTypeRangeDeletion); + const InternalKey smallest("b", kMaxSequenceNumber, kTypeRangeDeletion); const InternalKey largest("e", kMaxSequenceNumber, kTypeRangeDeletion); VerifyRangeDels( - {{"a", "c", 10}, {"d", "f", 10}}, + {{{{"a", "c", 10}, {"d", "f", 10}}, &smallest, &largest}}, {{"a", 10, true}, // truncated {"b", 10, false}, // not truncated {"d", 10, false}, // not truncated {"e", 10, true}}, // truncated - {{"b", "c", 10}, {"d", "e", 10}}, - &smallest, &largest); + {{"b", "c", 10}, {"d", "e", 10}}); } TEST_F(RangeDelAggregatorTest, IsEmpty) { const std::vector<SequenceNumber> snapshots; RangeDelAggregator range_del_agg1( - icmp, snapshots, false /* collapse_deletions */); + bytewise_icmp, snapshots, false /* collapse_deletions */); ASSERT_TRUE(range_del_agg1.IsEmpty()); RangeDelAggregator range_del_agg2( - icmp, snapshots, true /* collapse_deletions */); + bytewise_icmp, snapshots, true /* collapse_deletions */); ASSERT_TRUE(range_del_agg2.IsEmpty()); RangeDelAggregator range_del_agg3( - icmp, kMaxSequenceNumber, false /* collapse_deletions */); + bytewise_icmp, kMaxSequenceNumber, false /* collapse_deletions */); ASSERT_TRUE(range_del_agg3.IsEmpty()); RangeDelAggregator range_del_agg4( - icmp, kMaxSequenceNumber, true /* collapse_deletions */); + bytewise_icmp, kMaxSequenceNumber, true /* collapse_deletions */); ASSERT_TRUE(range_del_agg4.IsEmpty()); } -TEST_F(RangeDelAggregatorTest, OverlappingLargestKeyTruncateTombstones) { - const InternalKey smallest("b", 1, kTypeRangeDeletion); +TEST_F(RangeDelAggregatorTest, OverlappingLargestKeyTruncateBelowTombstone) { + const InternalKey smallest("b", kMaxSequenceNumber, kTypeRangeDeletion); const InternalKey largest( "e", 3, // could happen if "e" is in consecutive sstables kTypeValue); VerifyRangeDels( - {{"a", "c", 10}, {"d", "f", 10}}, + {{{{"a", "c", 10}, {"d", "f", 10}}, &smallest, &largest}}, + {{"a", 10, true}, // truncated + {"b", 10, false}, // not truncated + {"d", 10, false}, // not truncated + {"e", 10, false}, // not truncated + {"e", 2, true}}, // truncated here + {{"b", "c", 10}, {"d", "e", 10}}); +} + +TEST_F(RangeDelAggregatorTest, OverlappingLargestKeyTruncateAboveTombstone) { + const InternalKey smallest("b", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest( + "e", 15, // could happen if "e" is in consecutive sstables + kTypeValue); + VerifyRangeDels( + {{{{"a", "c", 10}, {"d", "f", 10}}, &smallest, &largest}}, {{"a", 10, true}, // truncated {"b", 10, false}, // not truncated {"d", 10, false}, // not truncated - {"e", 10, false}}, // not truncated - {{"b", "c", 10}, {"d", "f", 10}}, - &smallest, &largest); + {"e", kMaxSequenceNumber, true}}, // truncated + {{"b", "c", 10}, {"d", "e", 10}}); +} + +TEST_F(RangeDelAggregatorTest, OverlappingSmallestKeyTruncateBelowTombstone) { + const InternalKey smallest("b", 5, kTypeValue); + const InternalKey largest("e", kMaxSequenceNumber, kTypeRangeDeletion); + VerifyRangeDels( + {{{{"a", "c", 10}, {"d", "f", 10}}, &smallest, &largest}}, + {{"a", 10, true}, // truncated + {"b", 10, true}, // truncated + {"b", 6, false}, // not truncated; start boundary moved + {"d", 10, false}, // not truncated + {"e", kMaxSequenceNumber,
true}}, // truncated + {{"b", "c", 10}, {"d", "e", 10}}); +} + +TEST_F(RangeDelAggregatorTest, OverlappingSmallestKeyTruncateAboveTombstone) { + const InternalKey smallest("b", 15, kTypeValue); + const InternalKey largest("e", kMaxSequenceNumber, kTypeRangeDeletion); + VerifyRangeDels( + {{{{"a", "c", 10}, {"d", "f", 10}}, &smallest, &largest}}, + {{"a", 10, true}, // truncated + {"b", 15, true}, // truncated + {"b", 10, false}, // not truncated + {"d", 10, false}, // not truncated + {"e", kMaxSequenceNumber, true}}, // truncated + {{"b", "c", 10}, {"d", "e", 10}}); +} + +TEST_F(RangeDelAggregatorTest, OverlappingBoundaryGapAboveTombstone) { + const InternalKey smallest1("b", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest1("c", 20, kTypeValue); + const InternalKey smallest2("c", 10, kTypeValue); + const InternalKey largest2("e", kMaxSequenceNumber, kTypeRangeDeletion); + VerifyRangeDels( + {{{{"b", "d", 5}}, &smallest1, &largest1}, + {{{"b", "d", 5}}, &smallest2, &largest2}}, + {{"b", 5, false}, // not truncated + {"c", 5, false}}, // not truncated + {{"b", "c", 5}, {"c", "d", 5}}); // not collapsed due to boundaries +} + +TEST_F(RangeDelAggregatorTest, OverlappingBoundaryGapBelowTombstone) { + const InternalKey smallest1("b", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest1("c", 20, kTypeValue); + const InternalKey smallest2("c", 10, kTypeValue); + const InternalKey largest2("e", kMaxSequenceNumber, kTypeRangeDeletion); + VerifyRangeDels( + {{{{"b", "d", 30}}, &smallest1, &largest1}, + {{{"b", "d", 30}}, &smallest2, &largest2}}, + {{"b", 30, false}, // not truncated + {"c", 30, false}, // not truncated + {"c", 19, true}, // truncated here (keys in this range should not exist) + {"c", 11, false}}, // not truncated again + {{"b", "c", 30}, {"c", "d", 30}}); // not collapsed due to boundaries +} + +TEST_F(RangeDelAggregatorTest, OverlappingBoundaryGapContainsTombstone) { + const InternalKey smallest1("b", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest1("c", 20, kTypeValue); + const InternalKey smallest2("c", 10, kTypeValue); + const InternalKey largest2("e", kMaxSequenceNumber, kTypeRangeDeletion); + VerifyRangeDels( + {{{{"b", "d", 15}}, &smallest1, &largest1}, + {{{"b", "d", 15}}, &smallest2, &largest2}}, + {{"b", 15, false}, // not truncated + {"c", 15, true}, // truncated (keys in this range should not exist) + {"c", 11, false}}, // not truncated here + {{"b", "c", 15}, {"c", "d", 15}}); // not collapsed due to boundaries +} + +TEST_F(RangeDelAggregatorTest, FileCoversOneKeyAndTombstoneAbove) { + const InternalKey smallest("a", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest("a", 20, kTypeValue); + VerifyRangeDels( + {{{{"a", "b", 35}}, &smallest, &largest}}, + {{"a", 40, true}, // not truncated + {"a", 35, false}}, // not truncated + {{"a", "a", 35}}); // empty tombstone but can't occur during a compaction +} + +TEST_F(RangeDelAggregatorTest, FileCoversOneKeyAndTombstoneBelow) { + const InternalKey smallest("a", kMaxSequenceNumber, kTypeRangeDeletion); + const InternalKey largest("a", 20, kTypeValue); + VerifyRangeDels( + {{{{"a", "b", 15}}, &smallest, &largest}}, + {{"a", 20, true}, // truncated here + {"a", 15, true}}, // truncated + {{"a", "a", 15}}); // empty tombstone but can't occur during a compaction } } // namespace rocksdb diff --git a/db/table_cache.cc b/db/table_cache.cc index dfacd31db0c..47e09bd2d42 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -173,7 +173,8 @@
InternalIterator* TableCache::NewIterator(
    const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
    RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
    HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
-    bool skip_filters, int level) {
+    bool skip_filters, int level, const InternalKey* smallest_compaction_key,
+    const InternalKey* largest_compaction_key) {
  PERF_TIMER_GUARD(new_table_iterator_nanos);

  Status s;
@@ -257,10 +258,16 @@ InternalIterator* TableCache::NewIterator(
        s = range_del_iter->status();
      }
      if (s.ok()) {
-        s = range_del_agg->AddTombstones(
-            std::move(range_del_iter),
-            &file_meta.smallest,
-            &file_meta.largest);
+        const InternalKey* smallest = &file_meta.smallest;
+        const InternalKey* largest = &file_meta.largest;
+        if (smallest_compaction_key != nullptr) {
+          smallest = smallest_compaction_key;
+        }
+        if (largest_compaction_key != nullptr) {
+          largest = largest_compaction_key;
+        }
+        s = range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
+                                         largest);
      }
    }
  }

diff --git a/db/table_cache.h b/db/table_cache.h
index 9fe8c227b70..52eecb194ad 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -56,7 +56,9 @@ class TableCache {
      const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
      TableReader** table_reader_ptr = nullptr,
      HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
-      Arena* arena = nullptr, bool skip_filters = false, int level = -1);
+      Arena* arena = nullptr, bool skip_filters = false, int level = -1,
+      const InternalKey* smallest_compaction_key = nullptr,
+      const InternalKey* largest_compaction_key = nullptr);

  // If a seek to internal key "k" in specified file finds an entry,
  // call (*handle_result)(arg, found_key, found_value) repeatedly until

diff --git a/db/version_set.cc b/db/version_set.cc
index f22aafcd38f..475df587cd0 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -451,6 +451,7 @@ bool SomeFileOverlapsRange(
}

namespace {
+
class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
@@ -458,7 +459,9 @@ class LevelIterator final : public InternalIterator {
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel, bool should_sample,
                HistogramImpl* file_read_hist, bool for_compaction,
-                bool skip_filters, int level, RangeDelAggregator* range_del_agg)
+                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
+                const std::vector<AtomicCompactionUnitBoundary>*
+                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        env_options_(env_options),
@@ -471,7 +474,8 @@ class LevelIterator final : public InternalIterator {
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
-        pinned_iters_mgr_(nullptr) {
+        pinned_iters_mgr_(nullptr),
+        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
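    // When non-null, compaction_boundaries must be parallel to the files in
    // *flevel_, one AtomicCompactionUnitBoundary per file (see
    // Compaction::PopulateWithAtomicBoundaries); NewFileIterator indexes it
    // by file_index_.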
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }
@@ -552,11 +556,18 @@ class LevelIterator final : public InternalIterator {
      sample_file_read_inc(file_meta.file_metadata);
    }

+    const InternalKey* smallest_compaction_key = nullptr;
+    const InternalKey* largest_compaction_key = nullptr;
+    if (compaction_boundaries_ != nullptr) {
+      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
+      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
+    }
    return table_cache_->NewIterator(
        read_options_, env_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, nullptr /* don't need reference to table */,
        file_read_hist_,
-        for_compaction_, nullptr /* arena */, skip_filters_, level_);
+        for_compaction_, nullptr /* arena */, skip_filters_, level_,
+        smallest_compaction_key, largest_compaction_key);
  }

  TableCache* table_cache_;
@@ -576,6 +587,10 @@ class LevelIterator final : public InternalIterator {
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;
  Status status_;
+
+  // To be propagated to RangeDelAggregator in order to safely truncate range
+  // tombstones.
+  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
@@ -2119,60 +2134,6 @@ void VersionStorageInfo::GetCleanInputsWithinInterval(
      true /* within_interval */);
}

-namespace {
-
-const uint64_t kRangeTombstoneSentinel =
-    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
-
-// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
-// null which provides the property that a==null indicates a key that is less
-// than any key and b==null indicates a key that is greater than any key. Note
-// that the comparison is performed primarily on the user-key portion of the
-// key. If the user-keys compare equal, an additional test is made to sort
-// range tombstone sentinel keys before other keys with the same user-key. The
-// result is that 2 user-keys will compare equal if they differ purely on
-// their sequence number and value, but the range tombstone sentinel for that
-// user-key will compare not equal. This is necessary because the range
-// tombstone sentinel key is set as the largest key for an sstable even though
-// that key never appears in the database. We don't want adjacent sstables to
-// be considered overlapping if they are separated by the range tombstone
-// sentinel.
-int sstableKeyCompare(const Comparator* user_cmp,
-                      const InternalKey& a, const InternalKey& b) {
-  auto c = user_cmp->Compare(a.user_key(), b.user_key());
-  if (c != 0) {
-    return c;
-  }
-  auto a_footer = ExtractInternalKeyFooter(a.Encode());
-  auto b_footer = ExtractInternalKeyFooter(b.Encode());
-  if (a_footer == kRangeTombstoneSentinel) {
-    if (b_footer != kRangeTombstoneSentinel) {
-      return -1;
-    }
-  } else if (b_footer == kRangeTombstoneSentinel) {
-    return 1;
-  }
-  return 0;
-}
-
-int sstableKeyCompare(const Comparator* user_cmp,
-                      const InternalKey* a, const InternalKey& b) {
-  if (a == nullptr) {
-    return -1;
-  }
-  return sstableKeyCompare(user_cmp, *a, b);
-}
-
-int sstableKeyCompare(const Comparator* user_cmp,
-                      const InternalKey& a, const InternalKey* b) {
-  if (b == nullptr) {
-    return -1;
-  }
-  return sstableKeyCompare(user_cmp, a, *b);
-}
-
-}  // namespace
-
// Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the
// specified range.
// From that file, iterate backwards and
@@ -4033,7 +3994,7 @@ InternalIterator* VersionSet::MakeInputIterator(
            false /* should_sample */,
            nullptr /* no per level latency histogram */,
            true /* for_compaction */, false /* skip_filters */,
-            (int)which /* level */, range_del_agg);
+            (int)which /* level */, range_del_agg, c->boundaries(which));
      }
    }
  }

diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h
index b9eb1035fb2..907c9daf20c 100644
--- a/include/rocksdb/utilities/ldb_cmd.h
+++ b/include/rocksdb/utilities/ldb_cmd.h
@@ -210,6 +210,15 @@ class LDBCommand {
  bool ParseStringOption(const std::map<std::string, std::string>& options,
                         const std::string& option, std::string* value);

+  /**
+   * Returns the value of the specified option as a boolean.
+   * default_val is used if the option is not found in options.
+   * Throws an exception if the value of the option is not
+   * "true" or "false" (case insensitive).
+   */
+  bool ParseBooleanOption(const std::map<std::string, std::string>& options,
+                          const std::string& option, bool default_val);
+
  Options options_;
  std::vector<ColumnFamilyDescriptor> column_families_;
  LDBOptions ldb_options_;
@@ -229,15 +238,6 @@ class LDBCommand {
  bool IsValueHex(const std::map<std::string, std::string>& options,
                  const std::vector<std::string>& flags);

-  /**
-   * Returns the value of the specified option as a boolean.
-   * default_val is used if the option is not found in options.
-   * Throws an exception if the value of the option is not
-   * "true" or "false" (case insensitive).
-   */
-  bool ParseBooleanOption(const std::map<std::string, std::string>& options,
-                          const std::string& option, bool default_val);
-
  /**
   * Converts val to a boolean.
   * val must be either true or false (case insensitive).

diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 8e99ecca2fe..504e2e5aeb5 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -1787,7 +1787,7 @@ void BlockBasedTableIterator::Seek(const Slice& const_target) {
  }

  // Before we seek the iterator, find the next non-deleted key.
-  InitRangeTombstone(ExtractUserKey(target));
+  InitRangeTombstone(target);
  std::string tmp_target;
  if (range_tombstone_.seq() > 0) {
    tmp_target = tombstone_internal_end_key();
@@ -1820,7 +1820,7 @@ void BlockBasedTableIterator::SeekForPrev(const Slice& const_target) {
  }

  // Before we seek the iterator, find the previous non-deleted key.
-  InitRangeTombstone(ExtractUserKey(target));
+  InitRangeTombstone(target);
  std::string tmp_target;
  if (range_tombstone_.seq() > 0) {
    tmp_target = tombstone_internal_start_key();
@@ -1872,7 +1872,7 @@ void BlockBasedTableIterator::SeekToFirst() {
  InitDataBlock();
  data_block_iter_.SeekToFirst();
  if (Valid()) {
-    InitRangeTombstone(user_key());
+    InitRangeTombstone(key());
  }
  FindKeyForward();
}
@@ -1887,7 +1887,7 @@ void BlockBasedTableIterator::SeekToLast() {
  InitDataBlock();
  data_block_iter_.SeekToLast();
  if (Valid()) {
-    InitRangeTombstone(user_key());
+    InitRangeTombstone(key());
  }
  FindKeyBackward();
}
@@ -2006,14 +2006,18 @@ void BlockBasedTableIterator::FindKeyForward() {
        return;
      }

-      auto ukey = user_key();
-      if (icomp_.user_comparator()->Compare(ukey, *range_tombstone_.end_key()) >=
+      ParsedInternalKey parsed;
+      if (!ParseInternalKey(key(), &parsed)) {
+        assert(false);
+        return;
+      }
+      if (icomp_.Compare(parsed, *range_tombstone_.end_key()) >=
          0) {
        // The key is past the tombstone. Grab the tombstone covering the
        // current key. The new tombstone might cover the existing key, so loop
        // so that we can have the proper check for whether the tombstone covers
        // the key and is a deletion tombstone or not.
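        // (The boundary keys are full internal keys here: an entry that
        // shares the end boundary's user key counts as past the tombstone
        // only if it also sorts at or after the boundary in internal-key
        // order, as exercised by the Overlapping*Tombstone tests above.)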
-        InitRangeTombstone(ukey);
+        InitRangeTombstone(key());
        continue;
      }
      // The key is contained within the current tombstone.
@@ -2032,7 +2036,7 @@ void BlockBasedTableIterator::FindKeyForward() {
      InitDataBlock();
      data_block_iter_.Seek(tombstone_internal_end_key());
      if (Valid()) {
-        InitRangeTombstone(user_key());
+        InitRangeTombstone(key());
      }
    }
  }
@@ -2082,14 +2086,18 @@ void BlockBasedTableIterator::FindKeyBackward() {
        return;
      }

-      auto ukey = user_key();
-      if (icomp_.user_comparator()->Compare(ukey, *range_tombstone_.start_key()) <
+      ParsedInternalKey parsed;
+      if (!ParseInternalKey(key(), &parsed)) {
+        assert(false);
+        return;
+      }
+      if (icomp_.Compare(parsed, *range_tombstone_.start_key()) <
          0) {
        // The key is past the tombstone. Grab the tombstone covering the
        // current key. The new tombstone might cover the existing key, so loop
        // so that we can have the proper check for whether the tombstone covers
        // the key and is a deletion tombstone or not.
-        InitRangeTombstone(ukey);
+        InitRangeTombstone(key());
        continue;
      }
      // The key is contained within the current tombstone.
@@ -2112,7 +2120,7 @@ void BlockBasedTableIterator::FindKeyBackward() {
      InitDataBlock();
      data_block_iter_.SeekForPrev(tombstone_internal_start_key());
      if (Valid()) {
-        InitRangeTombstone(user_key());
+        InitRangeTombstone(key());
      }
    }
  }
@@ -2127,18 +2135,22 @@ void BlockBasedTableIterator::InitRangeTombstone(const Slice& target) {
  // Clear the start key if it is less than the smallest key in the
  // sstable. This allows us to avoid comparisons during Prev() in the common
  // case.
-  if (range_tombstone_.start_key() != nullptr &&
-      icomp_.user_comparator()->Compare(*range_tombstone_.start_key(),
-                                        file_meta_->smallest.user_key()) < 0) {
-    range_tombstone_.SetStartKey(nullptr);
+  if (range_tombstone_.start_key() != nullptr) {
+    ParsedInternalKey smallest;
+    if (!ParseInternalKey(file_meta_->smallest.Encode(), &smallest) ||
+        (icomp_.Compare(*range_tombstone_.start_key(), smallest) < 0)) {
+      range_tombstone_.SetStartKey(nullptr);
+    }
  }

  // Clear the end key if it is larger than the largest key in the
  // sstable. This allows us to avoid comparisons during Next() in the common
  // case.
-  if (range_tombstone_.end_key() != nullptr &&
-      icomp_.user_comparator()->Compare(*range_tombstone_.end_key(),
-                                        file_meta_->largest.user_key()) > 0) {
-    range_tombstone_.SetEndKey(nullptr);
+  if (range_tombstone_.end_key() != nullptr) {
+    ParsedInternalKey largest;
+    if (!ParseInternalKey(file_meta_->largest.Encode(), &largest) ||
+        (icomp_.Compare(*range_tombstone_.end_key(), largest) > 0)) {
+      range_tombstone_.SetEndKey(nullptr);
+    }
  }
}
@@ -2148,8 +2160,8 @@ std::string BlockBasedTableIterator::tombstone_internal_start_key() const {
    AppendInternalKey(&internal_key, {file_meta_->smallest.user_key(),
                                      range_tombstone_.seq(), kTypeValue});
  } else {
-    AppendInternalKey(&internal_key, {*range_tombstone_.start_key(),
-                                      range_tombstone_.seq(), kTypeValue});
+    AppendInternalKey(&internal_key, {range_tombstone_.start_key()->user_key,
+                                      range_tombstone_.seq(), kTypeValue});
  }
  return internal_key;
}
@@ -2165,8 +2177,8 @@ std::string BlockBasedTableIterator::tombstone_internal_end_key() const {
    AppendInternalKey(&internal_key, {file_meta_->largest.user_key(),
                                      kMaxSequenceNumber, kTypeValue});
  } else {
-    AppendInternalKey(&internal_key, {*range_tombstone_.end_key(),
-                                      kMaxSequenceNumber, kTypeValue});
+    AppendInternalKey(&internal_key, {range_tombstone_.end_key()->user_key,
+                                      kMaxSequenceNumber, kTypeValue});
  }
  return internal_key;
}

diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index a22e6135908..59aefccd22d 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
  } else if (parsed_params.cmd == RestoreCommand::Name()) {
    return new RestoreCommand(parsed_params.cmd_params,
                              parsed_params.option_map, parsed_params.flags);
+  } else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) {
+    return new WriteExternalSstFilesCommand(parsed_params.cmd_params,
+                                            parsed_params.option_map,
+                                            parsed_params.flags);
+  } else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) {
+    return new IngestExternalSstFilesCommand(parsed_params.cmd_params,
+                                             parsed_params.option_map,
+                                             parsed_params.flags);
  }
  return nullptr;
}
@@ -2917,5 +2925,201 @@ void DBFileDumperCommand::DoCommand() {
  }
}

+void WriteExternalSstFilesCommand::Help(std::string& ret) {
+  ret.append(" ");
+  ret.append(WriteExternalSstFilesCommand::Name());
+  ret.append(" <output_sst_path>");
+  ret.append("\n");
+}
+
+WriteExternalSstFilesCommand::WriteExternalSstFilesCommand(
+    const std::vector<std::string>& params,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(
+          options, flags, false /* is_read_only */,
+          BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM,
+                               ARG_TO, ARG_CREATE_IF_MISSING})) {
+  create_if_missing_ =
+      IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+      ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+  if (params.size() != 1) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "output SST file path must be specified");
+  } else {
+    output_sst_path_ = params.at(0);
+  }
+}
+
+void WriteExternalSstFilesCommand::DoCommand() {
+  if (!db_) {
+    assert(GetExecuteState().IsFailed());
+    return;
+  }
+  ColumnFamilyHandle* cfh = GetCfHandle();
+  SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh);
+  Status status = sst_file_writer.Open(output_sst_path_);
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " +
+                                                  status.ToString());
+    return;
+  }
+
+  int bad_lines = 0;
+  std::string line;
+  std::ifstream ifs_stdin("/dev/stdin");
+  std::istream* istream_p =
+      ifs_stdin.is_open() ? &ifs_stdin : &std::cin;
+  while (getline(*istream_p, line, '\n')) {
+    std::string key;
+    std::string value;
+    if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) {
+      status = sst_file_writer.Put(key, value);
+      if (!status.ok()) {
+        exec_state_ = LDBCommandExecuteResult::Failed(
+            "failed to write record to file: " + status.ToString());
+        return;
+      }
+    } else if (0 == line.find("Keys in range:")) {
+      // ignore this line
+    } else if (0 == line.find("Created bg thread 0x")) {
+      // ignore this line
+    } else {
+      bad_lines++;
+    }
+  }
+
+  status = sst_file_writer.Finish();
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "Failed to finish writing to file: " + status.ToString());
+    return;
+  }
+
+  if (bad_lines > 0) {
+    fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines);
+  }
+  exec_state_ = LDBCommandExecuteResult::Succeed(
+      "external SST file written to " + output_sst_path_);
+}
+
+Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+  Options opt = LDBCommand::PrepareOptionsForOpenDB();
+  opt.create_if_missing = create_if_missing_;
+  return opt;
+}
+
+const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files";
+const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY =
+    "snapshot_consistency";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO =
+    "allow_global_seqno";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH =
+    "allow_blocking_flush";
+const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND =
+    "ingest_behind";
+const std::string IngestExternalSstFilesCommand::ARG_WRITE_GLOBAL_SEQNO =
+    "write_global_seqno";
+
+void IngestExternalSstFilesCommand::Help(std::string& ret) {
+  ret.append(" ");
+  ret.append(IngestExternalSstFilesCommand::Name());
+  ret.append(" <input_sst_path>");
+  ret.append(" [--" + ARG_MOVE_FILES + "] ");
+  ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] ");
+  ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] ");
+  ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] ");
+  ret.append(" [--" + ARG_INGEST_BEHIND + "] ");
+  ret.append(" [--" + ARG_WRITE_GLOBAL_SEQNO + "] ");
+  ret.append("\n");
+}
+
+IngestExternalSstFilesCommand::IngestExternalSstFilesCommand(
+    const std::vector<std::string>& params,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(
+          options, flags, false /* is_read_only */,
+          BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY,
+                               ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING,
+                               ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND,
+                               ARG_WRITE_GLOBAL_SEQNO})),
+      move_files_(false),
+      snapshot_consistency_(true),
+      allow_global_seqno_(true),
+      allow_blocking_flush_(true),
+      ingest_behind_(false) {
+  create_if_missing_ =
+      IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+      ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+  move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) ||
+                ParseBooleanOption(options, ARG_MOVE_FILES, false);
+  snapshot_consistency_ =
+      IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) ||
+      ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true);
+  allow_global_seqno_ =
+      IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) ||
+      ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true);
+  allow_blocking_flush_ =
+      IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) ||
+      ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true);
+  ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) ||
+                   ParseBooleanOption(options, ARG_INGEST_BEHIND, false);
+  write_global_seqno_
+      = IsFlagPresent(flags, ARG_WRITE_GLOBAL_SEQNO) ||
+        ParseBooleanOption(options, ARG_WRITE_GLOBAL_SEQNO, true);
+
+  if (allow_global_seqno_) {
+    if (!write_global_seqno_) {
+      fprintf(stderr,
+              "Warning: not writing global_seqno to the ingested SST can\n"
+              "prevent older versions of RocksDB from being able to open it\n");
+    }
+  } else {
+    if (write_global_seqno_) {
+      exec_state_ = LDBCommandExecuteResult::Failed(
+          "ldb cannot write global_seqno to the ingested SST when global_seqno "
+          "is not allowed");
+    }
+  }
+
+  if (params.size() != 1) {
+    exec_state_ =
+        LDBCommandExecuteResult::Failed("input SST path must be specified");
+  } else {
+    input_sst_path_ = params.at(0);
+  }
+}
+
+void IngestExternalSstFilesCommand::DoCommand() {
+  if (!db_) {
+    assert(GetExecuteState().IsFailed());
+    return;
+  }
+  if (GetExecuteState().IsFailed()) {
+    return;
+  }
+  ColumnFamilyHandle* cfh = GetCfHandle();
+  IngestExternalFileOptions ifo;
+  ifo.move_files = move_files_;
+  ifo.snapshot_consistency = snapshot_consistency_;
+  ifo.allow_global_seqno = allow_global_seqno_;
+  ifo.allow_blocking_flush = allow_blocking_flush_;
+  ifo.ingest_behind = ingest_behind_;
+  Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo);
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "failed to ingest external SST: " + status.ToString());
+  } else {
+    exec_state_ =
+        LDBCommandExecuteResult::Succeed("external SST files ingested");
+  }
+}
+
+Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+  Options opt = LDBCommand::PrepareOptionsForOpenDB();
+  opt.create_if_missing = create_if_missing_;
+  return opt;
+}
+
}  // namespace rocksdb
#endif  // ROCKSDB_LITE

diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h
index 91afd2674c2..a51e9076a21 100644
--- a/tools/ldb_cmd_impl.h
+++ b/tools/ldb_cmd_impl.h
@@ -520,4 +520,57 @@ class RestoreCommand : public BackupableCommand {
  static void Help(std::string& ret);
};

+class WriteExternalSstFilesCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "write_extern_sst"; }
+  WriteExternalSstFilesCommand(
+      const std::vector<std::string>& params,
+      const std::map<std::string, std::string>& options,
+      const std::vector<std::string>& flags);
+
+  virtual void DoCommand() override;
+
+  virtual bool NoDBOpen() override { return false; }
+
+  virtual Options PrepareOptionsForOpenDB() override;
+
+  static void Help(std::string& ret);
+
+ private:
+  std::string output_sst_path_;
+};
+
+class IngestExternalSstFilesCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "ingest_extern_sst"; }
+  IngestExternalSstFilesCommand(
+      const std::vector<std::string>& params,
+      const std::map<std::string, std::string>& options,
+      const std::vector<std::string>& flags);
+
+  virtual void DoCommand() override;
+
+  virtual bool NoDBOpen() override { return false; }
+
+  virtual Options PrepareOptionsForOpenDB() override;
+
+  static void Help(std::string& ret);
+
+ private:
+  std::string input_sst_path_;
+  bool move_files_;
+  bool snapshot_consistency_;
+  bool allow_global_seqno_;
+  bool allow_blocking_flush_;
+  bool ingest_behind_;
+  bool write_global_seqno_;
+
+  static const std::string ARG_MOVE_FILES;
+  static const std::string ARG_SNAPSHOT_CONSISTENCY;
+  static const std::string ARG_ALLOW_GLOBAL_SEQNO;
+  static const std::string ARG_ALLOW_BLOCKING_FLUSH;
+  static const std::string ARG_INGEST_BEHIND;
+  static const std::string ARG_WRITE_GLOBAL_SEQNO;
+};
+
}  // namespace rocksdb

diff --git a/tools/ldb_test.py b/tools/ldb_test.py
index fa0ded4382d..2200fb464b7 100644
--- a/tools/ldb_test.py
+++ b/tools/ldb_test.py
@@ -76,7 +76,7 @@ def assertRunFAILFull(self, params):
            my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \
thread\"" % params, shell=True)
-        except Exception, e:
+        except Exception:
            return
        self.fail(
            "Exception should have been raised for command with params: %s" %
@@ -146,6 +146,14 @@ def dumpDb(self, params, dumpFile):
    def loadDb(self, params, dumpFile):
        return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params))

+    def writeExternSst(self, params, inputDumpFile, outputSst):
+        return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s"
+                                 % (inputDumpFile, outputSst, params))
+
+    def ingestExternSst(self, params, inputSst):
+        return 0 == run_err_null("./ldb ingest_extern_sst %s %s"
+                                 % (inputSst, params))
+
    def testStringBatchPut(self):
        print "Running testStringBatchPut..."
        self.assertRunOK("batchput x1 y1 --create_if_missing", "OK")
@@ -547,5 +555,38 @@ def testColumnFamilies(self):
        # non-existing column family.
        self.assertRunFAIL("get cf3_1 --column_family=four")

+    def testIngestExternalSst(self):
+        print "Running testIngestExternalSst..."
+
+        # Dump, load, write external sst and ingest it in another db
+        dbPath = os.path.join(self.TMP_DIR, "db1")
+        self.assertRunOK(
+            "batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4"
+            % dbPath,
+            "OK")
+        self.assertRunOK("scan --db=%s" % dbPath,
+                         "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
+        dumpFilePath = os.path.join(self.TMP_DIR, "dump1")
+        with open(dumpFilePath, 'w') as f:
+            f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40")
+        externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst")
+        self.assertTrue(self.writeExternSst("--create_if_missing --db=%s"
+                                            % dbPath,
+                                            dumpFilePath,
+                                            externSstPath))
+        # cannot ingest if allow_global_seqno is false
+        self.assertFalse(
+            self.ingestExternSst(
+                "--create_if_missing --allow_global_seqno=false --db=%s"
+                % dbPath,
+                externSstPath))
+        self.assertTrue(
+            self.ingestExternSst(
+                "--create_if_missing --allow_global_seqno --db=%s"
+                % dbPath,
+                externSstPath))
+        self.assertRunOKFull("scan --db=%s" % dbPath,
+                             "x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40")
+
 if __name__ == "__main__":
    unittest.main()

diff --git a/tools/ldb_tool.cc b/tools/ldb_tool.cc
index b09076ecc61..fe307eab7dc 100644
--- a/tools/ldb_tool.cc
+++ b/tools/ldb_tool.cc
@@ -88,6 +88,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
  BackupCommand::Help(ret);
  RestoreCommand::Help(ret);
  CheckPointCommand::Help(ret);
+  WriteExternalSstFilesCommand::Help(ret);
+  IngestExternalSstFilesCommand::Help(ret);
  fprintf(stderr, "%s\n", ret.c_str());
}
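
Reviewer note (not part of the patch): a minimal standalone sketch of the
sstableKeyCompare() contract that the atomic-compaction-unit grouping and the
truncation tests above depend on. Illustrative only -- it assumes the three
overloads declared in db/compaction.h and the InternalKey constructor from
db/dbformat.h; the function and variable names are hypothetical.

    #include <cassert>
    #include "db/compaction.h"
    #include "db/dbformat.h"

    namespace rocksdb {

    void SstableBoundaryCompareSketch(const Comparator* ucmp) {
      // Point keys that differ only in seqno/type compare equal, so files
      // whose boundary user keys touch stay in the same atomic compaction
      // unit.
      InternalKey a("k", 100, kTypeValue);
      InternalKey b("k", 7, kTypeValue);
      assert(sstableKeyCompare(ucmp, a, b) == 0);

      // The range tombstone sentinel (kMaxSequenceNumber, kTypeRangeDeletion)
      // sorts strictly before any other entry with the same user key, so a
      // file whose largest key is this sentinel does not overlap a neighbour
      // whose smallest key is a real entry for the same user key.
      InternalKey sentinel("k", kMaxSequenceNumber, kTypeRangeDeletion);
      assert(sstableKeyCompare(ucmp, sentinel, a) < 0);
      assert(sstableKeyCompare(ucmp, a, sentinel) > 0);

      // Null pointers give open-ended boundaries: a null first argument is
      // less than any key, and a null second argument is greater than any key.
      const InternalKey* none = nullptr;
      assert(sstableKeyCompare(ucmp, none, a) < 0);
      assert(sstableKeyCompare(ucmp, a, none) < 0);
    }

    }  // namespace rocksdb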
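
Reviewer note (not part of the patch): for reference, the two new ldb commands
are driven exactly as in testIngestExternalSst above, e.g. (hypothetical paths)
"cat dump1 | ./ldb write_extern_sst --db=/tmp/db1 --create_if_missing
/tmp/extern_data1.sst" builds the external SST from "key ==> value" lines on
stdin, and "./ldb ingest_extern_sst --db=/tmp/db1 /tmp/extern_data1.sst"
ingests it. As the test's comment notes, the ingestion fails when
--allow_global_seqno=false is passed, since here the SST overlaps keys already
in the DB.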