Skip to content

Commit

Permalink
Unschedule manual compaction from thread-pool queue (#9625)
Browse files Browse the repository at this point in the history
Summary:
PR #9557 introduced a race condition between manual compaction
foreground thread and background compaction thread.
This PR adds the ability to really unschedule manual compaction from
thread-pool queue by differentiate tag name for manual compaction and
other tasks.
Also fix an issue that db `close()` didn't cancel the manual compaction thread.

Pull Request resolved: #9625

Test Plan: unittest not hang

Reviewed By: ajkr

Differential Revision: D34410811

Pulled By: jay-zhuang

fbshipit-source-id: cb14065eabb8cf1345fa042b5652d4f788c0c40c
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Mar 2, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent d74468e commit db86479
Showing 5 changed files with 190 additions and 102 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -10,6 +10,9 @@
### Public API changes
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.

### Bug Fixes
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.

## 7.0.0 (02/20/2022)
### Bug Fixes
* Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.)
61 changes: 59 additions & 2 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
@@ -6951,7 +6951,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {

SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::RunManualCompaction:Scheduled",
"DBCompactionTest::DisableManualCompactionThreadQueueFull:"
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
"PreDisableManualCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();

@@ -6979,7 +6979,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
});

TEST_SYNC_POINT(
"DBCompactionTest::DisableManualCompactionThreadQueueFull:"
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
"PreDisableManualCompaction");

// Generate more files to trigger auto compaction which is scheduled after
@@ -7006,6 +7006,63 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
sleeping_task_low.WaitUntilDone();
}

TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
const int kNumL0Files = 4;

SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::RunManualCompaction:Scheduled",
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
"PreDisableManualCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();

Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value2"));
ASSERT_OK(Flush());
}

port::Thread compact_thread([&]() {
CompactRangeOptions cro;
cro.exclusive_manual_compaction = true;
auto s = db_->CompactRange(cro, nullptr, nullptr);
ASSERT_TRUE(s.IsIncomplete());
});

TEST_SYNC_POINT(
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
"PreDisableManualCompaction");

// Generate more files to trigger auto compaction which is scheduled after
// manual compaction. Has to generate 4 more files because existing files are
// pending compaction
for (int i = 0; i < kNumL0Files; i++) {
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value2"));
ASSERT_OK(Flush());
}
ASSERT_EQ(ToString(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));

// Close DB with manual compaction and auto triggered compaction in the queue.
auto s = db_->Close();
ASSERT_OK(s);

// manual compaction thread should return with Incomplete().
compact_thread.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
}

TEST_F(DBCompactionTest,
DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
// When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
15 changes: 12 additions & 3 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
@@ -539,10 +539,19 @@ Status DBImpl::CloseHelper() {
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false);

// Cancel manual compaction if there's any
if (HasPendingManualCompaction()) {
DisableManualCompaction();
}
mutex_.Lock();
env_->UnSchedule(this, Env::Priority::BOTTOM);
env_->UnSchedule(this, Env::Priority::LOW);
env_->UnSchedule(this, Env::Priority::HIGH);
// Unschedule all tasks for this DB
for (uint8_t i = 0; i < static_cast<uint8_t>(TaskType::kCount); i++) {
env_->UnSchedule(GetTaskTag(i), Env::Priority::BOTTOM);
env_->UnSchedule(GetTaskTag(i), Env::Priority::LOW);
env_->UnSchedule(GetTaskTag(i), Env::Priority::HIGH);
}

Status ret = Status::OK();

// Wait for background work to finish
20 changes: 19 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
@@ -1539,7 +1539,6 @@ class DBImpl : public DB {
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
// task limiter token is requested during compaction picking.
std::unique_ptr<TaskLimiterToken> task_token;
bool is_canceled = false;
};

struct CompactionArg {
@@ -1734,6 +1733,25 @@ class DBImpl : public DB {
}
}

// TaskType is used to identify tasks in thread-pool, currently only
// differentiate manual compaction, which could be unscheduled from the
// thread-pool.
enum class TaskType : uint8_t {
kDefault = 0,
kManualCompaction = 1,
kCount = 2,
};

// Task tag is used to identity tasks in thread-pool, which is
// dbImpl obj address + type
inline void* GetTaskTag(TaskType type) {
return GetTaskTag(static_cast<uint8_t>(type));
}

inline void* GetTaskTag(uint8_t type) {
return static_cast<uint8_t*>(static_cast<void*>(this)) + type;
}

// REQUIRES: mutex locked and in write thread.
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);

