Truncate range tombstones from sstables
When adding range tombstones to a RangeDelAggregator, truncate the
tombstones to the sstable boundaries. This is done as an alternative to
truncating the range tombstones within the sstables themselves, as it
properly handles existing data where range tombstones are not truncated
in sstables. Truncating range tombstones to sstable boundaries avoids
having to process all of the sstables with overlapping tombstones as a
unit.

See facebook#4050
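
The mechanics, in brief: as each tombstone is read from an sstable's
range-del block, its [start, end) user-key span is clamped to the file's
[smallest, largest] bounds before entering the aggregator. A minimal sketch
of that clamping, assuming a simplified stand-in for RangeTombstone with
plain string user keys (the real code in db/range_del_aggregator.cc below
compares via the InternalKeyComparator's user comparator and takes
InternalKey* bounds):

#include <string>

// Simplified stand-in for rocksdb::RangeTombstone: deletes user keys in
// the half-open interval [start_key, end_key).
struct SimpleTombstone {
  std::string start_key;
  std::string end_key;  // exclusive
};

// Clamp a tombstone to an sstable's key bounds. Using `largest` as an
// exclusive end is safe because a tombstone only spills past the file when
// the file's largest key is a sentinel borrowed from the next file's
// smallest key (see the comment in db/range_del_aggregator.cc below).
void TruncateToFileBounds(SimpleTombstone& t, const std::string& smallest,
                          const std::string& largest) {
  if (t.start_key < smallest) {
    t.start_key = smallest;
  }
  if (t.end_key > largest) {
    t.end_key = largest;
  }
}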
petermattis committed Jul 13, 2018
1 parent 450e91a commit ce6b81d
Showing 10 changed files with 96 additions and 32 deletions.
2 changes: 1 addition & 1 deletion db/builder.cc
@@ -200,7 +200,7 @@ Status BuildTable(
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
- ReadOptions(), env_options, internal_comparator, meta->fd,
+ ReadOptions(), env_options, internal_comparator, *meta,
nullptr /* range_del_agg */, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
2 changes: 1 addition & 1 deletion db/compaction_job.cc
@@ -1201,7 +1201,7 @@ Status CompactionJob::FinishCompactionOutputFile(
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
+ ReadOptions(), env_options_, cfd->internal_comparator(), *meta,
nullptr /* range_del_agg */, nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
8 changes: 4 additions & 4 deletions db/forward_iterator.cc
@@ -73,7 +73,7 @@ class ForwardLevelIterator : public InternalIterator {
cfd_->internal_comparator(), {} /* snapshots */);
file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
- files_[file_index_]->fd,
+ *files_[file_index_],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
nullptr /* table_reader_ptr */, nullptr, false);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
@@ -610,7 +610,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
continue;
}
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
- read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
+ read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
}
BuildLevelIterators(vstorage);
@@ -680,7 +680,7 @@ void ForwardIterator::RenewIterators() {
}
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- l0_files_new[inew]->fd,
+ *l0_files_new[inew],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
}

@@ -738,7 +738,7 @@ void ForwardIterator::ResetIncompleteIterators() {
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- l0_files[i]->fd, nullptr /* range_del_agg */);
+ *l0_files[i], nullptr /* range_del_agg */);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}

27 changes: 26 additions & 1 deletion db/range_del_aggregator.cc
@@ -521,7 +521,9 @@ bool RangeDelAggregator::IsRangeOverlapped(const Slice& start,
}

Status RangeDelAggregator::AddTombstones(
- std::unique_ptr<InternalIterator> input) {
+ std::unique_ptr<InternalIterator> input,
+ const InternalKey* smallest,
+ const InternalKey* largest) {
if (input == nullptr) {
return Status::OK();
}
@@ -541,6 +543,29 @@ Status RangeDelAggregator::AddTombstones(
return Status::Corruption("Unable to parse range tombstone InternalKey");
}
RangeTombstone tombstone(parsed_key, input->value());
+ // Truncate the tombstone to the range [smallest, largest].
+ if (smallest != nullptr) {
+ if (icmp_.user_comparator()->Compare(
+ tombstone.start_key_, smallest->user_key()) < 0) {
+ tombstone.start_key_ = smallest->user_key();
+ }
+ }
+ if (largest != nullptr) {
+ // This is subtly correct despite the discrepancy between
+ // FileMetaData::largest being inclusive while RangeTombstone::end_key_
+ // is exclusive. A tombstone will only extend past the bounds of an
+ // sstable if its end-key is the largest key in the table. If that
+ // occurs, the largest key for the table is set based on the smallest
+ // key in the next table in the level. In that case, largest->user_key()
+ // is not actually a key in the current table and thus we can use it as
+ // the exclusive end-key for the tombstone.
+ if (icmp_.user_comparator()->Compare(
+ tombstone.end_key_, largest->user_key()) > 0) {
+ // The largest key should be a tombstone sentinel key.
+ assert(GetInternalKeySeqno(largest->Encode()) == kMaxSequenceNumber);
+ tombstone.end_key_ = largest->user_key();
+ }
+ }
GetRangeDelMap(tombstone.seq_).AddTombstone(std::move(tombstone));
input->Next();
}
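To make the sentinel argument in the comment above concrete, here is a
hypothetical layout (not taken from the commit) reduced to a runnable
check with plain string user keys:

#include <cassert>
#include <string>

int main() {
  // Suppose sstable A holds the tombstone [c, g)@20 and no point key past
  // user key c, and the next file B in the level starts at user key e.
  // A's largest key is then recorded as the sentinel e@kMaxSequenceNumber,
  // so "e" is not actually a key in A and can serve as an exclusive end.
  std::string tombstone_end = "g";
  const std::string file_largest_user_key = "e";  // sentinel user key
  if (tombstone_end > file_largest_user_key) {
    tombstone_end = file_largest_user_key;
  }
  assert(tombstone_end == "e");  // tombstone now covers [c, e) within A
  return 0;
}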
10 changes: 8 additions & 2 deletions db/range_del_aggregator.h
@@ -155,9 +155,15 @@ class RangeDelAggregator {
bool IsRangeOverlapped(const Slice& start, const Slice& end);

// Adds tombstones to the tombstone aggregation structure maintained by this
- // object.
+ // object. Tombstones are truncated to smallest and largest. If smallest (or
+ // largest) is null, it is not used for truncation. When adding range
+ // tombstones present in an sstable, smallest and largest should be set to
+ // the smallest and largest keys from the sstable file metadata. Note that
+ // tombstone end keys are exclusive while largest is inclusive.
// @return non-OK status if any of the tombstone keys are corrupted.
- Status AddTombstones(std::unique_ptr<InternalIterator> input);
+ Status AddTombstones(std::unique_ptr<InternalIterator> input,
+ const InternalKey* smallest = nullptr,
+ const InternalKey* largest = nullptr);

// Resets iterators maintained across calls to ShouldDelete(). This may be
// called when the tombstones change, or the owner may call explicitly, e.g.,
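A sketch of the intended call-site usage of the widened signature,
mirroring the db/table_cache.cc change in this commit (assumed to run
inside TableCache::NewIterator, with table_reader, options, file_meta, and
range_del_agg in scope):

std::unique_ptr<InternalIterator> range_del_iter(
    table_reader->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
  Status s = range_del_iter->status();
  if (s.ok()) {
    // Clamp this file's tombstones to its own key bounds as they are added.
    s = range_del_agg->AddTombstones(std::move(range_del_iter),
                                     &file_meta.smallest,
                                     &file_meta.largest);
  }
}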
43 changes: 34 additions & 9 deletions db/range_del_aggregator_test.cc
@@ -19,6 +19,7 @@ namespace {
struct ExpectedPoint {
Slice begin;
SequenceNumber seq;
+ bool expectAlive;
};

struct ExpectedRange {
@@ -35,7 +36,9 @@ enum Direction {
static auto icmp = InternalKeyComparator(BytewiseComparator());

void AddTombstones(RangeDelAggregator* range_del_agg,
- const std::vector<RangeTombstone>& range_dels) {
+ const std::vector<RangeTombstone>& range_dels,
+ const InternalKey* smallest = nullptr,
+ const InternalKey* largest = nullptr) {
std::vector<std::string> keys, values;
for (const auto& range_del : range_dels) {
auto key_and_value = range_del.Serialize();
@@ -44,7 +47,7 @@ void AddTombstones(RangeDelAggregator* range_del_agg,
}
std::unique_ptr<test::VectorIterator> range_del_iter(
new test::VectorIterator(keys, values));
- range_del_agg->AddTombstones(std::move(range_del_iter));
+ range_del_agg->AddTombstones(std::move(range_del_iter), smallest, largest);
}

void VerifyTombstonesEq(const RangeTombstone& a, const RangeTombstone& b) {
@@ -68,7 +71,9 @@ void VerifyRangeDelIter(
void VerifyRangeDels(
const std::vector<RangeTombstone>& range_dels_in,
const std::vector<ExpectedPoint>& expected_points,
- const std::vector<RangeTombstone>& expected_collapsed_range_dels) {
+ const std::vector<RangeTombstone>& expected_collapsed_range_dels,
+ const InternalKey* smallest = nullptr,
+ const InternalKey* largest = nullptr) {
// Test same result regardless of which order the range deletions are added
// and regardless of collapsed mode.
for (bool collapsed : {false, true}) {
@@ -79,7 +84,7 @@
if (dir == kReverse) {
std::reverse(range_dels.begin(), range_dels.end());
}
- AddTombstones(&range_del_agg, range_dels);
+ AddTombstones(&range_del_agg, range_dels, smallest, largest);

auto mode = RangeDelPositioningMode::kFullScan;
if (collapsed) {
@@ -94,22 +99,29 @@
ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key, mode));
if (parsed_key.sequence > 0) {
--parsed_key.sequence;
- ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key, mode));
+ if (expected_point.expectAlive) {
+ ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key, mode));
+ } else {
+ ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key, mode));
+ }
}
}

if (collapsed) {
range_dels = expected_collapsed_range_dels;
- } else {
- // Tombstones in an uncollapsed map are presented in start key order.
- // Tombstones with the same start key are presented in insertion order.
+ VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels);
+ } else if (smallest == nullptr && largest == nullptr) {
+ // Tombstones in an uncollapsed map are presented in start key
+ // order. Tombstones with the same start key are presented in
+ // insertion order. We don't handle tombstone truncation here, so the
+ // verification is only performed if no truncation was requested.
std::stable_sort(range_dels.begin(), range_dels.end(),
[&](const RangeTombstone& a, const RangeTombstone& b) {
return icmp.user_comparator()->Compare(
a.start_key_, b.start_key_) < 0;
});
+ VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels);
}
- VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels);
}
}

@@ -391,6 +403,19 @@ TEST_F(RangeDelAggregatorTest, GetTombstone) {
{"e", "h", 20});
}

+ TEST_F(RangeDelAggregatorTest, TruncateTombstones) {
+ const InternalKey smallest("b", 1, kTypeRangeDeletion);
+ const InternalKey largest("e", kMaxSequenceNumber, kTypeRangeDeletion);
+ VerifyRangeDels(
+ {{"a", "c", 10}, {"d", "f", 10}},
+ {{"a", 10, true}, // truncated
+ {"b", 10, false}, // not truncated
+ {"d", 10, false}, // not truncated
+ {"e", 10, true}}, // truncated
+ {{"b", "c", 10}, {"d", "e", 10}},
+ &smallest, &largest);
+ }

} // namespace rocksdb

int main(int argc, char** argv) {
2 changes: 1 addition & 1 deletion db/repair.cc
@@ -498,7 +498,7 @@ class Repairer {
}
if (status.ok()) {
InternalIterator* iter = table_cache_->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd,
+ ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */);
bool empty = true;
ParsedInternalKey parsed;
15 changes: 11 additions & 4 deletions db/table_cache.cc
@@ -170,7 +170,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,

InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options,
- const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+ const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
bool skip_filters, int level) {
@@ -201,6 +201,7 @@ InternalIterator* TableCache::NewIterator(
create_new_table_reader = readahead > 0;
}

+ auto& fd = file_meta.fd;
if (create_new_table_reader) {
unique_ptr<TableReader> table_reader_unique_ptr;
s = GetTableReader(
@@ -256,7 +257,10 @@ InternalIterator* TableCache::NewIterator(
s = range_del_iter->status();
}
if (s.ok()) {
- s = range_del_agg->AddTombstones(std::move(range_del_iter));
+ s = range_del_agg->AddTombstones(
+ std::move(range_del_iter),
+ &file_meta.smallest,
+ &file_meta.largest);
}
}
}
@@ -273,9 +277,10 @@

Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
- const FileDescriptor& fd, const Slice& k,
+ const FileMetaData& file_meta, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist,
bool skip_filters, int level) {
+ auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
bool done = false;
#ifndef ROCKSDB_LITE
@@ -356,7 +361,9 @@ Status TableCache::Get(const ReadOptions& options,
}
if (s.ok()) {
s = get_context->range_del_agg()->AddTombstones(
- std::move(range_del_iter));
+ std::move(range_del_iter),
+ &file_meta.smallest,
+ &file_meta.largest);
}
}
if (s.ok()) {
4 changes: 2 additions & 2 deletions db/table_cache.h
@@ -53,7 +53,7 @@ class TableCache {
InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
- const FileDescriptor& file_fd, RangeDelAggregator* range_del_agg,
+ const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
TableReader** table_reader_ptr = nullptr,
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false, int level = -1);
@@ -68,7 +68,7 @@
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
- const FileDescriptor& file_fd, const Slice& k,
+ const FileMetaData& file_meta, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false, int level = -1);

15 changes: 8 additions & 7 deletions db/version_set.cc
@@ -553,7 +553,8 @@ class LevelIterator final : public InternalIterator {
}

return table_cache_->NewIterator(
- read_options_, env_options_, icomparator_, file_meta.fd, range_del_agg_,
+ read_options_, env_options_, icomparator_, *file_meta.file_metadata,
+ range_del_agg_,
nullptr /* don't need reference to table */, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_, level_);
}
@@ -1034,7 +1035,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
if (!range_del_agg->ShouldDeleteRange(
file.smallest_key, file.largest_key, file.file_metadata->largest_seqno)) {
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
- read_options, soptions, cfd_->internal_comparator(), file.fd,
+ read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
false, arena, false /* skip_filters */, 0 /* level */));
}
@@ -1087,7 +1088,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
continue;
}
ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
- read_options, env_options, cfd_->internal_comparator(), file->fd,
+ read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
&range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
false, &arena, false /* skip_filters */, 0 /* level */));
status = OverlapWithIterator(
@@ -1228,7 +1229,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}

*status = table_cache_->Get(
- read_options, *internal_comparator(), f->fd, ikey, &get_context,
+ read_options, *internal_comparator(), *f->file_metadata, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
@@ -3886,8 +3887,8 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
// approximate offset of "key" within the table.
TableReader* table_reader_ptr;
InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
- ReadOptions(), v->env_options_, v->cfd_->internal_comparator(), f.fd,
- nullptr /* range_del_agg */, &table_reader_ptr);
+ ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
+ *f.file_metadata, nullptr /* range_del_agg */, &table_reader_ptr);
if (table_reader_ptr != nullptr) {
result = table_reader_ptr->ApproximateOffsetOf(key);
}
@@ -3966,7 +3967,7 @@ InternalIterator* VersionSet::MakeInputIterator(
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
read_options, env_options_compactions, cfd->internal_comparator(),
- flevel->files[i].fd, range_del_agg,
+ *flevel->files[i].file_metadata, range_del_agg,
nullptr /* table_reader_ptr */,
nullptr /* no per level latency histogram */,
true /* for_compaction */, nullptr /* arena */,
