Skip to content

Commit

Permalink
Cherry pick CompactFiles related fixes (#23)
Browse files Browse the repository at this point in the history
* fix CompactFiles inclusion of older L0 files

Summary:
if we're moving any L0 files down, we need to include older L0 files since they may contain older versions of the keys being moved down.
Closes facebook#2845

Differential Revision: D5773800

Pulled By: ajkr

fbshipit-source-id: 9f0770a8eaaeea4c87df2e7a2a1d65bf9d7f4f7e

* fix hanging after CompactFiles with L0 overlap

Summary:
Bug report: https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/

Non-empty `level0_compactions_in_progress_` was aborting `CompactFiles` after incrementing `bg_compaction_scheduled_`, and in that case we never decremented it. This blocked future compactions and prevented DB close as we wait for scheduled compactions to finish/abort during close.

I eliminated `CompactFiles`'s dependency on `level0_compactions_in_progress_`. Since it takes a contiguous span of L0 files -- through the last L0 file if any L1+ files are included -- it's fine to run in parallel with other compactions involving L0. We make the same assumption in intra-L0 compaction.
Closes facebook#2849

Differential Revision: D5780440

Pulled By: ajkr

fbshipit-source-id: 15b15d3faf5a699aed4b82a58352d4a7bb23e027

* Check conflict at output level in CompactFiles (facebook#3926)

Summary:
CompactFiles checked whether the existing files conflicted with the chosen compaction. But it missed checking whether future files would conflict, i.e., when another compaction was simultaneously writing new files to the same range at the same output level.
Closes facebook#3926

Differential Revision: D8218996

Pulled By: ajkr

fbshipit-source-id: 21cb00a6fed4c8c62d3ed2ff810962e6bdc2fdfb
  • Loading branch information
huachaohuang authored Jun 7, 2018
1 parent 8db42be commit 90d77f0
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 17 deletions.
25 changes: 11 additions & 14 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,11 @@ Compaction* CompactionPicker::CompactFiles(
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id) {
assert(input_files.size());
// This compaction output should not overlap with a running compaction as
// `SanitizeCompactionInputFiles` should've checked earlier and db mutex
// shouldn't have been released since.
assert(!FilesRangeOverlapWithCompaction(input_files, output_level));

// TODO(rven ): we might be able to run concurrent level 0 compaction
// if the key ranges of the two compactions do not overlap, but for now
// we do not allow it.
if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) {
return nullptr;
}
// This compaction output could overlap with a running compaction
if (FilesRangeOverlapWithCompaction(input_files, output_level)) {
return nullptr;
}
auto c =
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
output_level, compact_options.output_file_size_limit,
Expand Down Expand Up @@ -731,10 +725,6 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
auto& levels = cf_meta.levels;
auto comparator = icmp_->user_comparator();

// TODO(yhchiang): If there is any input files of L1 or up and there
// is at least one L0 files. All L0 files older than the L0 file needs
// to be included. Otherwise, it is a false conditoin

// TODO(yhchiang): add is_adjustable to CompactionOptions

// the smallest and largest key of the current compaction input
Expand Down Expand Up @@ -795,6 +785,8 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
}
last_included++;
}
} else if (output_level > 0) {
last_included = static_cast<int>(current_files.size() - 1);
}

