Skip to content

Commit

Permalink
[yugabyte#6747] Don't block thread while waiting for rebuild YCQL sys…
Browse files Browse the repository at this point in the history
…tem table cache

Summary:
`RebuildYQLSystemPartitions` blocks thread while waiting for rebuild interval.
It prevents master shutdown until sleep finishes.
So restarting cluster with 3 masters could take several minutes.

Fixed by replacing with wait using scheduler.

Test Plan: ybd --gtest_filter BackupTxnTest.DeleteTableWithMastersRestart -n 20

Reviewers: bogdan, zyu, nicolas

Reviewed By: zyu, nicolas

Subscribers: dsrinivasan, zyu, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D10220
  • Loading branch information
spolitov committed Jan 6, 2021
1 parent 9bb89e5 commit 8c98f6c
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 52 deletions.
41 changes: 10 additions & 31 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ RaftConsensus::RaftConsensus(
queue_(std::move(queue)),
rng_(GetRandomSeed32()),
withhold_votes_until_(MonoTime::Min()),
step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()),
mark_dirty_clbk_(std::move(mark_dirty_clbk)),
shutdown_(false),
follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
Expand Down Expand Up @@ -707,19 +708,13 @@ Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool gracefu
graceful ? std::string() : peer.permanent_uuid(), MonoDelta());
}

void RaftConsensus::CheckDelayedStepDown(rpc::ScheduledTaskId task_id, const Status& status) {
void RaftConsensus::CheckDelayedStepDown(const Status& status) {
ReplicaState::UniqueLock lock;
auto lock_status = state_->LockForConfigChange(&lock);
if (!lock_status.ok()) {
LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status;
lock = state_->LockForRead();
--num_scheduled_step_down_checks_;
return;
}
--num_scheduled_step_down_checks_;
if (task_id == last_scheduled_step_down_check_task_id_) {
last_scheduled_step_down_check_task_id_ = rpc::kInvalidTaskId;
}

if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) {
return;
Expand Down Expand Up @@ -846,13 +841,8 @@ Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDow
.graceful = graceful_stepdown,
};
LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString();
auto& scheduler = peer_proxy_factory_->messenger()->scheduler();
if (last_scheduled_step_down_check_task_id_ != rpc::kInvalidTaskId) {
scheduler.Abort(last_scheduled_step_down_check_task_id_);
}
++num_scheduled_step_down_checks_;
last_scheduled_step_down_check_task_id_ = scheduler.Schedule(
std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1, _2),
step_down_check_tracker_.Schedule(
std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1),
1ms * timeout_ms);
return Status::OK();
}
Expand Down Expand Up @@ -2495,24 +2485,13 @@ void RaftConsensus::Shutdown() {

CHECK_OK(ExecuteHook(PRE_SHUTDOWN));

for (;;) {
{
ReplicaState::UniqueLock lock;
// Transition to kShuttingDown state.
CHECK_OK(state_->LockForShutdown(&lock));
if (num_scheduled_step_down_checks_ == 0) {
LOG_WITH_PREFIX(INFO) << "Raft consensus shutting down.";
break;
}
YB_LOG_EVERY_N_SECS(INFO, 1) << LogPrefix() << "Waiting " << num_scheduled_step_down_checks_
<< " step down checks to complete";
if (last_scheduled_step_down_check_task_id_ != rpc::kInvalidTaskId) {
peer_proxy_factory_->messenger()->scheduler().Abort(
last_scheduled_step_down_check_task_id_);
}
}
std::this_thread::sleep_for(1ms);
{
ReplicaState::UniqueLock lock;
// Transition to kShuttingDown state.
CHECK_OK(state_->LockForShutdown(&lock));
step_down_check_tracker_.StartShutdown();
}
step_down_check_tracker_.CompleteShutdown();

// Close the peer manager.
peer_manager_->Close();
Expand Down
7 changes: 4 additions & 3 deletions src/yb/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
#include "yb/consensus/consensus_meta.h"
#include "yb/consensus/consensus_queue.h"

#include "yb/rpc/scheduler.h"

#include "yb/util/opid.h"
#include "yb/util/random.h"
#include "yb/util/result.h"
Expand Down Expand Up @@ -627,7 +629,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
CHECKED_STATUS StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful);

// Checked whether we should start step down when protege did not synchronize before timeout.
void CheckDelayedStepDown(rpc::ScheduledTaskId task_id, const Status& status);
void CheckDelayedStepDown(const Status& status);

// Threadpool token for constructing requests to peers, handling RPC callbacks,
// etc.
Expand Down Expand Up @@ -674,8 +676,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
};

