From 8c98f6c61e55d0f54aebac6dcf7ac864507b6191 Mon Sep 17 00:00:00 2001 From: Sergei Politov Date: Wed, 6 Jan 2021 12:42:40 +0300 Subject: [PATCH] [#6747] Don't block thread while waiting for rebuild YCQL system table cache Summary: `RebuildYQLSystemPartitions` blocks its thread while sleeping for the rebuild interval, which prevents master shutdown until the sleep finishes. As a result, restarting a cluster with 3 masters could take several minutes. Fixed by replacing the blocking sleep with a delayed task scheduled via the scheduler. Test Plan: ybd --gtest_filter BackupTxnTest.DeleteTableWithMastersRestart -n 20 Reviewers: bogdan, zyu, nicolas Reviewed By: zyu, nicolas Subscribers: dsrinivasan, zyu, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D10220 --- src/yb/consensus/raft_consensus.cc | 41 ++++++--------------- src/yb/consensus/raft_consensus.h | 7 ++-- src/yb/integration-tests/mini_cluster.cc | 7 ++++ src/yb/master/catalog_manager.cc | 32 ++++++++--------- src/yb/master/catalog_manager.h | 2 +- src/yb/rpc/scheduler.cc | 37 +++++++++++++++++++ src/yb/rpc/scheduler.h | 46 ++++++++++++++++++++++++ src/yb/util/threadpool.cc | 2 ++ 8 files changed, 122 insertions(+), 52 deletions(-) diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 0dea6efaff15..09a883a87f71 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -360,6 +360,7 @@ RaftConsensus::RaftConsensus( queue_(std::move(queue)), rng_(GetRandomSeed32()), withhold_votes_until_(MonoTime::Min()), + step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()), mark_dirty_clbk_(std::move(mark_dirty_clbk)), shutdown_(false), follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter( @@ -707,19 +708,13 @@ Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool gracefu graceful ? 
std::string() : peer.permanent_uuid(), MonoDelta()); } -void RaftConsensus::CheckDelayedStepDown(rpc::ScheduledTaskId task_id, const Status& status) { +void RaftConsensus::CheckDelayedStepDown(const Status& status) { ReplicaState::UniqueLock lock; auto lock_status = state_->LockForConfigChange(&lock); if (!lock_status.ok()) { LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status; - lock = state_->LockForRead(); - --num_scheduled_step_down_checks_; return; } - --num_scheduled_step_down_checks_; - if (task_id == last_scheduled_step_down_check_task_id_) { - last_scheduled_step_down_check_task_id_ = rpc::kInvalidTaskId; - } if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) { return; @@ -846,13 +841,8 @@ Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDow .graceful = graceful_stepdown, }; LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString(); - auto& scheduler = peer_proxy_factory_->messenger()->scheduler(); - if (last_scheduled_step_down_check_task_id_ != rpc::kInvalidTaskId) { - scheduler.Abort(last_scheduled_step_down_check_task_id_); - } - ++num_scheduled_step_down_checks_; - last_scheduled_step_down_check_task_id_ = scheduler.Schedule( - std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1, _2), + step_down_check_tracker_.Schedule( + std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1), 1ms * timeout_ms); return Status::OK(); } @@ -2495,24 +2485,13 @@ void RaftConsensus::Shutdown() { CHECK_OK(ExecuteHook(PRE_SHUTDOWN)); - for (;;) { - { - ReplicaState::UniqueLock lock; - // Transition to kShuttingDown state. 
- CHECK_OK(state_->LockForShutdown(&lock)); - if (num_scheduled_step_down_checks_ == 0) { - LOG_WITH_PREFIX(INFO) << "Raft consensus shutting down."; - break; - } - YB_LOG_EVERY_N_SECS(INFO, 1) << LogPrefix() << "Waiting " << num_scheduled_step_down_checks_ - << " step down checks to complete"; - if (last_scheduled_step_down_check_task_id_ != rpc::kInvalidTaskId) { - peer_proxy_factory_->messenger()->scheduler().Abort( - last_scheduled_step_down_check_task_id_); - } - } - std::this_thread::sleep_for(1ms); + { + ReplicaState::UniqueLock lock; + // Transition to kShuttingDown state. + CHECK_OK(state_->LockForShutdown(&lock)); + step_down_check_tracker_.StartShutdown(); } + step_down_check_tracker_.CompleteShutdown(); // Close the peer manager. peer_manager_->Close(); diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index ee2ecd34fc0e..4dcc1704be1e 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -48,6 +48,8 @@ #include "yb/consensus/consensus_meta.h" #include "yb/consensus/consensus_queue.h" +#include "yb/rpc/scheduler.h" + #include "yb/util/opid.h" #include "yb/util/random.h" #include "yb/util/result.h" @@ -627,7 +629,7 @@ class RaftConsensus : public std::enable_shared_from_this, CHECKED_STATUS StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful); // Checked whether we should start step down when protege did not synchronize before timeout. - void CheckDelayedStepDown(rpc::ScheduledTaskId task_id, const Status& status); + void CheckDelayedStepDown(const Status& status); // Threadpool token for constructing requests to peers, handling RPC callbacks, // etc. 
@@ -674,8 +676,7 @@ class RaftConsensus : public std::enable_shared_from_this, }; DelayedStepDown delayed_step_down_; - int num_scheduled_step_down_checks_ = 0; - rpc::ScheduledTaskId last_scheduled_step_down_check_task_id_ = rpc::kInvalidTaskId; + rpc::ScheduledTaskTracker step_down_check_tracker_; // The number of times this node has called and lost a leader election since // the last time it saw a stable leader (either itself or another node). diff --git a/src/yb/integration-tests/mini_cluster.cc b/src/yb/integration-tests/mini_cluster.cc index eead3c392722..a2e92c81138f 100644 --- a/src/yb/integration-tests/mini_cluster.cc +++ b/src/yb/integration-tests/mini_cluster.cc @@ -57,6 +57,7 @@ #include "yb/tserver/tablet_server.h" #include "yb/tserver/ts_tablet_manager.h" +#include "yb/util/debug/long_operation_tracker.h" #include "yb/util/path_util.h" #include "yb/util/random_util.h" #include "yb/util/scope_exit.h" @@ -265,9 +266,15 @@ Status MiniCluster::RestartSync() { } LOG(INFO) << "Restart master server(s)..."; for (auto& master_server : mini_masters_) { + LOG(INFO) << "Restarting master " << master_server->permanent_uuid(); + LongOperationTracker long_operation_tracker("Master restart", 5s); CHECK_OK(master_server->Restart()); + LOG(INFO) << "Waiting for catalog manager at " << master_server->permanent_uuid(); CHECK_OK(master_server->WaitForCatalogManagerInit()); } + LOG(INFO) << string(80, '-'); + LOG(INFO) << __FUNCTION__ << " done"; + LOG(INFO) << string(80, '-'); RETURN_NOT_OK_PREPEND(WaitForAllTabletServers(), "Waiting for tablet servers to start"); diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 7e9a317cd26f..0f8149487a3f 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -125,7 +125,6 @@ #include "yb/master/yql_views_vtable.h" #include "yb/tserver/ts_tablet_manager.h" -#include "yb/rpc/messenger.h" #include "yb/tablet/operations/change_metadata_operation.h" #include 
"yb/tablet/tablet.h" @@ -553,6 +552,8 @@ bool IsIndexBackfillEnabled(TableType table_type, bool is_transactional) { return !disabled; } +constexpr auto kDefaultYQLPartitionsRefreshBgTaskSleep = 10s; + } // anonymous namespace //////////////////////////////////////////////////////////// @@ -1460,6 +1461,8 @@ bool CatalogManager::StartShutdown() { state_ = kClosing; } + refresh_yql_partitions_task_.StartShutdown(); + if (sys_catalog_) { sys_catalog_->StartShutdown(); } @@ -1469,6 +1472,7 @@ bool CatalogManager::StartShutdown() { void CatalogManager::CompleteShutdown() { // Shutdown the Catalog Manager background thread (load balancing). + refresh_yql_partitions_task_.CompleteShutdown(); if (background_tasks_) { background_tasks_->Shutdown(); } @@ -6869,6 +6873,7 @@ Status CatalogManager::EnableBgTasks() { RETURN_NOT_OK_PREPEND(background_tasks_->Init(), "Failed to initialize catalog manager background tasks"); // Add bg thread to rebuild the system partitions thread. + refresh_yql_partitions_task_.Bind(&master_->messenger()->scheduler()); RETURN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( std::bind(&CatalogManager::RebuildYQLSystemPartitions, this))); return Status::OK(); @@ -8742,23 +8747,16 @@ void CatalogManager::RebuildYQLSystemPartitions() { } } - while (true) { - // Allow for FLAGS_partitions_vtable_cache_refresh_secs to be changed on the fly. If set to 0, - // then this thread will sleep and awake every kDefaultYQLPartitionsRefreshBgTaskSleepSecs to - // check if the flag has changed again. - int sleep_secs = FLAGS_partitions_vtable_cache_refresh_secs; - if (sleep_secs <= 0) { - sleep_secs = kDefaultYQLPartitionsRefreshBgTaskSleepSecs; - } - SleepFor(MonoDelta::FromSeconds(sleep_secs)); - const auto s = background_tasks_thread_pool_->SubmitFunc( - std::bind(&CatalogManager::RebuildYQLSystemPartitions, this)); - if (s.ok() || s.IsServiceUnavailable()) { - // Either succesfully started new task, or we are shutting down. 
- return; - } - LOG(WARNING) << "Could not submit RebuildYQLSystemPartitions to thread pool: " << s.ToString(); + auto wait_time = FLAGS_partitions_vtable_cache_refresh_secs * 1s; + if (wait_time <= 0s) { + wait_time = kDefaultYQLPartitionsRefreshBgTaskSleep; } + refresh_yql_partitions_task_.Schedule([this](const Status& status) { + WARN_NOT_OK( + background_tasks_thread_pool_->SubmitFunc( + std::bind(&CatalogManager::RebuildYQLSystemPartitions, this)), + "Failed to schedule: RebuildYQLSystemPartitions"); + }, wait_time); } } // namespace master diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index eb0feb208e4e..a25d71c03f40 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -1381,7 +1381,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf { // Should be bumped up when tablet locations are changed. std::atomic tablet_locations_version_{0}; - static constexpr int kDefaultYQLPartitionsRefreshBgTaskSleepSecs = 10; + rpc::ScheduledTaskTracker refresh_yql_partitions_task_; DISALLOW_COPY_AND_ASSIGN(CatalogManager); }; diff --git a/src/yb/rpc/scheduler.cc b/src/yb/rpc/scheduler.cc index 8ce91668ca8f..595673fce898 100644 --- a/src/yb/rpc/scheduler.cc +++ b/src/yb/rpc/scheduler.cc @@ -15,6 +15,8 @@ #include "yb/rpc/scheduler.h" +#include + #include #include @@ -26,8 +28,10 @@ #include #include "yb/util/errno.h" +#include "yb/util/logging.h" #include "yb/util/status.h" +using namespace std::literals; using namespace std::placeholders; using boost::multi_index::const_mem_fun; using boost::multi_index::hashed_unique; @@ -36,6 +40,12 @@ using boost::multi_index::ordered_non_unique; namespace yb { namespace rpc { +namespace { + +constexpr int64_t kShutdownMark = -(1ULL << 32U); + +} + class Scheduler::Impl { public: explicit Impl(IoService* io_service) @@ -186,5 +196,32 @@ IoService& Scheduler::io_service() { return impl_->io_service(); } +void ScheduledTaskTracker::Abort() { + auto 
last_scheduled_task_id = last_scheduled_task_id_.load(std::memory_order_acquire); + if (last_scheduled_task_id != rpc::kInvalidTaskId) { + scheduler_->Abort(last_scheduled_task_id); + } +} + +void ScheduledTaskTracker::StartShutdown() { + auto num_scheduled = num_scheduled_.load(std::memory_order_acquire); + while (num_scheduled >= 0) { + num_scheduled_.compare_exchange_strong(num_scheduled, num_scheduled + kShutdownMark); + } +} + +void ScheduledTaskTracker::CompleteShutdown() { + for (;;) { + auto left = num_scheduled_.load(std::memory_order_acquire) - kShutdownMark; + if (left <= 0) { + LOG_IF(DFATAL, left < 0) << "Negative number of tasks left: " << left; + break; + } + YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting " << left << " tasks to complete"; + Abort(); + std::this_thread::sleep_for(1ms); + } +} + } // namespace rpc } // namespace yb diff --git a/src/yb/rpc/scheduler.h b/src/yb/rpc/scheduler.h index bc8f865623a3..6a7c682c2d62 100644 --- a/src/yb/rpc/scheduler.h +++ b/src/yb/rpc/scheduler.h @@ -114,6 +114,52 @@ class Scheduler { std::unique_ptr impl_; }; +class ScheduledTaskTracker { + public: + ScheduledTaskTracker() = default; + + explicit ScheduledTaskTracker(Scheduler* scheduler) : scheduler_(DCHECK_NOTNULL(scheduler)) {} + + void Bind(Scheduler* scheduler) { + scheduler_ = scheduler; + } + + template + void Schedule(const F& f, std::chrono::steady_clock::duration delay) { + Schedule(f, std::chrono::steady_clock::now() + delay); + } + + template + void Schedule(const F& f, std::chrono::steady_clock::time_point time) { + Abort(); + if (++num_scheduled_ < 0) { // Shutting down + --num_scheduled_; + return; + } + last_scheduled_task_id_ = scheduler_->Schedule( + [this, f](ScheduledTaskId task_id, const Status& status) { + last_scheduled_task_id_.compare_exchange_strong(task_id, rpc::kInvalidTaskId); + f(status); + --num_scheduled_; + }, time); + } + + void Abort(); + + void StartShutdown(); + void CompleteShutdown(); + + void Shutdown() { + StartShutdown(); + 
CompleteShutdown(); + } + + private: + Scheduler* scheduler_ = nullptr; + std::atomic num_scheduled_{0}; + std::atomic last_scheduled_task_id_{rpc::kInvalidTaskId}; +}; + } // namespace rpc } // namespace yb diff --git a/src/yb/util/threadpool.cc b/src/yb/util/threadpool.cc index 51e3949abc76..3eed9ca76ed3 100644 --- a/src/yb/util/threadpool.cc +++ b/src/yb/util/threadpool.cc @@ -46,6 +46,7 @@ #include "yb/gutil/strings/substitute.h" #include "yb/gutil/sysinfo.h" +#include "yb/util/debug/long_operation_tracker.h" #include "yb/util/errno.h" #include "yb/util/logging.h" #include "yb/util/metrics.h" @@ -612,6 +613,7 @@ void ThreadPool::DispatchThread(bool permanent) { // Execute the task { + LongOperationTracker long_operation_tracker("Thread Pool Task", std::chrono::seconds(5)); MicrosecondsInt64 start_wall_us = GetMonoTimeMicros(); task.runnable->Run(); int64_t wall_us = GetMonoTimeMicros() - start_wall_us;