Skip to content

Commit

Permalink
[#21338] DocDB: Move secure.cc and placement flags tp rpc library
Browse files Browse the repository at this point in the history
Summary:
Move `secure.cc` from `server_process` to `rpc` library since this file is a collection of utilities for `rpc::SecureContext`.
Mark the gflags in this file as NON_RUNTIME as updating them does not modify pre created clients.
Move `placement_cloud`, `placement_region` and `placement_zone` from `server_base_options.cc` to `common_flags.cc` since these flags are used by client `meta_cache` and `transaction_manager`.
Remove the usage of `FsManager` in the `client` library.

Fixes #21338
Jira: DB-10239

Test Plan: Jenkins

Reviewers: mbautin, xCluster

Reviewed By: mbautin

Subscribers: esheng, yql, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D32895
  • Loading branch information
hari90 committed Mar 7, 2024
1 parent 102a2a5 commit fd3cfae
Show file tree
Hide file tree
Showing 31 changed files with 220 additions and 250 deletions.
4 changes: 2 additions & 2 deletions src/yb/client/client_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc.h"

#include "yb/server/secure.h"
#include "yb/rpc/secure.h"

#include "yb/util/atomic.h"
#include "yb/util/locks.h"
Expand Down Expand Up @@ -65,7 +65,7 @@ Result<std::unique_ptr<rpc::Messenger>> CreateClientMessenger(
builder.set_metric_entity(metric_entity);
builder.UseDefaultConnectionContextFactory(parent_mem_tracker);
if (secure_context) {
server::ApplySecureContext(secure_context, &builder);
rpc::ApplySecureContext(secure_context, &builder);
}
auto messenger = VERIFY_RESULT(builder.Build());
if (PREDICT_FALSE(FLAGS_TEST_running_test)) {
Expand Down
12 changes: 7 additions & 5 deletions src/yb/client/stateful_services/stateful_service_client_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
#include <chrono>

#include "yb/master/master_client.proxy.h"
#include "yb/tserver/tablet_server.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/secure.h"
#include "yb/rpc/secure_stream.h"
#include "yb/tserver/tablet_server.h"
#include "yb/client/client-internal.h"
#include "yb/util/backoff_waiter.h"
#include "yb/util/status_format.h"
Expand Down Expand Up @@ -53,8 +55,8 @@ Status StatefulServiceClientBase::Init(tserver::TabletServer* server) {

std::lock_guard lock(mutex_);
rpc::MessengerBuilder messenger_builder(service_name_ + "_Client");
secure_context_ = VERIFY_RESULT(
server::SetupInternalSecureContext(local_hosts, *server->fs_manager(), &messenger_builder));
secure_context_ = VERIFY_RESULT(rpc::SetupInternalSecureContext(
local_hosts, server->fs_manager()->GetDefaultRootDir(), &messenger_builder));

messenger_ = VERIFY_RESULT(messenger_builder.Build());

Expand All @@ -79,8 +81,8 @@ Status StatefulServiceClientBase::TESTInit(
const std::string& local_host, const std::string& master_addresses) {
std::lock_guard lock(mutex_);
rpc::MessengerBuilder messenger_builder(service_name_ + "Client");
secure_context_ = VERIFY_RESULT(server::SetupSecureContext(
FLAGS_certs_dir, local_host, server::SecureContextType::kInternal, &messenger_builder));
secure_context_ = VERIFY_RESULT(rpc::SetupSecureContext(
FLAGS_certs_dir, local_host, rpc::SecureContextType::kInternal, &messenger_builder));

messenger_ = VERIFY_RESULT(messenger_builder.Build());

Expand Down
17 changes: 9 additions & 8 deletions src/yb/client/stateful_services/stateful_service_client_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
#pragma once

#include <memory>
#include "yb/client/client.h"
#include "yb/rpc/messenger.h"
#include "yb/server/secure.h"
#include "yb/util/net/net_util.h"
#include "yb/util/status.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/rpc_context.h"
#include "yb/gutil/thread_annotations.h"

#include "yb/common/wire_protocol.h"
#include "yb/common/wire_protocol.pb.h"
#include "yb/gutil/thread_annotations.h"
#include "yb/rpc/rpc_fwd.h"
#include "yb/util/status.h"

using namespace std::placeholders;

namespace yb {

class HostPort;

namespace tserver {
class TabletServer;
}

namespace client {
class YBClient;

#define STATEFUL_SERVICE_RPC(r, service, method_name) \
template <typename T> \
Expand Down
21 changes: 19 additions & 2 deletions src/yb/client/transaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ DEFINE_test_flag(string, transaction_manager_preferred_tablet, "",
"For testing only. If non-empty, transaction manager will try to use the status "
"tablet with id matching this flag, if present in the list of status tablets.");

DECLARE_string(placement_cloud);
DECLARE_string(placement_region);
DECLARE_string(placement_zone);

namespace yb {
namespace client {

Expand Down Expand Up @@ -159,6 +163,20 @@ class TransactionTableState {
TransactionStatusTablets tablets_ GUARDED_BY(mutex_);
};

const CloudInfoPB& GetPlacementFromGFlags() {
static GoogleOnceType once = GOOGLE_ONCE_INIT;
static CloudInfoPB cloud_info;
auto set_placement_from_gflags = [](CloudInfoPB* cloud_info) {
cloud_info->set_placement_cloud(FLAGS_placement_cloud);
cloud_info->set_placement_region(FLAGS_placement_region);
cloud_info->set_placement_zone(FLAGS_placement_zone);
};
GoogleOnceInitArg(
&once, static_cast<void (*)(CloudInfoPB*)>(set_placement_from_gflags), &cloud_info);

return cloud_info;
}

// Loads transaction tablets list to cache.
class LoadStatusTabletsTask {
public:
Expand Down Expand Up @@ -200,8 +218,7 @@ class LoadStatusTabletsTask {

private:
Result<TransactionStatusTablets> GetTransactionStatusTablets() {
CloudInfoPB this_pb = yb::server::GetPlacementFromGFlags();
return client_->GetTransactionStatusTablets(this_pb);
return client_->GetTransactionStatusTablets(GetPlacementFromGFlags());
}

YBClient* client_;
Expand Down
15 changes: 6 additions & 9 deletions src/yb/client/universe_key_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

#include "yb/encryption/encryption.pb.h"

#include "yb/fs/fs_manager.h"

#include "yb/master/master_encryption.proxy.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/rpc/secure_stream.h"

#include "yb/server/secure.h"
#include "yb/rpc/secure.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -84,13 +82,12 @@ void UniverseKeyClient::ProcessGetUniverseKeyRegistryResponse(
callback_(resp->universe_keys());
}

Result<encryption::UniverseKeyRegistryPB>UniverseKeyClient::GetFullUniverseKeyRegistry(
const std::string& local_hosts,
const std::string& master_addresses,
const FsManager& fs_manager) {
Result<encryption::UniverseKeyRegistryPB> UniverseKeyClient::GetFullUniverseKeyRegistry(
const std::string& local_hosts, const std::string& master_addresses,
const std::string& root_dir) {
rpc::MessengerBuilder messenger_builder("universe_key_client");
auto secure_context = VERIFY_RESULT(
server::SetupInternalSecureContext(local_hosts, fs_manager, &messenger_builder));
auto secure_context =
VERIFY_RESULT(rpc::SetupInternalSecureContext(local_hosts, root_dir, &messenger_builder));
auto messenger = VERIFY_RESULT(messenger_builder.Build());
auto se = ScopeExit([&] {
if (messenger) {
Expand Down
8 changes: 3 additions & 5 deletions src/yb/client/universe_key_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

namespace yb {

class FsManager;

namespace client {

class UniverseKeyClient {
Expand All @@ -47,9 +45,9 @@ class UniverseKeyClient {
// Synchronous call from tserver Init to get the full universe key registry from master
// leader.
static Result<encryption::UniverseKeyRegistryPB> GetFullUniverseKeyRegistry(
const std::string& local_hosts,
const std::string& master_addresses,
const FsManager& fs_manager);
const std::string& local_hosts, const std::string& master_addresses,
const std::string& root_dir);

private:

void ProcessGetUniverseKeyRegistryResponse(
Expand Down
6 changes: 3 additions & 3 deletions src/yb/client/xcluster_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/secure_stream.h"
#include "yb/server/secure.h"
#include "yb/rpc/secure.h"
#include "yb/util/path_util.h"

DECLARE_bool(use_node_to_node_encryption);
Expand Down Expand Up @@ -48,8 +48,8 @@ Status XClusterRemoteClient::Init(
if (!certs_for_cdc_dir_.empty()) {
certs_dir = JoinPathSegments(certs_for_cdc_dir_, replication_group_id.ToString());
}
secure_context_ = VERIFY_RESULT(server::SetupSecureContext(
certs_dir, /*root_dir=*/"", /*name=*/"", server::SecureContextType::kInternal,
secure_context_ = VERIFY_RESULT(rpc::SetupSecureContext(
certs_dir, /*root_dir=*/"", /*name=*/"", rpc::SecureContextType::kInternal,
&messenger_builder));
}
messenger_ = VERIFY_RESULT(messenger_builder.Build());
Expand Down
15 changes: 15 additions & 0 deletions src/yb/common/common_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ DEFINE_RUNTIME_PG_FLAG(bool, TEST_enable_replication_slot_consumption, false,
TAG_FLAG(ysql_TEST_enable_replication_slot_consumption, unsafe);
TAG_FLAG(ysql_TEST_enable_replication_slot_consumption, hidden);

// The following flags related to the cloud, region and availability zone that an instance is
// started in. These are passed in from whatever provisioning mechanics start the servers. They
// are used for generic placement policies on table creation and tablet load balancing, to
// either constrain data to a certain location (table A should only live in aws.us-west2.a), or to
// define the required level of fault tolerance expected (table B should have N replicas, across
// two regions of AWS and one of GCE).
//
// These are currently for use in a cloud-based deployment, but could be retrofitted to work for
// an on-premise deployment as well, with datacenter, cluster and rack levels, for example.
DEFINE_NON_RUNTIME_string(placement_cloud, "cloud1",
"The cloud in which this instance is started.");
DEFINE_NON_RUNTIME_string(placement_region, "datacenter1",
"The cloud region in which this instance is started.");
DEFINE_NON_RUNTIME_string(placement_zone, "rack1",
"The cloud availability zone in which this instance is started.");
namespace {

constexpr const auto kMinRpcThrottleThresholdBytes = 16;
Expand Down
5 changes: 0 additions & 5 deletions src/yb/fs/fs_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ const char kConsensusMetadataDirName[] = "consensus-meta";
const char kLogsDirName[] = "logs";
const char kTmpInfix[] = ".tmp";
const char kCheckFileTemplate[] = "check.XXXXXX";
const char kSecureCertsDirName[] = "certs";
const char kPrefixMetricId[] = "drive:";

std::string DataDir(const std::string& root, const std::string& server_type) {
Expand Down Expand Up @@ -836,10 +835,6 @@ std::string FsManager::GetDefaultRootDir() const {
return GetServerTypeDataPath(canonicalized_default_fs_root_, server_type_);
}

std::string FsManager::GetCertsDir(const std::string& root_dir) {
return JoinPathSegments(root_dir, kSecureCertsDirName);
}

std::vector<std::string> FsManager::GetConsensusMetadataDirs() const {
DCHECK(initted_);
vector<string> data_paths;
Expand Down
1 change: 0 additions & 1 deletion src/yb/fs/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ class FsManager {

// Return the directory where the certs are stored.
std::string GetDefaultRootDir() const;
static std::string GetCertsDir(const std::string& root_dir);

std::vector<std::string> GetConsensusMetadataDirs() const;
// Return the directory where the consensus metadata is stored.
Expand Down
6 changes: 3 additions & 3 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

#include "yb/server/server_base.pb.h"
#include "yb/server/server_base.proxy.h"
#include "yb/server/secure.h"
#include "yb/rpc/secure.h"

#include "yb/tserver/tserver_admin.proxy.h"
#include "yb/tserver/tserver_service.proxy.h"
Expand Down Expand Up @@ -3023,8 +3023,8 @@ void StartSecure(
std::unique_ptr<rpc::Messenger>* messenger,
const std::vector<std::string>& master_flags) {
rpc::MessengerBuilder messenger_builder("test_client");
*secure_context = ASSERT_RESULT(server::SetupSecureContext(
"", "127.0.0.100", server::SecureContextType::kInternal, &messenger_builder));
*secure_context = ASSERT_RESULT(rpc::SetupSecureContext(
/*root_dir=*/"", "127.0.0.100", rpc::SecureContextType::kInternal, &messenger_builder));
*messenger = ASSERT_RESULT(messenger_builder.Build());
(**messenger).TEST_SetOutboundIpBase(ASSERT_RESULT(HostToAddress("127.0.0.1")));

Expand Down
6 changes: 3 additions & 3 deletions src/yb/integration-tests/mini_cluster_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "yb/client/stateful_services/stateful_service_client_base.h"
#include "yb/server/hybrid_clock.h"
#include "yb/server/secure.h"
#include "yb/rpc/secure.h"
#include "yb/util/net/net_util.h"
#include "yb/util/result.h"
#include "yb/rpc/messenger.h"
Expand All @@ -32,8 +32,8 @@ Result<std::unique_ptr<client::YBClient>> CreateSecureClientInternal(
const std::string& name, const std::string& host,
std::unique_ptr<rpc::SecureContext>* secure_context, client::YBClientBuilder* builder) {
rpc::MessengerBuilder messenger_builder("test_client");
*secure_context = VERIFY_RESULT(server::SetupSecureContext(
FLAGS_certs_dir, name, server::SecureContextType::kInternal, &messenger_builder));
*secure_context = VERIFY_RESULT(rpc::SetupSecureContext(
FLAGS_certs_dir, name, rpc::SecureContextType::kInternal, &messenger_builder));
auto messenger = VERIFY_RESULT(messenger_builder.Build());
messenger->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host)));
auto clock = make_scoped_refptr<server::HybridClock>();
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/secure_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "yb/rpc/messenger.h"
#include "yb/rpc/secure_stream.h"

#include "yb/server/secure.h"
#include "yb/rpc/secure.h"

#include "yb/util/size_literals.h"
#include "yb/util/env_util.h"
Expand Down
15 changes: 6 additions & 9 deletions src/yb/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
#include "yb/rpc/yb_rpc.h"

#include "yb/server/rpc_server.h"
#include "yb/server/secure.h"
#include "yb/rpc/secure.h"
#include "yb/server/hybrid_clock.h"


Expand Down Expand Up @@ -103,7 +103,6 @@ DEFINE_NON_RUNTIME_int32(master_backup_svc_queue_length, 50,
"RPC queue length for master backup service");
TAG_FLAG(master_backup_svc_queue_length, advanced);

DECLARE_string(cert_node_filename);
DECLARE_bool(master_join_existing_universe);

METRIC_DEFINE_entity(cluster);
Expand Down Expand Up @@ -685,8 +684,8 @@ Status Master::get_ysql_db_oid_to_cat_version_info_map(
Status Master::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
RETURN_NOT_OK(DbServerBase::SetupMessengerBuilder(builder));

secure_context_ = VERIFY_RESULT(
server::SetupInternalSecureContext(options_.HostsString(), *fs_manager_, builder));
secure_context_ = VERIFY_RESULT(rpc::SetupInternalSecureContext(
options_.HostsString(), fs_manager_->GetDefaultRootDir(), builder));

return Status::OK();
}
Expand All @@ -696,11 +695,9 @@ Status Master::ReloadKeysAndCertificates() {
return Status::OK();
}

return server::ReloadSecureContextKeysAndCertificates(
secure_context_.get(),
fs_manager_->GetDefaultRootDir(),
server::SecureContextType::kInternal,
options_.HostsString());
return rpc::ReloadSecureContextKeysAndCertificates(
secure_context_.get(), fs_manager_->GetDefaultRootDir(), rpc::SecureContextType::kInternal,
options_.HostsString());
}

std::string Master::GetCertificateDetails() {
Expand Down
6 changes: 3 additions & 3 deletions src/yb/master/xcluster_rpc_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "yb/rpc/messenger.h"
#include "yb/rpc/secure_stream.h"

#include "yb/server/secure.h"
#include "yb/rpc/secure.h"

#include "yb/util/logging.h"
#include "yb/util/path_util.h"
Expand Down Expand Up @@ -75,8 +75,8 @@ Result<std::shared_ptr<XClusterRpcTasks>> XClusterRpcTasks::CreateWithMasterAddr
FLAGS_certs_for_cdc_dir,
xcluster::GetOriginalReplicationGroupId(replication_group_id).ToString());
}
xcluster_rpc_tasks->secure_context_ = VERIFY_RESULT(server::SetupSecureContext(
dir, "", "", server::SecureContextType::kInternal, &messenger_builder));
xcluster_rpc_tasks->secure_context_ = VERIFY_RESULT(rpc::SetupSecureContext(
dir, /*root_dir=*/"", /*name=*/"", rpc::SecureContextType::kInternal, &messenger_builder));
xcluster_rpc_tasks->messenger_ = VERIFY_RESULT(messenger_builder.Build());
}

Expand Down
1 change: 1 addition & 0 deletions src/yb/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ set(YRPC_SRCS
rpc_with_queue.cc
rpc_util.cc
scheduler.cc
secure.cc
secure_stream.cc
serialization.cc
service_if.cc
Expand Down
1 change: 1 addition & 0 deletions src/yb/rpc/rpc_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class LightweightMessage;
class MessengerBuilder;
class PeriodicTimer;
class Proxy;
class ProxyBase;
class ProxyCache;
class ProxyContext;
class Reactor;
Expand Down
Loading

0 comments on commit fd3cfae

Please sign in to comment.