DelayedStepDown delayed_step_down_;
int num_scheduled_step_down_checks_ = 0;
rpc::ScheduledTaskId last_scheduled_step_down_check_task_id_ = rpc::kInvalidTaskId;
rpc::ScheduledTaskTracker step_down_check_tracker_;

// The number of times this node has called and lost a leader election since
// the last time it saw a stable leader (either itself or another node).
Expand Down
7 changes: 7 additions & 0 deletions src/yb/integration-tests/mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/debug/long_operation_tracker.h"
#include "yb/util/path_util.h"
#include "yb/util/random_util.h"
#include "yb/util/scope_exit.h"
Expand Down Expand Up @@ -265,9 +266,15 @@ Status MiniCluster::RestartSync() {
}
LOG(INFO) << "Restart master server(s)...";
for (auto& master_server : mini_masters_) {
LOG(INFO) << "Restarting master " << master_server->permanent_uuid();
LongOperationTracker long_operation_tracker("Master restart", 5s);
CHECK_OK(master_server->Restart());
LOG(INFO) << "Waiting for catalog manager at " << master_server->permanent_uuid();
CHECK_OK(master_server->WaitForCatalogManagerInit());
}
LOG(INFO) << string(80, '-');
LOG(INFO) << __FUNCTION__ << " done";
LOG(INFO) << string(80, '-');

RETURN_NOT_OK_PREPEND(WaitForAllTabletServers(),
"Waiting for tablet servers to start");
Expand Down
32 changes: 15 additions & 17 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@
#include "yb/master/yql_views_vtable.h"

#include "yb/tserver/ts_tablet_manager.h"
#include "yb/rpc/messenger.h"

#include "yb/tablet/operations/change_metadata_operation.h"
#include "yb/tablet/tablet.h"
Expand Down Expand Up @@ -553,6 +552,8 @@ bool IsIndexBackfillEnabled(TableType table_type, bool is_transactional) {
return !disabled;
}

constexpr auto kDefaultYQLPartitionsRefreshBgTaskSleep = 10s;

} // anonymous namespace

////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1460,6 +1461,8 @@ bool CatalogManager::StartShutdown() {
state_ = kClosing;
}

refresh_yql_partitions_task_.StartShutdown();

if (sys_catalog_) {
sys_catalog_->StartShutdown();
}
Expand All @@ -1469,6 +1472,7 @@ bool CatalogManager::StartShutdown() {

void CatalogManager::CompleteShutdown() {
// Shutdown the Catalog Manager background thread (load balancing).
refresh_yql_partitions_task_.CompleteShutdown();
if (background_tasks_) {
background_tasks_->Shutdown();
}
Expand Down Expand Up @@ -6869,6 +6873,7 @@ Status CatalogManager::EnableBgTasks() {
RETURN_NOT_OK_PREPEND(background_tasks_->Init(),
"Failed to initialize catalog manager background tasks");
// Add bg thread to rebuild the system partitions thread.
refresh_yql_partitions_task_.Bind(&master_->messenger()->scheduler());
RETURN_NOT_OK(background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::RebuildYQLSystemPartitions, this)));
return Status::OK();
Expand Down Expand Up @@ -8742,23 +8747,16 @@ void CatalogManager::RebuildYQLSystemPartitions() {
}
}

while (true) {
// Allow for FLAGS_partitions_vtable_cache_refresh_secs to be changed on the fly. If set to 0,
// then this thread will sleep and awake every kDefaultYQLPartitionsRefreshBgTaskSleepSecs to
// check if the flag has changed again.
int sleep_secs = FLAGS_partitions_vtable_cache_refresh_secs;
if (sleep_secs <= 0) {
sleep_secs = kDefaultYQLPartitionsRefreshBgTaskSleepSecs;
}
SleepFor(MonoDelta::FromSeconds(sleep_secs));
const auto s = background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::RebuildYQLSystemPartitions, this));
if (s.ok() || s.IsServiceUnavailable()) {
// Either succesfully started new task, or we are shutting down.
return;
}
LOG(WARNING) << "Could not submit RebuildYQLSystemPartitions to thread pool: " << s.ToString();
auto wait_time = FLAGS_partitions_vtable_cache_refresh_secs * 1s;
if (wait_time <= 0s) {
wait_time = kDefaultYQLPartitionsRefreshBgTaskSleep;
}
refresh_yql_partitions_task_.Schedule([this](const Status& status) {
WARN_NOT_OK(
background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::RebuildYQLSystemPartitions, this)),
"Failed to schedule: RebuildYQLSystemPartitions");
}, wait_time);
}

} // namespace master
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
// Should be bumped up when tablet locations are changed.
std::atomic<uintptr_t> tablet_locations_version_{0};

