diff --git a/src/yb/master/master.cc b/src/yb/master/master.cc index ff580922b82d..816080dab5c1 100644 --- a/src/yb/master/master.cc +++ b/src/yb/master/master.cc @@ -222,19 +222,19 @@ Status Master::Init() { return Status::OK(); } -Status Master::InitAutoFlags() { +Status Master::InitAutoFlags(rpc::Messenger* messenger) { // Will we be in shell mode if we dont have a sys catalog yet? bool is_shell_mode_if_new = FLAGS_master_join_existing_universe || !opts().AreMasterAddressesProvided(); RETURN_NOT_OK(auto_flags_manager_->Init( - options_.HostsString(), + messenger, [this]() { return fs_manager_->LookupTablet(kSysCatalogTabletId); } /* has_sys_catalog_func */, is_shell_mode_if_new)); - return RpcAndWebServerBase::InitAutoFlags(); + return RpcAndWebServerBase::InitAutoFlags(messenger); } Result> Master::GetAvailableAutoFlagsForServer() const { @@ -246,7 +246,7 @@ Status Master::InitAutoFlagsFromMasterLeader(const HostPort& leader_address) { opts().IsShellMode(), IllegalState, "Cannot load AutoFlags from another master when not in shell mode."); - return auto_flags_manager_->LoadFromMasterLeader(options_.HostsString(), {{leader_address}}); + return auto_flags_manager_->LoadFromMasterLeader({{leader_address}}); } MonoDelta Master::default_client_timeout() { diff --git a/src/yb/master/master.h b/src/yb/master/master.h index 4b6072f0ea5b..307077199c5e 100644 --- a/src/yb/master/master.h +++ b/src/yb/master/master.h @@ -86,7 +86,7 @@ class Master : public tserver::DbServerBase { explicit Master(const MasterOptions& opts); virtual ~Master(); - virtual Status InitAutoFlags() override; + virtual Status InitAutoFlags(rpc::Messenger* messenger) override; Status InitAutoFlagsFromMasterLeader(const HostPort& leader_address); Status Init() override; Status Start() override; diff --git a/src/yb/master/master_auto_flags_manager.cc b/src/yb/master/master_auto_flags_manager.cc index 1634ff2106f1..19101bcaef5a 100644 --- a/src/yb/master/master_auto_flags_manager.cc +++ b/src/yb/master/master_auto_flags_manager.cc @@ -241,9 +241,8 @@ MasterAutoFlagsManager::MasterAutoFlagsManager( update_lock_(mutex_, std::defer_lock) {} Status MasterAutoFlagsManager::Init( - const std::string& local_hosts, std::function has_sys_catalog_func, - bool is_shell_mode) { - RETURN_NOT_OK(AutoFlagsManagerBase::Init(local_hosts)); + rpc::Messenger* messenger, std::function has_sys_catalog_func, bool is_shell_mode) { + RETURN_NOT_OK(AutoFlagsManagerBase::Init(messenger)); if (VERIFY_RESULT(LoadFromFile())) { return Status::OK(); @@ -266,8 +265,8 @@ Status MasterAutoFlagsManager::Init( } Status MasterAutoFlagsManager::LoadFromMasterLeader( - const std::string& local_hosts, const server::MasterAddresses& master_addresses) { - return AutoFlagsManagerBase::LoadFromMasterLeader(local_hosts, master_addresses); + const server::MasterAddresses& master_addresses) { + return AutoFlagsManagerBase::LoadFromMasterLeader(master_addresses); } Status MasterAutoFlagsManager::LoadNewConfig(const AutoFlagsConfigPB new_config) { diff --git a/src/yb/master/master_auto_flags_manager.h b/src/yb/master/master_auto_flags_manager.h index c1bee966beea..f06d37f4bc8d 100644 --- a/src/yb/master/master_auto_flags_manager.h +++ b/src/yb/master/master_auto_flags_manager.h @@ -14,7 +14,10 @@ #pragma once #include "yb/server/auto_flags_manager_base.h" + #include "yb/master/master_cluster.pb.h" +#include "yb/util/flags/auto_flags_util.h" +#include "yb/util/unique_lock.h" namespace yb { namespace master { @@ -49,11 +52,9 @@ class MasterAutoFlagsManager : public AutoFlagsManagerBase { virtual ~MasterAutoFlagsManager() {} Status Init( - const std::string& local_hosts, std::function has_sys_catalog_func, - bool is_shell_mode); + rpc::Messenger* messenger, std::function has_sys_catalog_func, bool is_shell_mode); - Status LoadFromMasterLeader( - const std::string& local_hosts, const server::MasterAddresses& master_addresses); + Status LoadFromMasterLeader(const server::MasterAddresses& master_addresses); Status ProcessAutoFlagsConfigOperation(const AutoFlagsConfigPB new_config) override; diff --git a/src/yb/server/auto_flags_manager_base.cc b/src/yb/server/auto_flags_manager_base.cc index fa850a16843b..a289b6978985 100644 --- a/src/yb/server/auto_flags_manager_base.cc +++ b/src/yb/server/auto_flags_manager_base.cc @@ -18,14 +18,10 @@ #include "yb/gutil/strings/join.h" -#include "yb/master/master_cluster.pb.h" - #include "yb/rpc/messenger.h" -#include "yb/rpc/secure_stream.h" - -#include "yb/rpc/secure.h" #include "yb/util/flags/auto_flags.h" +#include "yb/util/flags/auto_flags_util.h" #include "yb/util/net/net_util.h" #include "yb/util/flags.h" #include "yb/util/logging.h" @@ -100,25 +96,9 @@ AutoFlagsManagerBase::AutoFlagsManagerBase( current_config_.set_config_version(kInvalidAutoFlagsConfigVersion); } -AutoFlagsManagerBase::~AutoFlagsManagerBase() { - if (messenger_) { - messenger_->Shutdown(); - } -} - -Status AutoFlagsManagerBase::Init(const std::string& local_hosts) { - rpc::MessengerBuilder messenger_builder("auto_flags_client"); - secure_context_ = VERIFY_RESULT(rpc::SetupInternalSecureContext( - local_hosts, fs_manager_->GetDefaultRootDir(), &messenger_builder)); - - messenger_ = VERIFY_RESULT(messenger_builder.Build()); - - if (PREDICT_FALSE(FLAGS_TEST_running_test)) { - std::vector host_ports; - RETURN_NOT_OK(HostPort::ParseStrings(local_hosts, 0 /* default_port */, &host_ports)); - messenger_->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host_ports[0].host()))); - } - +Status AutoFlagsManagerBase::Init(rpc::Messenger* server_messenger) { + SCHECK_NOTNULL(server_messenger); + server_messenger_ = server_messenger; return Status::OK(); } @@ -150,17 +130,16 @@ Result AutoFlagsManagerBase::LoadFromFile() { Result> AutoFlagsManagerBase::GetAutoFlagConfigFromMasterLeader( const std::string& master_addresses) { + SCHECK_NOTNULL(server_messenger_); auto client = VERIFY_RESULT(yb::client::YBClientBuilder() .add_master_server_addr(master_addresses) .default_admin_operation_timeout(MonoDelta::FromSeconds( FLAGS_yb_client_admin_operation_timeout_sec)) - .Build(messenger_.get())); - + .Build(server_messenger_)); return client->GetAutoFlagConfig(); } -Status AutoFlagsManagerBase::LoadFromMasterLeader( - const std::string& local_hosts, const server::MasterAddresses& master_addresses) { +Status AutoFlagsManagerBase::LoadFromMasterLeader(const server::MasterAddresses& master_addresses) { if (FLAGS_disable_auto_flags_management) { LOG(WARNING) << "AutoFlags management is disabled."; return Status::OK(); @@ -258,7 +237,8 @@ Status AutoFlagsManagerBase::LoadFromConfigUnlocked( const auto delay = VERIFY_RESULT(GetTimeLeftToApplyConfig()); if (delay) { LOG(INFO) << "New AutoFlags config will be applied in " << delay; - RETURN_NOT_OK(messenger_->ScheduleOnReactor( + SCHECK_NOTNULL(server_messenger_); + RETURN_NOT_OK(server_messenger_->ScheduleOnReactor( std::bind( &AutoFlagsManagerBase::AsyncApplyConfig, this, current_config_.config_version(), apply_non_runtime), diff --git a/src/yb/server/auto_flags_manager_base.h b/src/yb/server/auto_flags_manager_base.h index 526db0b29096..2d76aab061ad 100644 --- a/src/yb/server/auto_flags_manager_base.h +++ b/src/yb/server/auto_flags_manager_base.h @@ -15,13 +15,9 @@ #include -#include "yb/common/hybrid_time.h" #include "yb/common/wire_protocol.pb.h" #include "yb/server/server_base_options.h" -#include "yb/util/locks.h" -#include "yb/util/status.h" -#include "yb/util/flags/auto_flags_util.h" -#include "yb/util/unique_lock.h" +#include "yb/util/status_fwd.h" namespace yb { @@ -55,11 +51,11 @@ class AutoFlagsManagerBase { explicit AutoFlagsManagerBase( const std::string& process_name, const scoped_refptr& clock, FsManager* fs_manager); - virtual ~AutoFlagsManagerBase(); + virtual ~AutoFlagsManagerBase() = default; MonoDelta GetApplyDelay() const; - Status Init(const std::string& local_hosts); + Status Init(rpc::Messenger* server_messenger); // Returns true if the load was successful, false if the file was not found. // Returns true without doing any work if AutoFlags management is disabled. @@ -67,9 +63,7 @@ class AutoFlagsManagerBase { // local_hosts is a comma separated list of ip addresses and ports. // Returns Status::OK without doing any work if AutoFlags management is disabled. - Status LoadFromMasterLeader( - const std::string& local_hosts, const server::MasterAddresses& master_addresses) - EXCLUDES(mutex_); + Status LoadFromMasterLeader(const server::MasterAddresses& master_addresses) EXCLUDES(mutex_); Status LoadFromConfigUnlocked( const AutoFlagsConfigPB new_config, ApplyNonRuntimeAutoFlags apply_non_runtime, @@ -105,8 +99,7 @@ class AutoFlagsManagerBase { const std::string process_name_; - std::unique_ptr secure_context_; - std::unique_ptr messenger_; + rpc::Messenger* server_messenger_ = nullptr; DISALLOW_COPY_AND_ASSIGN(AutoFlagsManagerBase); }; diff --git a/src/yb/server/server_base.cc b/src/yb/server/server_base.cc index 4641fb697121..c5a8b8fd2948 100644 --- a/src/yb/server/server_base.cc +++ b/src/yb/server/server_base.cc @@ -131,6 +131,8 @@ METRIC_DEFINE_gauge_uint64(server, untracked_memory, "Untracked memory", yb::MetricUnit::kBytes, "The amount of memory not tracked by MemTracker"); +DECLARE_bool(TEST_running_test); + using namespace std::literals; using namespace std::placeholders; @@ -291,7 +293,7 @@ Status RpcServerBase::SetupMessengerBuilder(rpc::MessengerBuilder* builder) { return Status::OK(); } -Status RpcServerBase::InitAutoFlags() { return Status::OK(); } +Status RpcServerBase::InitAutoFlags(rpc::Messenger* messenger) { return Status::OK(); } Status RpcServerBase::Init() { CHECK(!initialized_); @@ -311,8 +313,6 @@ Status RpcServerBase::Init() { RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock"); } - RETURN_NOT_OK(InitAutoFlags()); - // Create the Messenger. rpc::MessengerBuilder builder(name_); builder.UseDefaultConnectionContextFactory(mem_tracker()); @@ -320,6 +320,15 @@ Status RpcServerBase::Init() { messenger_ = VERIFY_RESULT(builder.Build()); proxy_cache_ = std::make_unique(messenger_.get()); + if (PREDICT_FALSE(FLAGS_TEST_running_test)) { + std::vector host_ports; + RETURN_NOT_OK( + HostPort::ParseStrings(options_.HostsString(), 0 /* default_port */, &host_ports)); + messenger_->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host_ports[0].host()))); + } + + RETURN_NOT_OK(InitAutoFlags(messenger_.get())); + RETURN_NOT_OK(rpc_server_->Init(messenger_.get())); RETURN_NOT_OK(rpc_server_->Bind()); clock_->RegisterMetrics(metric_entity_); @@ -538,7 +547,7 @@ Status RpcAndWebServerBase::Init() { return Status::OK(); } -Status RpcAndWebServerBase::InitAutoFlags() { +Status RpcAndWebServerBase::InitAutoFlags(rpc::Messenger* messenger) { auto process_auto_flags_result = GetAvailableAutoFlagsForServer(); if (!process_auto_flags_result) { LOG(WARNING) << "Unable to get the AutoFlags for this process: " @@ -547,7 +556,7 @@ Status RpcAndWebServerBase::InitAutoFlags() { web_server_->SetAutoFlags(std::move(*process_auto_flags_result)); } - return RpcServerBase::InitAutoFlags(); + return RpcServerBase::InitAutoFlags(messenger); } void RpcAndWebServerBase::GetStatusPB(ServerStatusPB* status) const { diff --git a/src/yb/server/server_base.h b/src/yb/server/server_base.h index ec11ab0a5b24..317ab8c10fd6 100644 --- a/src/yb/server/server_base.h +++ b/src/yb/server/server_base.h @@ -118,7 +118,7 @@ class RpcServerBase { const scoped_refptr& clock = nullptr); virtual ~RpcServerBase(); - virtual Status InitAutoFlags(); + virtual Status InitAutoFlags(rpc::Messenger* messenger); virtual Status Init(); virtual Status Start(); @@ -207,7 +207,7 @@ class RpcAndWebServerBase : public RpcServerBase { const std::string caption, const std::string url); Status Init() override; - Status InitAutoFlags() override; + Status InitAutoFlags(rpc::Messenger* messenger) override; Status Start() override; void Shutdown() override; diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index 444625c2ce7e..8fbf3206a343 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -513,10 +513,10 @@ Status TabletServer::Init() { return Status::OK(); } -Status TabletServer::InitAutoFlags() { - RETURN_NOT_OK(auto_flags_manager_->Init(options_.HostsString(), *opts_.GetMasterAddresses())); +Status TabletServer::InitAutoFlags(rpc::Messenger* messenger) { + RETURN_NOT_OK(auto_flags_manager_->Init(messenger, *opts_.GetMasterAddresses())); - return RpcAndWebServerBase::InitAutoFlags(); + return RpcAndWebServerBase::InitAutoFlags(messenger); } Result> TabletServer::GetAvailableAutoFlagsForServer() const { diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h index fc4357f88f60..909b7dfb4a1e 100644 --- a/src/yb/tserver/tablet_server.h +++ b/src/yb/tserver/tablet_server.h @@ -117,7 +117,7 @@ class TabletServer : public DbServerBase, public TabletServerIf { // complete by calling WaitInited(). Status Init() override; - virtual Status InitAutoFlags() override; + virtual Status InitAutoFlags(rpc::Messenger* messenger) override; Status GetRegistration(ServerRegistrationPB* reg, server::RpcOnly rpc_only = server::RpcOnly::kFalse) const override; diff --git a/src/yb/tserver/tserver_auto_flags_manager.cc b/src/yb/tserver/tserver_auto_flags_manager.cc index b0af505885d8..a59e72bf4a3b 100644 --- a/src/yb/tserver/tserver_auto_flags_manager.cc +++ b/src/yb/tserver/tserver_auto_flags_manager.cc @@ -25,14 +25,14 @@ TserverAutoFlagsManager::TserverAutoFlagsManager( : AutoFlagsManagerBase(kYbTserverProcessName, clock, fs_manager) {} Status TserverAutoFlagsManager::Init( - const std::string& local_hosts, const server::MasterAddresses& master_addresses) { - RETURN_NOT_OK(AutoFlagsManagerBase::Init(local_hosts)); + rpc::Messenger* messenger, const server::MasterAddresses& master_addresses) { + RETURN_NOT_OK(AutoFlagsManagerBase::Init(messenger)); if (VERIFY_RESULT(LoadFromFile())) { return Status::OK(); } - return LoadFromMasterLeader(local_hosts, master_addresses); + return LoadFromMasterLeader(master_addresses); } Status TserverAutoFlagsManager::ProcessAutoFlagsConfigOperation( diff --git a/src/yb/tserver/tserver_auto_flags_manager.h b/src/yb/tserver/tserver_auto_flags_manager.h index 0a893eb63377..fe2807f496de 100644 --- a/src/yb/tserver/tserver_auto_flags_manager.h +++ b/src/yb/tserver/tserver_auto_flags_manager.h @@ -15,6 +15,8 @@ #include "yb/server/auto_flags_manager_base.h" +#include "yb/common/hybrid_time.h" + namespace yb { namespace tserver { @@ -31,7 +33,7 @@ class TserverAutoFlagsManager : public AutoFlagsManagerBase { virtual ~TserverAutoFlagsManager() {} - Status Init(const std::string& local_hosts, const server::MasterAddresses& master_addresses); + Status Init(rpc::Messenger* server_messenger, const server::MasterAddresses& master_addresses); Status ProcessAutoFlagsConfigOperation(const AutoFlagsConfigPB new_config) override;