// include all files between the first and the last compaction input files.
Expand Down Expand Up @@ -854,6 +846,11 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
}
}
}
if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
return Status::Aborted(
"A running compaction is writing to the same output level in an "
"overlapping key range");
}
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class CompactionPicker {

// Takes a list of CompactionInputFiles and returns a (manual) Compaction
// object.
//
// Caller must provide a set of input files that has been passed through
// `SanitizeCompactionInputFiles` earlier. The lock should not be released
// between that call and this one.
Compaction* CompactFiles(const CompactionOptions& compact_options,
const std::vector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
Expand Down
133 changes: 133 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2894,6 +2894,139 @@ TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}

TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
// https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
// CompactFiles() had a bug where it failed to pick a compaction when an L0
// compaction existed, but marked it as scheduled anyways. It'd never be
// unmarked as scheduled, so future compactions or DB close could hang.
const int kNumL0Files = 5;
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumL0Files - 1;
options.max_background_compactions = 2;
DestroyAndReopen(options);

rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"LevelCompactionPicker::PickCompaction:Return",
"DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
{"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

auto schedule_multi_compaction_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();

// Files 0-3 will be included in an L0->L1 compaction.
//
// File 4 will be included in a call to CompactFiles() while the first
// compaction is running.
for (int i = 0; i < kNumL0Files - 1; ++i) {
ASSERT_OK(Put(Key(0), "val")); // sentinel to prevent trivial move
ASSERT_OK(Put(Key(i + 1), "val"));
ASSERT_OK(Flush());
}
TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
// file 4 flushed after 0-3 picked
ASSERT_OK(Put(Key(kNumL0Files), "val"));
ASSERT_OK(Flush());

// previously DB close would hang forever as this situation caused scheduled
// compactions count to never decrement to zero.
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
std::vector<std::string> input_filenames;
input_filenames.push_back(cf_meta.levels[0].files.front().name);
ASSERT_OK(dbfull()
->CompactFiles(CompactionOptions(), input_filenames,
0 /* output_level */));
TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
// Regression test for bug of not pulling in L0 files that overlap the user-
// specified input files in time- and key-ranges.
Put(Key(0), "old_val");
Flush();
Put(Key(0), "new_val");
Flush();

ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
ASSERT_GE(cf_meta.levels.size(), 2);
ASSERT_EQ(2, cf_meta.levels[0].files.size());

// Compacting {new L0 file, L1 file} should pull in the old L0 file since it
// overlaps in key-range and time-range.
std::vector<std::string> input_filenames;
input_filenames.push_back(cf_meta.levels[0].files.front().name);
ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
1 /* output_level */));
ASSERT_EQ("new_val", Get(Key(0)));
}

TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
// LSM setup:
// L1: [ba bz]
// L2: [a b] [c d]
// L3: [a b] [c d]
//
// Thread 1: Thread 2:
// Begin compacting all L2->L3
// Compact [ba bz] L1->L3
// End compacting all L2->L3
//
// The compaction operation in thread 2 should be disallowed because the range
// overlaps with the compaction in thread 1, which also covers that range in
// L3.
Options options = CurrentOptions();
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
Reopen(options);

for (int level = 3; level >= 2; --level) {
ASSERT_OK(Put("a", "val"));
ASSERT_OK(Put("b", "val"));
ASSERT_OK(Flush());
ASSERT_OK(Put("c", "val"));
ASSERT_OK(Put("d", "val"));
ASSERT_OK(Flush());
MoveFilesToLevel(level);
}
ASSERT_OK(Put("ba", "val"));
ASSERT_OK(Put("bz", "val"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

SyncPoint::GetInstance()->LoadDependency({
{"CompactFilesImpl:0",
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
{"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
"CompactFilesImpl:1"},
});
SyncPoint::GetInstance()->EnableProcessing();

auto bg_thread = port::Thread([&]() {
// Thread 1
std::vector<std::string> filenames = collector->GetFlushedFiles();
filenames.pop_back();
ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
3 /* output_level */));
});

// Thread 2
TEST_SYNC_POINT(
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
std::string filename = collector->GetFlushedFiles().back();
ASSERT_FALSE(
db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
.ok());
TEST_SYNC_POINT(
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");

bg_thread.join();
}

INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false),
Expand Down
7 changes: 4 additions & 3 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,10 @@ Status DBImpl::CompactFilesImpl(
c.reset(cfd->compaction_picker()->CompactFiles(
compact_options, input_files, output_level, version->storage_info(),
*cfd->GetLatestMutableCFOptions(), output_path_id));
if (!c) {
return Status::Aborted("Another Level 0 compaction is running");
}
// we already sanitized the set of input files and checked for conflicts
// without releasing the lock, so we're guaranteed a compaction can be formed.
assert(c != nullptr);

c->SetInputVersion(version);
// deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction());
Expand Down

0 comments on commit 90d77f0

Please sign in to comment.