static constexpr int kDefaultYQLPartitionsRefreshBgTaskSleepSecs = 10;
rpc::ScheduledTaskTracker refresh_yql_partitions_task_;

DISALLOW_COPY_AND_ASSIGN(CatalogManager);
};
Expand Down
37 changes: 37 additions & 0 deletions src/yb/rpc/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include "yb/rpc/scheduler.h"

#include <thread>

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>

Expand All @@ -26,8 +28,10 @@
#include <glog/logging.h>

#include "yb/util/errno.h"
#include "yb/util/logging.h"
#include "yb/util/status.h"

using namespace std::literals;
using namespace std::placeholders;
using boost::multi_index::const_mem_fun;
using boost::multi_index::hashed_unique;
Expand All @@ -36,6 +40,12 @@ using boost::multi_index::ordered_non_unique;
namespace yb {
namespace rpc {

namespace {

constexpr int64_t kShutdownMark = -(1ULL << 32U);

}

class Scheduler::Impl {
public:
explicit Impl(IoService* io_service)
Expand Down Expand Up @@ -186,5 +196,32 @@ IoService& Scheduler::io_service() {
return impl_->io_service();
}

void ScheduledTaskTracker::Abort() {
auto last_scheduled_task_id = last_scheduled_task_id_.load(std::memory_order_acquire);
if (last_scheduled_task_id != rpc::kInvalidTaskId) {
scheduler_->Abort(last_scheduled_task_id);
}
}

void ScheduledTaskTracker::StartShutdown() {
auto num_scheduled = num_scheduled_.load(std::memory_order_acquire);
while (num_scheduled >= 0) {
num_scheduled_.compare_exchange_strong(num_scheduled, num_scheduled + kShutdownMark);
}
}

void ScheduledTaskTracker::CompleteShutdown() {
for (;;) {
auto left = num_scheduled_.load(std::memory_order_acquire) - kShutdownMark;
if (left <= 0) {
LOG_IF(DFATAL, left < 0) << "Negative number of tasks left: " << left;
break;
}
YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting " << left << " tasks to complete";
Abort();
std::this_thread::sleep_for(1ms);
}
}

} // namespace rpc
} // namespace yb
46 changes: 46 additions & 0 deletions src/yb/rpc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,52 @@ class Scheduler {
std::unique_ptr<Impl> impl_;
};

class ScheduledTaskTracker {
public:
ScheduledTaskTracker() = default;

explicit ScheduledTaskTracker(Scheduler* scheduler) : scheduler_(DCHECK_NOTNULL(scheduler)) {}

void Bind(Scheduler* scheduler) {
scheduler_ = scheduler;
}

template <class F>
void Schedule(const F& f, std::chrono::steady_clock::duration delay) {
Schedule(f, std::chrono::steady_clock::now() + delay);
}

template <class F>
void Schedule(const F& f, std::chrono::steady_clock::time_point time) {
Abort();
if (++num_scheduled_ < 0) { // Shutting down
--num_scheduled_;
return;
}
last_scheduled_task_id_ = scheduler_->Schedule(
[this, f](ScheduledTaskId task_id, const Status& status) {
last_scheduled_task_id_.compare_exchange_strong(task_id, rpc::kInvalidTaskId);
f(status);
--num_scheduled_;
}, time);
}

void Abort();

void StartShutdown();
void CompleteShutdown();

void Shutdown() {
StartShutdown();
CompleteShutdown();
}

private:
Scheduler* scheduler_ = nullptr;
std::atomic<int64_t> num_scheduled_{0};
std::atomic<rpc::ScheduledTaskId> last_scheduled_task_id_{rpc::kInvalidTaskId};
};

} // namespace rpc
} // namespace yb

Expand Down
2 changes: 2 additions & 0 deletions src/yb/util/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "yb/gutil/strings/substitute.h"
#include "yb/gutil/sysinfo.h"

#include "yb/util/debug/long_operation_tracker.h"
#include "yb/util/errno.h"
#include "yb/util/logging.h"
#include "yb/util/metrics.h"
Expand Down Expand Up @@ -612,6 +613,7 @@ void ThreadPool::DispatchThread(bool permanent) {

// Execute the task
{
LongOperationTracker long_operation_tracker("Thread Pool Task", std::chrono::seconds(5));
MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
task.runnable->Run();
int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
Expand Down

0 comments on commit 8c98f6c

Please sign in to comment.