Skip to content

Commit

Permalink
[#22076] DocDB: Reuse Server messenger in AutoFlags
Browse files Browse the repository at this point in the history
Summary:
Messenger launches 4 extra reactor threads.
With this change we reuse the messenger from the sever in AutoFlags instead of creating its own messenger.
The derived classes `MasterAutoFlagsManager` and `TserverAutoFlagsManager` are destroyed before the servers messenger.

We continue to create a temp YbClient object when fetching AutoFlags from master since the servers client is started much later. Also this operation only happens the very first time a node is added to a cluster.

Fixes #22076
Jira: DB-10998

Test Plan: Jenkins

Reviewers: sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34307
  • Loading branch information
hari90 committed Apr 19, 2024
1 parent fa7c8bb commit e6bb62a
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 70 deletions.
8 changes: 4 additions & 4 deletions src/yb/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,19 @@ Status Master::Init() {
return Status::OK();
}

Status Master::InitAutoFlags() {
Status Master::InitAutoFlags(rpc::Messenger* messenger) {
// Will we be in shell mode if we dont have a sys catalog yet?
bool is_shell_mode_if_new =
FLAGS_master_join_existing_universe || !opts().AreMasterAddressesProvided();

RETURN_NOT_OK(auto_flags_manager_->Init(
options_.HostsString(),
messenger,
[this]() {
return fs_manager_->LookupTablet(kSysCatalogTabletId);
} /* has_sys_catalog_func */,
is_shell_mode_if_new));

return RpcAndWebServerBase::InitAutoFlags();
return RpcAndWebServerBase::InitAutoFlags(messenger);
}

Result<std::unordered_set<std::string>> Master::GetAvailableAutoFlagsForServer() const {
Expand All @@ -246,7 +246,7 @@ Status Master::InitAutoFlagsFromMasterLeader(const HostPort& leader_address) {
opts().IsShellMode(), IllegalState,
"Cannot load AutoFlags from another master when not in shell mode.");

return auto_flags_manager_->LoadFromMasterLeader(options_.HostsString(), {{leader_address}});
return auto_flags_manager_->LoadFromMasterLeader({{leader_address}});
}

MonoDelta Master::default_client_timeout() {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/master.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Master : public tserver::DbServerBase {
explicit Master(const MasterOptions& opts);
virtual ~Master();

virtual Status InitAutoFlags() override;
virtual Status InitAutoFlags(rpc::Messenger* messenger) override;
Status InitAutoFlagsFromMasterLeader(const HostPort& leader_address);
Status Init() override;
Status Start() override;
Expand Down
9 changes: 4 additions & 5 deletions src/yb/master/master_auto_flags_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,8 @@ MasterAutoFlagsManager::MasterAutoFlagsManager(
update_lock_(mutex_, std::defer_lock) {}

Status MasterAutoFlagsManager::Init(
const std::string& local_hosts, std::function<bool()> has_sys_catalog_func,
bool is_shell_mode) {
RETURN_NOT_OK(AutoFlagsManagerBase::Init(local_hosts));
rpc::Messenger* messenger, std::function<bool()> has_sys_catalog_func, bool is_shell_mode) {
RETURN_NOT_OK(AutoFlagsManagerBase::Init(messenger));

if (VERIFY_RESULT(LoadFromFile())) {
return Status::OK();
Expand All @@ -266,8 +265,8 @@ Status MasterAutoFlagsManager::Init(
}

Status MasterAutoFlagsManager::LoadFromMasterLeader(
const std::string& local_hosts, const server::MasterAddresses& master_addresses) {
return AutoFlagsManagerBase::LoadFromMasterLeader(local_hosts, master_addresses);
const server::MasterAddresses& master_addresses) {
return AutoFlagsManagerBase::LoadFromMasterLeader(master_addresses);
}

Status MasterAutoFlagsManager::LoadNewConfig(const AutoFlagsConfigPB new_config) {
Expand Down
9 changes: 5 additions & 4 deletions src/yb/master/master_auto_flags_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
#pragma once

#include "yb/server/auto_flags_manager_base.h"

#include "yb/master/master_cluster.pb.h"
#include "yb/util/flags/auto_flags_util.h"
#include "yb/util/unique_lock.h"

namespace yb {
namespace master {
Expand Down Expand Up @@ -49,11 +52,9 @@ class MasterAutoFlagsManager : public AutoFlagsManagerBase {
virtual ~MasterAutoFlagsManager() {}

Status Init(
const std::string& local_hosts, std::function<bool()> has_sys_catalog_func,
bool is_shell_mode);
rpc::Messenger* messenger, std::function<bool()> has_sys_catalog_func, bool is_shell_mode);

Status LoadFromMasterLeader(
const std::string& local_hosts, const server::MasterAddresses& master_addresses);
Status LoadFromMasterLeader(const server::MasterAddresses& master_addresses);

Status ProcessAutoFlagsConfigOperation(const AutoFlagsConfigPB new_config) override;

Expand Down
38 changes: 9 additions & 29 deletions src/yb/server/auto_flags_manager_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

#include "yb/gutil/strings/join.h"

#include "yb/master/master_cluster.pb.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/secure_stream.h"

#include "yb/rpc/secure.h"

#include "yb/util/flags/auto_flags.h"
#include "yb/util/flags/auto_flags_util.h"
#include "yb/util/net/net_util.h"
#include "yb/util/flags.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -100,25 +96,9 @@ AutoFlagsManagerBase::AutoFlagsManagerBase(
current_config_.set_config_version(kInvalidAutoFlagsConfigVersion);
}

AutoFlagsManagerBase::~AutoFlagsManagerBase() {
if (messenger_) {
messenger_->Shutdown();
}
}

Status AutoFlagsManagerBase::Init(const std::string& local_hosts) {
rpc::MessengerBuilder messenger_builder("auto_flags_client");
secure_context_ = VERIFY_RESULT(rpc::SetupInternalSecureContext(
local_hosts, fs_manager_->GetDefaultRootDir(), &messenger_builder));

messenger_ = VERIFY_RESULT(messenger_builder.Build());

if (PREDICT_FALSE(FLAGS_TEST_running_test)) {
std::vector<HostPort> host_ports;
RETURN_NOT_OK(HostPort::ParseStrings(local_hosts, 0 /* default_port */, &host_ports));
messenger_->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host_ports[0].host())));
}

Status AutoFlagsManagerBase::Init(rpc::Messenger* server_messenger) {
SCHECK_NOTNULL(server_messenger);
server_messenger_ = server_messenger;
return Status::OK();
}

Expand Down Expand Up @@ -150,17 +130,16 @@ Result<bool> AutoFlagsManagerBase::LoadFromFile() {

Result<std::optional<AutoFlagsConfigPB>> AutoFlagsManagerBase::GetAutoFlagConfigFromMasterLeader(
const std::string& master_addresses) {
SCHECK_NOTNULL(server_messenger_);
auto client = VERIFY_RESULT(yb::client::YBClientBuilder()
.add_master_server_addr(master_addresses)
.default_admin_operation_timeout(MonoDelta::FromSeconds(
FLAGS_yb_client_admin_operation_timeout_sec))
.Build(messenger_.get()));

.Build(server_messenger_));
return client->GetAutoFlagConfig();
}

Status AutoFlagsManagerBase::LoadFromMasterLeader(
const std::string& local_hosts, const server::MasterAddresses& master_addresses) {
Status AutoFlagsManagerBase::LoadFromMasterLeader(const server::MasterAddresses& master_addresses) {
if (FLAGS_disable_auto_flags_management) {
LOG(WARNING) << "AutoFlags management is disabled.";
return Status::OK();
Expand Down Expand Up @@ -258,7 +237,8 @@ Status AutoFlagsManagerBase::LoadFromConfigUnlocked(
const auto delay = VERIFY_RESULT(GetTimeLeftToApplyConfig());
if (delay) {
LOG(INFO) << "New AutoFlags config will be applied in " << delay;
RETURN_NOT_OK(messenger_->ScheduleOnReactor(
SCHECK_NOTNULL(server_messenger_);
RETURN_NOT_OK(server_messenger_->ScheduleOnReactor(
std::bind(
&AutoFlagsManagerBase::AsyncApplyConfig, this, current_config_.config_version(),
apply_non_runtime),
Expand Down
17 changes: 5 additions & 12 deletions src/yb/server/auto_flags_manager_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@

#include <shared_mutex>

#include "yb/common/hybrid_time.h"
#include "yb/common/wire_protocol.pb.h"
#include "yb/server/server_base_options.h"
#include "yb/util/locks.h"
#include "yb/util/status.h"
#include "yb/util/flags/auto_flags_util.h"
#include "yb/util/unique_lock.h"
#include "yb/util/status_fwd.h"

namespace yb {

Expand Down Expand Up @@ -55,21 +51,19 @@ class AutoFlagsManagerBase {
explicit AutoFlagsManagerBase(
const std::string& process_name, const scoped_refptr<ClockBase>& clock,
FsManager* fs_manager);
virtual ~AutoFlagsManagerBase();
virtual ~AutoFlagsManagerBase() = default;

MonoDelta GetApplyDelay() const;

Status Init(const std::string& local_hosts);
Status Init(rpc::Messenger* server_messenger);

// Returns true if the load was successful, false if the file was not found.
// Returns true without doing any work if AutoFlags management is disabled.
Result<bool> LoadFromFile() EXCLUDES(mutex_);

// local_hosts is a comma separated list of ip addresses and ports.
// Returns Status::OK without doing any work if AutoFlags management is disabled.
Status LoadFromMasterLeader(
const std::string& local_hosts, const server::MasterAddresses& master_addresses)
EXCLUDES(mutex_);
Status LoadFromMasterLeader(const server::MasterAddresses& master_addresses) EXCLUDES(mutex_);

Status LoadFromConfigUnlocked(
const AutoFlagsConfigPB new_config, ApplyNonRuntimeAutoFlags apply_non_runtime,
Expand Down Expand Up @@ -105,8 +99,7 @@ class AutoFlagsManagerBase {

const std::string process_name_;

std::unique_ptr<rpc::SecureContext> secure_context_;
std::unique_ptr<rpc::Messenger> messenger_;
rpc::Messenger* server_messenger_ = nullptr;

DISALLOW_COPY_AND_ASSIGN(AutoFlagsManagerBase);
};
Expand Down
19 changes: 14 additions & 5 deletions src/yb/server/server_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ METRIC_DEFINE_gauge_uint64(server, untracked_memory,
"Untracked memory", yb::MetricUnit::kBytes,
"The amount of memory not tracked by MemTracker");

DECLARE_bool(TEST_running_test);

using namespace std::literals;
using namespace std::placeholders;

Expand Down Expand Up @@ -291,7 +293,7 @@ Status RpcServerBase::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
return Status::OK();
}

Status RpcServerBase::InitAutoFlags() { return Status::OK(); }
Status RpcServerBase::InitAutoFlags(rpc::Messenger* messenger) { return Status::OK(); }

Status RpcServerBase::Init() {
CHECK(!initialized_);
Expand All @@ -311,15 +313,22 @@ Status RpcServerBase::Init() {
RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
}

RETURN_NOT_OK(InitAutoFlags());

// Create the Messenger.
rpc::MessengerBuilder builder(name_);
builder.UseDefaultConnectionContextFactory(mem_tracker());
RETURN_NOT_OK(SetupMessengerBuilder(&builder));
messenger_ = VERIFY_RESULT(builder.Build());
proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get());

if (PREDICT_FALSE(FLAGS_TEST_running_test)) {
std::vector<HostPort> host_ports;
RETURN_NOT_OK(
HostPort::ParseStrings(options_.HostsString(), 0 /* default_port */, &host_ports));
messenger_->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host_ports[0].host())));
}

RETURN_NOT_OK(InitAutoFlags(messenger_.get()));

RETURN_NOT_OK(rpc_server_->Init(messenger_.get()));
RETURN_NOT_OK(rpc_server_->Bind());
clock_->RegisterMetrics(metric_entity_);
Expand Down Expand Up @@ -538,7 +547,7 @@ Status RpcAndWebServerBase::Init() {
return Status::OK();
}

Status RpcAndWebServerBase::InitAutoFlags() {
Status RpcAndWebServerBase::InitAutoFlags(rpc::Messenger* messenger) {
auto process_auto_flags_result = GetAvailableAutoFlagsForServer();
if (!process_auto_flags_result) {
LOG(WARNING) << "Unable to get the AutoFlags for this process: "
Expand All @@ -547,7 +556,7 @@ Status RpcAndWebServerBase::InitAutoFlags() {
web_server_->SetAutoFlags(std::move(*process_auto_flags_result));
}

return RpcServerBase::InitAutoFlags();
return RpcServerBase::InitAutoFlags(messenger);
}

void RpcAndWebServerBase::GetStatusPB(ServerStatusPB* status) const {
Expand Down
4 changes: 2 additions & 2 deletions src/yb/server/server_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class RpcServerBase {
const scoped_refptr<Clock>& clock = nullptr);
virtual ~RpcServerBase();

virtual Status InitAutoFlags();
virtual Status InitAutoFlags(rpc::Messenger* messenger);
virtual Status Init();
virtual Status Start();

Expand Down Expand Up @@ -207,7 +207,7 @@ class RpcAndWebServerBase : public RpcServerBase {
const std::string caption, const std::string url);

Status Init() override;
Status InitAutoFlags() override;
Status InitAutoFlags(rpc::Messenger* messenger) override;
Status Start() override;
void Shutdown() override;

Expand Down
6 changes: 3 additions & 3 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,10 @@ Status TabletServer::Init() {
return Status::OK();
}

Status TabletServer::InitAutoFlags() {
RETURN_NOT_OK(auto_flags_manager_->Init(options_.HostsString(), *opts_.GetMasterAddresses()));
Status TabletServer::InitAutoFlags(rpc::Messenger* messenger) {
RETURN_NOT_OK(auto_flags_manager_->Init(messenger, *opts_.GetMasterAddresses()));

return RpcAndWebServerBase::InitAutoFlags();
return RpcAndWebServerBase::InitAutoFlags(messenger);
}

Result<std::unordered_set<std::string>> TabletServer::GetAvailableAutoFlagsForServer() const {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tserver/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TabletServer : public DbServerBase, public TabletServerIf {
// complete by calling WaitInited().
Status Init() override;

virtual Status InitAutoFlags() override;
virtual Status InitAutoFlags(rpc::Messenger* messenger) override;

Status GetRegistration(ServerRegistrationPB* reg,
server::RpcOnly rpc_only = server::RpcOnly::kFalse) const override;
Expand Down
6 changes: 3 additions & 3 deletions src/yb/tserver/tserver_auto_flags_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ TserverAutoFlagsManager::TserverAutoFlagsManager(
: AutoFlagsManagerBase(kYbTserverProcessName, clock, fs_manager) {}

Status TserverAutoFlagsManager::Init(
const std::string& local_hosts, const server::MasterAddresses& master_addresses) {
RETURN_NOT_OK(AutoFlagsManagerBase::Init(local_hosts));
rpc::Messenger* messenger, const server::MasterAddresses& master_addresses) {
RETURN_NOT_OK(AutoFlagsManagerBase::Init(messenger));

if (VERIFY_RESULT(LoadFromFile())) {
return Status::OK();
}

return LoadFromMasterLeader(local_hosts, master_addresses);
return LoadFromMasterLeader(master_addresses);
}

Status TserverAutoFlagsManager::ProcessAutoFlagsConfigOperation(
Expand Down
4 changes: 3 additions & 1 deletion src/yb/tserver/tserver_auto_flags_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include "yb/server/auto_flags_manager_base.h"

#include "yb/common/hybrid_time.h"

namespace yb {
namespace tserver {

Expand All @@ -31,7 +33,7 @@ class TserverAutoFlagsManager : public AutoFlagsManagerBase {

virtual ~TserverAutoFlagsManager() {}

Status Init(const std::string& local_hosts, const server::MasterAddresses& master_addresses);
Status Init(rpc::Messenger* server_messenger, const server::MasterAddresses& master_addresses);

Status ProcessAutoFlagsConfigOperation(const AutoFlagsConfigPB new_config) override;

Expand Down

0 comments on commit e6bb62a

Please sign in to comment.