193 changes: 97 additions & 96 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
@@ -1763,6 +1763,7 @@ Status DBImpl::RunManualCompaction(
CompactionArg* ca = nullptr;

bool scheduled = false;
Env::Priority thread_pool_priority = Env::Priority::TOTAL;
bool manual_conflict = false;
ManualCompactionState manual;
manual.cfd = cfd;
@@ -1884,9 +1885,9 @@ Status DBImpl::RunManualCompaction(
manual.done = true;
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
if (ca && ca->prepicked_compaction) {
ca->prepicked_compaction->is_canceled = true;
}
assert(thread_pool_priority != Env::Priority::TOTAL);
env_->UnSchedule(GetTaskTag(TaskType::kManualCompaction),
thread_pool_priority);
break;
}
if (scheduled && manual.incomplete == true) {
@@ -1916,13 +1917,17 @@ Status DBImpl::RunManualCompaction(
bg_bottom_compaction_scheduled_++;
ca->compaction_pri_ = Env::Priority::BOTTOM;
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
Env::Priority::BOTTOM, this,
Env::Priority::BOTTOM,
GetTaskTag(TaskType::kManualCompaction),
&DBImpl::UnscheduleCompactionCallback);
thread_pool_priority = Env::Priority::BOTTOM;
} else {
bg_compaction_scheduled_++;
ca->compaction_pri_ = Env::Priority::LOW;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW,
GetTaskTag(TaskType::kManualCompaction),
&DBImpl::UnscheduleCompactionCallback);
thread_pool_priority = Env::Priority::LOW;
}
scheduled = true;
TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
@@ -1933,6 +1938,13 @@ Status DBImpl::RunManualCompaction(
assert(!manual.in_progress);
assert(HasPendingManualCompaction());
RemoveManualCompaction(&manual);
// if the manual job is unscheduled, try schedule other jobs in case there's
// any unscheduled compaction job which was blocked by exclusive manual
// compaction.
if (manual.status.IsIncomplete() &&
manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
MaybeScheduleFlushOrCompaction();
}
bg_cv_.SignalAll();
return manual.status;
}
@@ -2661,6 +2673,8 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) {
delete reinterpret_cast<CompactionArg*>(arg);
if (ca.prepicked_compaction != nullptr) {
if (ca.prepicked_compaction->compaction != nullptr) {
ca.prepicked_compaction->compaction->ReleaseCompactionFiles(
Status::Incomplete(Status::SubCode::kManualCompactionPaused));
delete ca.prepicked_compaction->compaction;
}
delete ca.prepicked_compaction;
@@ -2851,106 +2865,93 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri) {
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0");
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);

if (prepicked_compaction && prepicked_compaction->is_canceled) {
assert(prepicked_compaction->compaction);
ROCKS_LOG_BUFFER(&log_buffer, "[%s] Skip canceled manual compaction job",
prepicked_compaction->compaction->column_family_data()
->GetName()
.c_str());
prepicked_compaction->compaction->ReleaseCompactionFiles(
Status::Incomplete(Status::SubCode::kManualCompactionPaused));
delete prepicked_compaction->compaction;
} else {
JobContext job_context(next_job_id_.fetch_add(1), true);
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();

num_running_compactions_++;

std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));

assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
immutable_db_options_.clock->SleepForMicroseconds(
10000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
log_buffer.FlushBufferToLog();
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Waiting after background compaction error: %s, "
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(immutable_db_options_.info_log);
immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
assert(prepicked_compaction);
ManualCompactionState* m =
prepicked_compaction->manual_compaction_state;
assert(m);
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
m->cfd->GetName().c_str(), job_context.job_id);
}

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

// If compaction failed, we want to delete all temporary files that we
// might have created (they might not be all recorded in job_context in
// case of a failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() &&
!s.IsColumnFamilyDropped() &&
!s.IsBusy());
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
TEST_SYNC_POINT(
"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
}
job_context.Clean();
mutex_.Lock();
}
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();

num_running_compactions_++;

std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));

assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
immutable_db_options_.clock->SleepForMicroseconds(
10000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
log_buffer.FlushBufferToLog();
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Waiting after background compaction error: %s, "
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(immutable_db_options_.info_log);
immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
assert(prepicked_compaction);
ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
assert(m);
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
m->cfd->GetName().c_str(), job_context.job_id);
}

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

assert(num_running_compactions_ > 0);
num_running_compactions_--;
// If compaction failed, we want to delete all temporary files that we
// might have created (they might not be all recorded in job_context in
// case of a failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() &&
!s.IsColumnFamilyDropped() &&
!s.IsBusy());
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
}
job_context.Clean();
mutex_.Lock();
}

assert(num_running_compactions_ > 0);
num_running_compactions_--;

if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
} else {

0 comments on commit db86479

Please sign in to comment.