From 059b8555bdf8c5414802da6ca67809fc3cf9dbd9 Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Thu, 27 Jun 2024 11:59:47 -0700 Subject: [PATCH] [#22908] xCluster: Use XClusterRemoteClient across XCluster Summary: Use `XClusterRemoteClient` in `XClsuterConsumer`. This helps unify the client usage in xCluster, and move all client creation logic out of `XClusterConsumer`. - Renamed `XClusterRemoteClient` to `XClusterRemoteClientHolder`. - Moved Client functions `SetupDbScopedUniverseReplication`, `IsSetupUniverseReplicationDone`, and `AddNamespaceToDbScopedUniverseReplication` from `XClusterRemoteClient` to `XClusterClient`. - Moved `xcluster_util` from `/src/cdc` to `/src/common` so that it can be used in the `client` library. - Moved `MockXClusterRemoteClientHolder` and `MockXClusterClient` to `xcluster_client_mock`. - Moved flags `certs_for_cdc_dir` and `cdc_read_rpc_timeout_ms` to `common_flags`. Fixes #22908 Jira: DB-11816 Test Plan: Jenkins Reviewers: jhe, xCluster Reviewed By: jhe Subscribers: esheng, ycdcxcluster, xCluster, ybase Differential Revision: https://phorge.dev.yugabyte.com/D35908 --- src/yb/cdc/CMakeLists.txt | 1 - src/yb/cdc/cdc_service.cc | 10 +- src/yb/client/CMakeLists.txt | 15 +- src/yb/client/client.cc | 2 +- src/yb/client/client.h | 16 +- src/yb/client/meta_cache.cc | 2 +- src/yb/client/meta_cache.h | 4 +- src/yb/client/xcluster_client.cc | 213 ++++++++++-------- src/yb/client/xcluster_client.h | 75 +++--- src/yb/client/xcluster_client_mock.cc | 34 +++ src/yb/client/xcluster_client_mock.h | 62 +++++ src/yb/common/CMakeLists.txt | 1 + src/yb/common/common_flags.cc | 10 + src/yb/{cdc => common}/xcluster_util.cc | 4 +- src/yb/{cdc => common}/xcluster_util.h | 0 .../xcluster/xcluster-test.cc | 45 ++-- .../xcluster/xcluster_db_scoped-test.cc | 2 +- .../xcluster/xcluster_test_base.cc | 2 +- src/yb/master/CMakeLists.txt | 1 + src/yb/master/master-path-handlers.cc | 2 +- .../add_table_to_xcluster_target_task.cc | 7 +- .../add_table_to_xcluster_target_task.h | 4 +- ...cluster_outbound_replication_group-test.cc | 62 ++--- .../xcluster_outbound_replication_group.cc | 35 ++- .../xcluster_outbound_replication_group.h | 4 +- .../xcluster/xcluster_replication_group.cc | 12 +- .../xcluster/xcluster_replication_group.h | 4 +- .../xcluster/xcluster_source_manager.cc | 1 + src/yb/master/xcluster_rpc_tasks.cc | 2 +- src/yb/master/xrepl_catalog_manager.cc | 2 +- src/yb/tools/yb-admin_cli.cc | 2 +- src/yb/tools/yb-admin_client.cc | 2 +- src/yb/tserver/tablet_server.cc | 8 +- src/yb/tserver/xcluster_consumer.cc | 126 +++-------- src/yb/tserver/xcluster_consumer.h | 30 +-- src/yb/tserver/xcluster_consumer_if.h | 2 +- src/yb/tserver/xcluster_poller.cc | 11 +- src/yb/tserver/xcluster_poller.h | 12 +- 38 files changed, 446 insertions(+), 381 deletions(-) create mode 100644 src/yb/client/xcluster_client_mock.cc create mode 100644 src/yb/client/xcluster_client_mock.h rename src/yb/{cdc => common}/xcluster_util.cc (97%) rename src/yb/{cdc => common}/xcluster_util.h (100%) diff --git a/src/yb/cdc/CMakeLists.txt b/src/yb/cdc/CMakeLists.txt index b8e5e89474f5..c762a1ebbcea 100644 --- a/src/yb/cdc/CMakeLists.txt +++ b/src/yb/cdc/CMakeLists.txt @@ -89,7 +89,6 @@ ADD_YB_LIBRARY(xcluster_producer_proto set(CDC_UTIL_SRCS cdc_util.cc xcluster_types.cc - xcluster_util.cc ) ADD_YB_LIBRARY( diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 23941b49d146..9be085ce3712 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -96,9 +96,7 @@ using std::vector; constexpr uint32_t kUpdateIntervalMs = 15 * 1000; -DEFINE_NON_RUNTIME_int32(cdc_read_rpc_timeout_ms, 30 * 1000, - "Timeout used for CDC read rpc calls. Reads normally occur cross-cluster."); -TAG_FLAG(cdc_read_rpc_timeout_ms, advanced); +DECLARE_int32(cdc_read_rpc_timeout_ms); DEFINE_NON_RUNTIME_int32(cdc_write_rpc_timeout_ms, 30 * 1000, "Timeout used for CDC write rpc calls. Writes normally occur intra-cluster."); @@ -109,12 +107,6 @@ DEPRECATE_FLAG(int32, cdc_ybclient_reactor_threads, "09_2023"); DEFINE_RUNTIME_int32(cdc_state_checkpoint_update_interval_ms, kUpdateIntervalMs, "Rate at which CDC state's checkpoint is updated."); -DEFINE_NON_RUNTIME_string(certs_for_cdc_dir, "", - "The parent directory of where all certificates for xCluster producer universes will " - "be stored, for when the producer and consumer clusters use different certificates. " - "Place the certificates for each producer cluster in " - "//*."); - DEFINE_RUNTIME_int32(update_min_cdc_indices_interval_secs, 60, "How often to read cdc_state table to get the minimum applied index for each tablet " "across all streams. This information is used to correctly keep log files that " diff --git a/src/yb/client/CMakeLists.txt b/src/yb/client/CMakeLists.txt index 04b32b439fc7..b306a8337a56 100644 --- a/src/yb/client/CMakeLists.txt +++ b/src/yb/client/CMakeLists.txt @@ -77,12 +77,12 @@ set(CLIENT_LIBS tserver_proto tserver_service_proto pg_auto_analyze_service_proto + gutil test_echo_service_proto tserver_util yb_ql_expr yb_ash yb_util - gutil yrpc yb_dockv) @@ -96,6 +96,19 @@ ADD_YB_LIBRARY(yb_client SRCS ${CLIENT_SRCS} DEPS ${CLIENT_LIBS}) +set(CLIENT_MOCK_SRCS + xcluster_client_mock.cc +) + +set(CLIENT_MOCK_LIBS + yb_client + gmock + gtest) + +ADD_YB_LIBRARY(yb_client_mock + SRCS ${CLIENT_MOCK_SRCS} + DEPS ${CLIENT_MOCK_LIBS}) + if(NOT APPLE) # Localize thirdparty symbols using a linker version script. This hides them # from the client application. The OS X linker does not support the diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 1edaf1d74028..39811a5bebce 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -2346,7 +2346,7 @@ std::pair YBClient::NextRequestIdAndMinR return std::make_pair(id, *requests.running_requests.begin()); } -void YBClient::AddMetaCacheInfo(JsonWriter* writer) { +void YBClient::AddMetaCacheInfo(JsonWriter* writer) const { data_->meta_cache_->AddAllTabletInfo(writer); } diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 3e01b3576d83..d3585cb7d353 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -313,7 +313,7 @@ class YBClientBuilder { // This class is thread-safe. class YBClient { public: - ~YBClient(); + virtual ~YBClient(); std::unique_ptr NewTableCreator(); @@ -1007,7 +1007,7 @@ class YBClient { std::pair NextRequestIdAndMinRunningRequestId(); - void AddMetaCacheInfo(JsonWriter* writer); + void AddMetaCacheInfo(JsonWriter* writer) const; void RequestsFinished(const RetryableRequestIdRange& request_id_range); @@ -1032,6 +1032,7 @@ class YBClient { private: class Data; + friend class MockYBClient; friend class YBClientBuilder; friend class YBNoOp; friend class YBTable; @@ -1051,7 +1052,7 @@ class YBClient { friend class internal::ClientMasterRpcBase; friend class PlacementInfoTest; friend class XClusterClient; - friend class XClusterRemoteClient; + friend class XClusterRemoteClientHolder; FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist); FRIEND_TEST(ClientTest, TestMasterDown); @@ -1095,6 +1096,15 @@ class YBClient { DISALLOW_COPY_AND_ASSIGN(YBClient); }; +// A mock YBClient that can be used for testing. +// Currently it only allows us to create a MockYBClient object , and does not mock any member +// functions. +class MockYBClient : public YBClient { + public: + MockYBClient() = default; + virtual ~MockYBClient() = default; +}; + Result GetTableId(YBClient* client, const YBTableName& table_name); } // namespace client diff --git a/src/yb/client/meta_cache.cc b/src/yb/client/meta_cache.cc index 808a8f613e02..d6389807a1b9 100644 --- a/src/yb/client/meta_cache.cc +++ b/src/yb/client/meta_cache.cc @@ -1370,7 +1370,7 @@ void MetaCache::InvalidateTableCache(const YBTable& table) { } } -void MetaCache::AddAllTabletInfo(JsonWriter* writer) { +void MetaCache::AddAllTabletInfo(JsonWriter* writer) const { SharedLock lock(mutex_); writer->StartObject(); writer->String("tablets"); diff --git a/src/yb/client/meta_cache.h b/src/yb/client/meta_cache.h index 8e6246176a38..23ef9373a4fb 100644 --- a/src/yb/client/meta_cache.h +++ b/src/yb/client/meta_cache.h @@ -621,7 +621,7 @@ class MetaCache : public RefCountedThreadSafe { void InvalidateTableCache(const YBTable& table); - void AddAllTabletInfo(JsonWriter* writer); + void AddAllTabletInfo(JsonWriter* writer) const; const std::string& LogPrefix() const { return log_prefix_; } @@ -736,7 +736,7 @@ class MetaCache : public RefCountedThreadSafe { YBClient* const client_; - std::shared_timed_mutex mutex_; + mutable std::shared_timed_mutex mutex_; // Cache of Tablet Server locations: TS UUID -> RemoteTabletServer*. // diff --git a/src/yb/client/xcluster_client.cc b/src/yb/client/xcluster_client.cc index df359516d597..c7d7759f29b0 100644 --- a/src/yb/client/xcluster_client.cc +++ b/src/yb/client/xcluster_client.cc @@ -14,6 +14,7 @@ #include "yb/client/xcluster_client.h" #include "yb/cdc/cdc_service.pb.h" +#include "yb/common/xcluster_util.h" #include "yb/client/client.h" #include "yb/client/client-internal.h" #include "yb/master/master_defaults.h" @@ -26,6 +27,8 @@ #include "yb/util/path_util.h" DECLARE_bool(use_node_to_node_encryption); +DECLARE_string(certs_for_cdc_dir); +DECLARE_int32(cdc_read_rpc_timeout_ms); #define CALL_SYNC_LEADER_MASTER_RPC(method, req) \ VERIFY_RESULT(SyncLeaderMasterRpc( \ @@ -33,6 +36,113 @@ DECLARE_bool(use_node_to_node_encryption); BOOST_PP_STRINGIZE(method), &master::MasterReplicationProxy::BOOST_PP_CAT(method, Async))) namespace yb::client { + +// XClusterRemoteClientHolder + +XClusterRemoteClientHolder::XClusterRemoteClientHolder( + const xcluster::ReplicationGroupId& replication_group_id) + : replication_group_id_(xcluster::GetOriginalReplicationGroupId(replication_group_id)) {} + +XClusterRemoteClientHolder::~XClusterRemoteClientHolder() { Shutdown(); } + +void XClusterRemoteClientHolder::Shutdown() { + if (yb_client_) { + yb_client_->Shutdown(); + } + if (messenger_) { + messenger_->Shutdown(); + } +} + +Status XClusterRemoteClientHolder::Init(const std::vector& remote_masters) { + SCHECK(!remote_masters.empty(), InvalidArgument, "No master addresses provided"); + const auto master_addrs = HostPort::ToCommaSeparatedString(remote_masters); + + rpc::MessengerBuilder messenger_builder("xcluster-remote"); + std::string certs_dir; + + if (FLAGS_use_node_to_node_encryption) { + if (!FLAGS_certs_for_cdc_dir.empty()) { + certs_dir = JoinPathSegments(FLAGS_certs_for_cdc_dir, replication_group_id_.ToString()); + } + secure_context_ = VERIFY_RESULT(rpc::SetupSecureContext( + certs_dir, /*root_dir=*/"", /*name=*/"", rpc::SecureContextType::kInternal, + &messenger_builder)); + } + messenger_ = VERIFY_RESULT(messenger_builder.Build()); + + yb_client_ = VERIFY_RESULT(YBClientBuilder() + .set_client_name(kClientName) + .add_master_server_addr(master_addrs) + .default_admin_operation_timeout( + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)) + .Build(messenger_.get())); + xcluster_client_ = std::make_unique(*yb_client_); + + return Status::OK(); +} + +Result> XClusterRemoteClientHolder::Create( + const xcluster::ReplicationGroupId& replication_group_id, + const std::vector& remote_masters) { + auto client = std::shared_ptr( + new XClusterRemoteClientHolder(replication_group_id)); + RETURN_NOT_OK(client->Init(remote_masters)); + return client; +} + +Status XClusterRemoteClientHolder::SetMasterAddresses(const std::vector& remote_masters) { + return yb_client_->SetMasterAddresses(HostPort::ToCommaSeparatedString(remote_masters)); +} + +Status XClusterRemoteClientHolder::ReloadCertificates() { + if (!secure_context_) { + return Status::OK(); + } + + std::string cert_dir; + if (!FLAGS_certs_for_cdc_dir.empty()) { + cert_dir = JoinPathSegments(FLAGS_certs_for_cdc_dir, replication_group_id_.ToString()); + } + + return rpc::ReloadSecureContextKeysAndCertificates( + secure_context_.get(), cert_dir, "" /* node_name */); +} + +XClusterClient& XClusterRemoteClientHolder::GetXClusterClient() { + CHECK_NOTNULL(xcluster_client_); + return *xcluster_client_.get(); +} + +client::YBClient& XClusterRemoteClientHolder::GetYbClient() { + CHECK_NOTNULL(yb_client_); + return *yb_client_; +} + +google::protobuf::RepeatedPtrField GetXClusterStreamOptions() { + google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB> options; + options.Reserve(4); + auto source_type = options.Add(); + source_type->set_key(cdc::kSourceType); + source_type->set_value(CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); + + auto record_type = options.Add(); + record_type->set_key(cdc::kRecordType); + record_type->set_value(CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); + + auto record_format = options.Add(); + record_format->set_key(cdc::kRecordFormat); + record_format->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); + + auto checkpoint_type = options.Add(); + checkpoint_type->set_key(cdc::kCheckpointType); + checkpoint_type->set_value(CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); + + return options; +} + +// XClusterClient + XClusterClient::XClusterClient(client::YBClient& yb_client) : yb_client_(yb_client) {} CoarseTimePoint XClusterClient::GetDeadline() const { @@ -366,7 +476,7 @@ void XClusterClient::CreateXClusterStreamAsync( const TableId& table_id, bool active, cdc::StreamModeTransactional transactional, CreateCDCStreamCallback callback) { yb_client_.data_->CreateXClusterStream( - &yb_client_, table_id, GetXClusterStreamOptions(), + &yb_client_, table_id, client::GetXClusterStreamOptions(), (active ? master::SysCDCStreamEntryPB::ACTIVE : master::SysCDCStreamEntryPB::INITIATED), transactional, GetDeadline(), std::move(callback)); } @@ -458,61 +568,7 @@ XClusterClient::GetUniverseReplicationInfo( return result; } -// XClusterRemoteClient - -XClusterRemoteClient::XClusterRemoteClient(const std::string& certs_for_cdc_dir, MonoDelta timeout) - : certs_for_cdc_dir_(certs_for_cdc_dir), timeout_(timeout) {} - -XClusterRemoteClient::~XClusterRemoteClient() { - if (messenger_) { - messenger_->Shutdown(); - } -} - -Status XClusterRemoteClient::Init( - const xcluster::ReplicationGroupId& replication_group_id, - const std::vector& remote_masters) { - SCHECK(!remote_masters.empty(), InvalidArgument, "No master addresses provided"); - const auto master_addrs = HostPort::ToCommaSeparatedString(remote_masters); - - rpc::MessengerBuilder messenger_builder("xcluster-remote"); - std::string certs_dir; - - if (FLAGS_use_node_to_node_encryption) { - if (!certs_for_cdc_dir_.empty()) { - certs_dir = JoinPathSegments(certs_for_cdc_dir_, replication_group_id.ToString()); - } - secure_context_ = VERIFY_RESULT(rpc::SetupSecureContext( - certs_dir, /*root_dir=*/"", /*name=*/"", rpc::SecureContextType::kInternal, - &messenger_builder)); - } - messenger_ = VERIFY_RESULT(messenger_builder.Build()); - - yb_client_ = VERIFY_RESULT(YBClientBuilder() - .add_master_server_addr(master_addrs) - .default_admin_operation_timeout(timeout_) - .Build(messenger_.get())); - xcluster_client_ = std::make_unique(*yb_client_); - - return Status::OK(); -} - -XClusterClient* XClusterRemoteClient::GetXClusterClient() { - CHECK_NOTNULL(xcluster_client_); - return xcluster_client_.get(); -} - -template -Result XClusterRemoteClient::SyncLeaderMasterRpc( - const RequestPB& req, const char* method_name, const Method& method) { - ResponsePB resp; - RETURN_NOT_OK(yb_client_->data_->SyncLeaderMasterRpc( - CoarseMonoClock::Now() + yb_client_->default_admin_operation_timeout(), req, &resp, - method_name, method)); - return resp; -} - -Result XClusterRemoteClient::SetupDbScopedUniverseReplication( +Result XClusterClient::SetupDbScopedUniverseReplication( const xcluster::ReplicationGroupId& replication_group_id, const std::vector& source_master_addresses, const std::vector& namespace_names, @@ -559,7 +615,7 @@ Result XClusterRemoteClient::SetupDbScopedUniverseReplication( return UniverseUuid::FromString(resp.universe_uuid()); } -Result XClusterRemoteClient::IsSetupUniverseReplicationDone( +Result XClusterClient::IsSetupUniverseReplicationDone( const xcluster::ReplicationGroupId& replication_group_id) { master::IsSetupUniverseReplicationDoneRequestPB req; req.set_replication_group_id(replication_group_id.ToString()); @@ -610,35 +666,32 @@ GetXClusterStreamsCallback CreateXClusterStreamsCallback(BootstrapProducerCallba }; } -Status XClusterRemoteClient::GetXClusterTableCheckpointInfos( +Status XClusterClient::GetXClusterTableCheckpointInfos( const xcluster::ReplicationGroupId& replication_group_id, const NamespaceId& namespace_id, const std::vector& table_names, const std::vector& pg_schema_names, BootstrapProducerCallback user_callback) { auto callback = CreateXClusterStreamsCallback(user_callback); - RETURN_NOT_OK(XClusterClient(*yb_client_) - .GetXClusterStreams( - CoarseMonoClock::Now() + yb_client_->default_admin_operation_timeout(), - replication_group_id, namespace_id, table_names, pg_schema_names, - std::move(callback))); + RETURN_NOT_OK(GetXClusterStreams( + CoarseMonoClock::Now() + yb_client_.default_admin_operation_timeout(), replication_group_id, + namespace_id, table_names, pg_schema_names, std::move(callback))); return Status::OK(); } -Status XClusterRemoteClient::GetXClusterTableCheckpointInfos( +Status XClusterClient::GetXClusterTableCheckpointInfos( const xcluster::ReplicationGroupId& replication_group_id, const NamespaceId& namespace_id, const std::vector& table_ids, BootstrapProducerCallback user_callback) { auto callback = CreateXClusterStreamsCallback(user_callback); - RETURN_NOT_OK(XClusterClient(*yb_client_) - .GetXClusterStreams( - CoarseMonoClock::Now() + yb_client_->default_admin_operation_timeout(), - replication_group_id, namespace_id, table_ids, std::move(callback))); + RETURN_NOT_OK(GetXClusterStreams( + CoarseMonoClock::Now() + yb_client_.default_admin_operation_timeout(), replication_group_id, + namespace_id, table_ids, std::move(callback))); return Status::OK(); } -Status XClusterRemoteClient::AddNamespaceToDbScopedUniverseReplication( +Status XClusterClient::AddNamespaceToDbScopedUniverseReplication( const xcluster::ReplicationGroupId& replication_group_id, const UniverseUuid& target_universe_uuid, const NamespaceName& namespace_name, const NamespaceId& source_namespace_id, const std::vector& source_table_ids, @@ -674,26 +727,4 @@ Status XClusterRemoteClient::AddNamespaceToDbScopedUniverseReplication( return Status::OK(); } -google::protobuf::RepeatedPtrField GetXClusterStreamOptions() { - google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB> options; - options.Reserve(4); - auto source_type = options.Add(); - source_type->set_key(cdc::kSourceType); - source_type->set_value(CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); - - auto record_type = options.Add(); - record_type->set_key(cdc::kRecordType); - record_type->set_value(CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); - - auto record_format = options.Add(); - record_format->set_key(cdc::kRecordFormat); - record_format->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); - - auto checkpoint_type = options.Add(); - checkpoint_type->set_key(cdc::kCheckpointType); - checkpoint_type->set_value(CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); - - return options; -} - } // namespace yb::client diff --git a/src/yb/client/xcluster_client.h b/src/yb/client/xcluster_client.h index d3985e8a667a..3d85c7d11871 100644 --- a/src/yb/client/xcluster_client.h +++ b/src/yb/client/xcluster_client.h @@ -44,11 +44,49 @@ class SecureContext; namespace client { class YBClient; +class XClusterClient; using GetXClusterStreamsCallback = std::function)>; using IsXClusterBootstrapRequiredCallback = std::function)>; +// This class creates and holds a dedicated YbClient, XClusterClient and their dependant objects +// messenger and secure context. The client connects to a remote yb xCluster universe. +class XClusterRemoteClientHolder { + public: + static constexpr auto kClientName = "XClusterRemote"; + + static Result> Create( + const xcluster::ReplicationGroupId& replication_group_id, + const std::vector& remote_masters); + + virtual ~XClusterRemoteClientHolder(); + + virtual void Shutdown(); + + Status SetMasterAddresses(const std::vector& remote_masters); + + Status ReloadCertificates(); + + XClusterClient& GetXClusterClient(); + client::YBClient& GetYbClient(); + + private: + friend class MockXClusterRemoteClientHolder; + + explicit XClusterRemoteClientHolder(const xcluster::ReplicationGroupId& replication_group_id); + Status Init(const std::vector& remote_masters); + + const xcluster::ReplicationGroupId replication_group_id_; + std::unique_ptr secure_context_; + std::unique_ptr messenger_; + + std::unique_ptr yb_client_; + std::unique_ptr xcluster_client_; + + DISALLOW_COPY_AND_ASSIGN(XClusterRemoteClientHolder); +}; + // A wrapper over YBClient to handle xCluster related RPCs. // This class performs serialization of C++ objects to PBs and vice versa. class XClusterClient { @@ -170,30 +208,6 @@ class XClusterClient { Result GetUniverseReplicationInfo( const xcluster::ReplicationGroupId& replication_group_id); - private: - CoarseTimePoint GetDeadline() const; - - template - Result SyncLeaderMasterRpc( - const RequestPB& req, const char* method_name, const Method& method); - - client::YBClient& yb_client_; -}; - -// A wrapper over YBClient to handle xCluster related RPCs sent to a different yb universe. -// This class performs serialization of C++ objects to PBs and vice versa. -class XClusterRemoteClient { - public: - XClusterRemoteClient(const std::string& certs_for_cdc_dir, MonoDelta timeout); - virtual ~XClusterRemoteClient(); - - virtual Status Init( - const xcluster::ReplicationGroupId& replication_group_id, - const std::vector& remote_masters); - - XClusterClient* operator->() {return GetXClusterClient();} - XClusterClient* GetXClusterClient(); - // This requires flag enable_xcluster_api_v2 to be set. virtual Result SetupDbScopedUniverseReplication( const xcluster::ReplicationGroupId& replication_group_id, @@ -222,21 +236,16 @@ class XClusterRemoteClient { const std::vector& bootstrap_ids); private: + CoarseTimePoint GetDeadline() const; + template Result SyncLeaderMasterRpc( const RequestPB& req, const char* method_name, const Method& method); - const std::string certs_for_cdc_dir_; - const MonoDelta timeout_; - std::unique_ptr secure_context_; - std::unique_ptr messenger_; - - std::unique_ptr yb_client_; - std::unique_ptr xcluster_client_; + client::YBClient& yb_client_; }; -// TODO: Move xcluster_util to common and this into it. -google::protobuf::RepeatedPtrField GetXClusterStreamOptions(); +google::protobuf::RepeatedPtrField GetXClusterStreamOptions(); } // namespace client } // namespace yb diff --git a/src/yb/client/xcluster_client_mock.cc b/src/yb/client/xcluster_client_mock.cc new file mode 100644 index 000000000000..9a2541ea361a --- /dev/null +++ b/src/yb/client/xcluster_client_mock.cc @@ -0,0 +1,34 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/client/xcluster_client_mock.h" +#include "yb/client/client.h" + +namespace yb::client { + +const xcluster::ReplicationGroupId kDummyReplicationGroupId = + xcluster::ReplicationGroupId("dummy-replication-group-id"); + +MockXClusterRemoteClientHolder::MockXClusterRemoteClientHolder() + : XClusterRemoteClientHolder(kDummyReplicationGroupId) { + yb_client_ = std::make_unique(); + xcluster_client_ = std::make_unique(*yb_client_); +} + +MockXClusterRemoteClientHolder::~MockXClusterRemoteClientHolder() {} + +MockXClusterClient::MockXClusterClient(YBClient& yb_client) : XClusterClient(yb_client) {} + +MockXClusterClient::~MockXClusterClient() {} + +} // namespace yb::client diff --git a/src/yb/client/xcluster_client_mock.h b/src/yb/client/xcluster_client_mock.h new file mode 100644 index 000000000000..7a269ca538f7 --- /dev/null +++ b/src/yb/client/xcluster_client_mock.h @@ -0,0 +1,62 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include + +#include "yb/client/xcluster_client.h" +#include "yb/util/is_operation_done_result.h" + +namespace yb::client { + +class MockXClusterClient : public XClusterClient { + public: + explicit MockXClusterClient(YBClient& yb_client); + + virtual ~MockXClusterClient(); + + MOCK_METHOD( + Result, SetupDbScopedUniverseReplication, + (const xcluster::ReplicationGroupId&, const std::vector&, + const std::vector&, const std::vector&, + const std::vector&, const std::vector&), + (override)); + + MOCK_METHOD( + Result, IsSetupUniverseReplicationDone, + (const xcluster::ReplicationGroupId&), (override)); + + MOCK_METHOD( + Status, AddNamespaceToDbScopedUniverseReplication, + (const xcluster::ReplicationGroupId& replication_group_id, + const UniverseUuid& target_universe_uuid, const NamespaceName& namespace_name, + const NamespaceId& source_namespace_id, const std::vector& source_table_ids, + const std::vector& bootstrap_ids), + (override)); +}; + +class MockXClusterRemoteClientHolder : public XClusterRemoteClientHolder { + public: + MockXClusterRemoteClientHolder(); + + virtual ~MockXClusterRemoteClientHolder(); + + MockXClusterClient& GetMockXClusterClient() { + return *static_cast(xcluster_client_.get()); + } + + MOCK_METHOD(void, Shutdown, (), (override)); +}; + +} // namespace yb::client diff --git a/src/yb/common/CMakeLists.txt b/src/yb/common/CMakeLists.txt index 79f0cfc6d75a..d0b5d04437d7 100644 --- a/src/yb/common/CMakeLists.txt +++ b/src/yb/common/CMakeLists.txt @@ -105,6 +105,7 @@ set(COMMON_SRCS transaction_error.cc wire_protocol.cc llvm_profile_dumper.cc + xcluster_util.cc ) set(COMMON_LIBS diff --git a/src/yb/common/common_flags.cc b/src/yb/common/common_flags.cc index 5cbffdd6ed55..738492ffd38d 100644 --- a/src/yb/common/common_flags.cc +++ b/src/yb/common/common_flags.cc @@ -193,6 +193,16 @@ DEFINE_NON_RUNTIME_PREVIEW_bool(enable_pg_cron, false, "Enables the pg_cron extension. Jobs will be run on a single tserver node. The node should be " "assumed to be selected randomly."); +DEFINE_NON_RUNTIME_string(certs_for_cdc_dir, "", + "The parent directory of where all certificates for xCluster source universes will " + "be stored, for when the source and target universes use different certificates. " + "Place the certificates for each source universe in " + "//*."); + +DEFINE_NON_RUNTIME_int32(cdc_read_rpc_timeout_ms, 30 * 1000, + "Timeout used for CDC read rpc calls. Reads normally occur cross-cluster."); +TAG_FLAG(cdc_read_rpc_timeout_ms, advanced); + namespace yb { void InitCommonFlags() { diff --git a/src/yb/cdc/xcluster_util.cc b/src/yb/common/xcluster_util.cc similarity index 97% rename from src/yb/cdc/xcluster_util.cc rename to src/yb/common/xcluster_util.cc index 4fa22a316371..3112d2364794 100644 --- a/src/yb/cdc/xcluster_util.cc +++ b/src/yb/common/xcluster_util.cc @@ -11,7 +11,8 @@ // under the License. // -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" + #include "yb/gutil/strings/util.h" namespace yb::xcluster { @@ -42,4 +43,5 @@ std::string ShortReplicationType(XClusterReplicationType type) { XClusterReplicationType_Name(type), "XCLUSTER_", "", /*replace_all=*/false); } + } // namespace yb::xcluster diff --git a/src/yb/cdc/xcluster_util.h b/src/yb/common/xcluster_util.h similarity index 100% rename from src/yb/cdc/xcluster_util.h rename to src/yb/common/xcluster_util.h diff --git a/src/yb/integration-tests/xcluster/xcluster-test.cc b/src/yb/integration-tests/xcluster/xcluster-test.cc index e2edadd47200..bc96e352b8aa 100644 --- a/src/yb/integration-tests/xcluster/xcluster-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster-test.cc @@ -22,7 +22,8 @@ #include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_service.proxy.h" #include "yb/cdc/cdc_state_table.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/client/xcluster_client.h" +#include "yb/common/xcluster_util.h" #include "yb/cdc/xrepl_stream_metadata.h" #include "yb/client/client-test-util.h" @@ -3310,25 +3311,28 @@ Status VerifyMetaCacheObjectIsValid( return json_reader.ExtractObjectArray(meta_cache, "tablets", &tablets); } -void VerifyMetaCacheWithXClusterConsumerSetUp(const std::string& produced_json) { +Status VerifyMetaCacheWithXClusterConsumerSetUp(const std::string& produced_json) { JsonReader json_reader(produced_json); - EXPECT_OK(json_reader.Init()); + RETURN_NOT_OK(json_reader.Init()); const rapidjson::Value* object = nullptr; - EXPECT_OK(json_reader.ExtractObject(json_reader.root(), nullptr, &object)); - EXPECT_EQ(rapidjson::kObjectType, CHECK_NOTNULL(object)->GetType()); + RETURN_NOT_OK(json_reader.ExtractObject(json_reader.root(), nullptr, &object)); + SCHECK_EQ( + CHECK_NOTNULL(object)->GetType(), rapidjson::kObjectType, IllegalState, "Not an JSON object"); - EXPECT_OK(VerifyMetaCacheObjectIsValid(object, json_reader, "MainMetaCache")); + RETURN_NOT_OK(VerifyMetaCacheObjectIsValid(object, json_reader, "MainMetaCache")); bool found_xcluster_member = false; - for (auto it = object->MemberBegin(); it != object->MemberEnd(); - ++it) { + for (auto it = object->MemberBegin(); it != object->MemberEnd(); ++it) { std::string member_name = it->name.GetString(); - if (member_name.starts_with("XClusterConsumerRemote_")) { + if (member_name.starts_with(client::XClusterRemoteClientHolder::kClientName)) { found_xcluster_member = true; } - EXPECT_OK(VerifyMetaCacheObjectIsValid(object, json_reader, member_name.c_str())); + RETURN_NOT_OK(VerifyMetaCacheObjectIsValid(object, json_reader, member_name.c_str())); } - EXPECT_TRUE(found_xcluster_member) - << "No member name starting with XClusterConsumerRemote_ is found"; + SCHECK_FORMAT( + found_xcluster_member, IllegalState, "No member name starting with $0 found", + client::XClusterRemoteClientHolder::kClientName); + + return Status::OK(); } TEST_F_EX(XClusterTest, ListMetaCacheAfterXClusterSetup, XClusterTestNoParam) { @@ -3352,8 +3356,21 @@ TEST_F_EX(XClusterTest, ListMetaCacheAfterXClusterSetup, XClusterTestNoParam) { auto tserver_endpoint = tserver->bound_http_addr(); auto query_endpoint = "http://" + AsString(tserver_endpoint) + "/api/v1/meta-cache"; faststring result; - ASSERT_OK(EasyCurl().FetchURL(query_endpoint, &result)); - VerifyMetaCacheWithXClusterConsumerSetUp(result.ToString()); + + // Attempts to fetch url until a response with status OK, or until timeout. + // On mac the curl command fails with error "A libcurl function was given a bad argument", but + // succeeds on retries. + ASSERT_OK(WaitFor( + [&]() -> bool { + EasyCurl curl; + auto status = curl.FetchURL(query_endpoint, &result); + YB_LOG_IF_EVERY_N(WARNING, !status.ok(), 5) << status; + + return status.ok(); + }, + 30s, "Wait for curl response to return with status OK")); + + ASSERT_OK(VerifyMetaCacheWithXClusterConsumerSetUp(result.ToString())); } ASSERT_OK(DeleteUniverseReplication()); } diff --git a/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc b/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc index 825e2c89801e..b709c0661784 100644 --- a/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc @@ -11,7 +11,7 @@ // under the License. // -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/table.h" #include "yb/client/xcluster_client.h" #include "yb/client/yb_table_name.h" diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_test_base.cc index aa0c53fe94ea..ff24bd334894 100644 --- a/src/yb/integration-tests/xcluster/xcluster_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_test_base.cc @@ -15,7 +15,7 @@ #include -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/client.h" #include "yb/client/table.h" diff --git a/src/yb/master/CMakeLists.txt b/src/yb/master/CMakeLists.txt index 6699708c18f3..b9faacdf2603 100644 --- a/src/yb/master/CMakeLists.txt +++ b/src/yb/master/CMakeLists.txt @@ -260,6 +260,7 @@ ADD_YB_TEST(table_index_test) ADD_YB_TEST(tablet_creation_limits_test) ADD_YB_TEST(xcluster/xcluster_safe_time_service-test) ADD_YB_TEST(xcluster/xcluster_outbound_replication_group-test) +YB_TEST_TARGET_LINK_LIBRARIES(xcluster/xcluster_outbound_replication_group-test yb_client_mock) # Actual master executable. In LTO mode, can also act as the tablet server if executed through a # symlink named as the tablet server executable. diff --git a/src/yb/master/master-path-handlers.cc b/src/yb/master/master-path-handlers.cc index ea5bf1287d99..9bdbaa73e969 100644 --- a/src/yb/master/master-path-handlers.cc +++ b/src/yb/master/master-path-handlers.cc @@ -42,7 +42,7 @@ #include -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/common/common_types_util.h" #include "yb/common/hybrid_time.h" diff --git a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc index ce35af7275eb..cce3d53d09d8 100644 --- a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc +++ b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc @@ -13,7 +13,7 @@ #include "yb/master/xcluster/add_table_to_xcluster_target_task.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/xcluster_client.h" #include "yb/master/catalog_manager.h" #include "yb/util/is_operation_done_result.h" @@ -23,7 +23,6 @@ #include "yb/rpc/messenger.h" #include "yb/util/logging.h" #include "yb/util/sync_point.h" -#include "yb/util/trace.h" DEFINE_test_flag(bool, xcluster_fail_table_create_during_bootstrap, false, "Fail the table or index creation during xcluster bootstrap stage."); @@ -101,8 +100,8 @@ Status AddTableToXClusterTargetTask::FirstStep() { VERIFY_RESULT(GetProducerNamespaceId(*universe_, table_info_->namespace_id())); // We need to keep the client alive until the callback is invoked. - remote_client_ = VERIFY_RESULT(GetXClusterRemoteClient(*universe_)); - return remote_client_->GetXClusterTableCheckpointInfos( + remote_client_ = VERIFY_RESULT(GetXClusterRemoteClientHolder(*universe_)); + return remote_client_->GetXClusterClient().GetXClusterTableCheckpointInfos( universe_->ReplicationGroupId(), producer_namespace_id, {table_info_->name()}, {table_info_->pgschema_name()}, std::move(callback)); } diff --git a/src/yb/master/xcluster/add_table_to_xcluster_target_task.h b/src/yb/master/xcluster/add_table_to_xcluster_target_task.h index 93827d3bb285..33ab7b416705 100644 --- a/src/yb/master/xcluster/add_table_to_xcluster_target_task.h +++ b/src/yb/master/xcluster/add_table_to_xcluster_target_task.h @@ -24,7 +24,7 @@ namespace yb { namespace client { -class XClusterRemoteClient; +class XClusterRemoteClientHolder; } // namespace client namespace master { @@ -64,7 +64,7 @@ class AddTableToXClusterTargetTask : public PostTabletCreateTaskBase { HybridTime bootstrap_time_ = HybridTime::kInvalid; HybridTime initial_xcluster_safe_time_ = HybridTime::kInvalid; scoped_refptr universe_; - std::shared_ptr remote_client_; + std::shared_ptr remote_client_; XClusterManagerIf& xcluster_manager_; bool is_db_scoped_ = false; }; diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc index 4a75af9b208f..99f00a3e489c 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc @@ -15,7 +15,7 @@ #include -#include "yb/client/xcluster_client.h" +#include "yb/client/xcluster_client_mock.h" #include "yb/master/catalog_entity_info.h" #include "yb/master/xcluster/xcluster_outbound_replication_group_tasks.h" @@ -39,41 +39,12 @@ using testing::Return; namespace yb::master { -const UniverseUuid kTargetUniverseUuid = UniverseUuid::GenerateRandom(); -const LeaderEpoch kEpoch = LeaderEpoch(1, 1); +const auto kEpoch = master::LeaderEpoch(1, 1); inline bool operator==(const NamespaceCheckpointInfo& lhs, const NamespaceCheckpointInfo& rhs) { return YB_STRUCT_EQUALS(initial_bootstrap_required, table_infos); } -class XClusterRemoteClientMocked : public client::XClusterRemoteClient { - public: - XClusterRemoteClientMocked() : client::XClusterRemoteClient("na", MonoDelta::kMax) { - DefaultValue>::Set(kTargetUniverseUuid); - DefaultValue>::Set(IsOperationDoneResult::Done()); - } - - MOCK_METHOD2(Init, Status(const xcluster::ReplicationGroupId&, const std::vector&)); - MOCK_METHOD6( - SetupDbScopedUniverseReplication, - Result( - const xcluster::ReplicationGroupId&, const std::vector&, - const std::vector&, const std::vector&, - const std::vector&, const std::vector&)); - - MOCK_METHOD1( - IsSetupUniverseReplicationDone, - Result(const xcluster::ReplicationGroupId&)); - - MOCK_METHOD6( - AddNamespaceToDbScopedUniverseReplication, - Status( - const xcluster::ReplicationGroupId& replication_group_id, - const UniverseUuid& target_universe_uuid, const NamespaceName& namespace_name, - const NamespaceId& source_namespace_id, const std::vector& source_table_ids, - const std::vector& bootstrap_ids)); -}; - Status ValidateEpoch(const LeaderEpoch& epoch) { SCHECK_EQ(epoch, kEpoch, IllegalState, "Epoch does not match"); return Status::OK(); @@ -96,11 +67,11 @@ class XClusterOutboundReplicationGroupMocked : public XClusterOutboundReplicatio : XClusterOutboundReplicationGroup( replication_group_id, {}, std::move(helper_functions), /*tasks_tracker=*/nullptr, task_factory) { - remote_client_ = std::make_shared(); + remote_client_ = std::make_shared(); } - void SetRemoteClient(std::shared_ptr remote_client) { - remote_client_ = remote_client; + client::MockXClusterClient& GetMockXClusterClient() { + return remote_client_->GetMockXClusterClient(); } bool IsDeleted() const { @@ -151,16 +122,17 @@ class XClusterOutboundReplicationGroupMocked : public XClusterOutboundReplicatio } private: - virtual Result> GetRemoteClient( + virtual Result> GetRemoteClient( const std::vector& remote_masters) const override { return remote_client_; } - std::shared_ptr remote_client_; + std::shared_ptr remote_client_; }; class XClusterOutboundReplicationGroupMockedTest : public YBTest { public: + const UniverseUuid kTargetUniverseUuid = UniverseUuid::GenerateRandom(); const NamespaceName kNamespaceName = "db1"; const NamespaceId kNamespaceId = "db1_id"; const PgSchemaName kPgSchemaName = "public", kPgSchemaName2 = "public2"; @@ -172,6 +144,9 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest { XClusterOutboundReplicationGroupMockedTest() { google::SetVLOGLevel("*", 4); + DefaultValue>::Set(kTargetUniverseUuid); + DefaultValue>::Set(IsOperationDoneResult::Done()); + ThreadPoolBuilder thread_pool_builder("Test"); CHECK_OK(thread_pool_builder.Build(&thread_pool)); @@ -473,14 +448,13 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup) auto outbound_rg_ptr = CreateReplicationGroup(); auto& outbound_rg = *outbound_rg_ptr; - auto remote_client = std::make_shared(); - outbound_rg.SetRemoteClient(remote_client); + auto& xcluster_client = outbound_rg.GetMockXClusterClient(); ASSERT_OK(outbound_rg.AddNamespaceSync(kEpoch, kNamespaceId, kTimeout)); std::vector streams{xcluster_streams.begin(), xcluster_streams.end()}; EXPECT_CALL( - *remote_client, + xcluster_client, SetupDbScopedUniverseReplication( kReplicationGroupId, _, std::vector{kNamespaceName}, std::vector{kNamespaceId}, std::vector{kTableId1}, streams)) @@ -488,7 +462,7 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup) ASSERT_OK(outbound_rg.CreateXClusterReplication({}, {}, kEpoch)); - EXPECT_CALL(*remote_client, IsSetupUniverseReplicationDone(_)) + EXPECT_CALL(xcluster_client, IsSetupUniverseReplicationDone(_)) .WillOnce(Return(IsOperationDoneResult::NotDone())); auto create_result = ASSERT_RESULT(outbound_rg.IsCreateXClusterReplicationDone({}, kEpoch)); @@ -496,7 +470,7 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup) // Fail the Setup. const auto error_str = "Failed by test"; - EXPECT_CALL(*remote_client, IsSetupUniverseReplicationDone(_)) + EXPECT_CALL(xcluster_client, IsSetupUniverseReplicationDone(_)) .WillOnce(Return(STATUS(IllegalState, error_str))); auto result = outbound_rg.IsCreateXClusterReplicationDone({}, kEpoch); ASSERT_NOK(result); @@ -509,7 +483,7 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup) pb.target_universe_info().state(), SysXClusterOutboundReplicationGroupEntryPB::TargetUniverseInfoPB::CREATING_REPLICATION_GROUP); - EXPECT_CALL(*remote_client, IsSetupUniverseReplicationDone(_)) + EXPECT_CALL(xcluster_client, IsSetupUniverseReplicationDone(_)) .WillOnce(Return(IsOperationDoneResult::Done(STATUS(IllegalState, error_str)))); create_result = ASSERT_RESULT(outbound_rg.IsCreateXClusterReplicationDone({}, kEpoch)); ASSERT_TRUE(create_result.done()); @@ -519,10 +493,10 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup) ASSERT_FALSE(pb.has_target_universe_info()); // Success case. - EXPECT_CALL(*remote_client, IsSetupUniverseReplicationDone(_)) + EXPECT_CALL(xcluster_client, IsSetupUniverseReplicationDone(_)) .WillOnce(Return(IsOperationDoneResult::Done())); - EXPECT_CALL(*remote_client, SetupDbScopedUniverseReplication(_, _, _, _, _, _)); + EXPECT_CALL(xcluster_client, SetupDbScopedUniverseReplication(_, _, _, _, _, _)); // Calling create again should not do anything. ASSERT_OK(outbound_rg.CreateXClusterReplication({}, {}, kEpoch)); diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc index 2928f115f609..165cccf7aea4 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc @@ -12,7 +12,7 @@ // #include "yb/master/xcluster/xcluster_outbound_replication_group.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/xcluster_client.h" #include "yb/common/colocated_util.h" #include "yb/master/catalog_entity_info.h" @@ -24,9 +24,6 @@ DEFINE_RUNTIME_uint32(max_xcluster_streams_to_checkpoint_in_parallel, 200, "Maximum number of xCluster streams to checkpoint in parallel"); -DECLARE_int32(cdc_read_rpc_timeout_ms); -DECLARE_string(certs_for_cdc_dir); - using namespace std::placeholders; namespace yb::master { @@ -502,8 +499,8 @@ Status XClusterOutboundReplicationGroup::RemoveNamespace( UniverseUuid::FromString(outbound_group_pb.target_universe_info().universe_uuid())); auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - RETURN_NOT_OK( - (*remote_client)->RemoveNamespaceFromUniverseReplication(Id(), namespace_id, target_uuid)); + RETURN_NOT_OK(remote_client->GetXClusterClient().RemoveNamespaceFromUniverseReplication( + Id(), namespace_id, target_uuid)); } RETURN_NOT_OK(DeleteNamespaceStreams(epoch, namespace_id, outbound_group_pb)); @@ -533,8 +530,8 @@ Status XClusterOutboundReplicationGroup::Delete( UniverseUuid::FromString(outbound_group_pb.target_universe_info().universe_uuid())); auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - RETURN_NOT_OK( - (*remote_client)->DeleteUniverseReplication(Id(), /*ignore_errors=*/true, target_uuid)); + RETURN_NOT_OK(remote_client->GetXClusterClient().DeleteUniverseReplication( + Id(), /*ignore_errors=*/true, target_uuid)); } for (const auto& [namespace_id, _] : *outbound_group_pb.mutable_namespace_infos()) { @@ -678,13 +675,10 @@ XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfoForTableIds( return ns_info; } -Result> +Result> XClusterOutboundReplicationGroup::GetRemoteClient( const std::vector& remote_masters) const { - auto client = std::make_shared( - FLAGS_certs_for_cdc_dir, MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)); - RETURN_NOT_OK(client->Init(Id(), remote_masters)); - return client; + return client::XClusterRemoteClientHolder::Create(Id(), remote_masters); } Status XClusterOutboundReplicationGroup::CreateXClusterReplication( @@ -729,9 +723,10 @@ Status XClusterOutboundReplicationGroup::CreateXClusterReplication( auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - auto target_uuid = VERIFY_RESULT(remote_client->SetupDbScopedUniverseReplication( - Id(), source_master_addresses, namespace_names, namespace_ids, source_table_ids, - bootstrap_ids)); + auto target_uuid = + VERIFY_RESULT(remote_client->GetXClusterClient().SetupDbScopedUniverseReplication( + Id(), source_master_addresses, namespace_names, namespace_ids, source_table_ids, + bootstrap_ids)); auto* target_universe_info = l.mutable_data()->pb.mutable_target_universe_info(); @@ -778,7 +773,8 @@ Result XClusterOutboundReplicationGroup::IsCreateXCluster // TODO(#20810): Remove this once async task that polls for IsCreateXClusterReplicationDone gets // added. auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - setup_result = VERIFY_RESULT(remote_client->IsSetupUniverseReplicationDone(Id())); + setup_result = + VERIFY_RESULT(remote_client->GetXClusterClient().IsSetupUniverseReplicationDone(Id())); } if (!setup_result.done()) { @@ -842,7 +838,7 @@ Status XClusterOutboundReplicationGroup::AddNamespaceToTarget( auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - RETURN_NOT_OK(remote_client->AddNamespaceToDbScopedUniverseReplication( + RETURN_NOT_OK(remote_client->GetXClusterClient().AddNamespaceToDbScopedUniverseReplication( Id(), target_uuid, namespace_name, source_namespace_id, source_table_ids, bootstrap_ids)); // TODO(#20810): Start a async task that will poll for IsCreateXClusterReplicationDone and update @@ -854,7 +850,8 @@ Status XClusterOutboundReplicationGroup::AddNamespaceToTarget( Result XClusterOutboundReplicationGroup::IsAlterXClusterReplicationDone( const std::vector& target_master_addresses, const LeaderEpoch& epoch) { auto remote_client = VERIFY_RESULT(GetRemoteClient(target_master_addresses)); - return remote_client->IsSetupUniverseReplicationDone(xcluster::GetAlterReplicationGroupId(Id())); + return remote_client->GetXClusterClient().IsSetupUniverseReplicationDone( + xcluster::GetAlterReplicationGroupId(Id())); } bool XClusterOutboundReplicationGroup::HasNamespace(const NamespaceId& namespace_id) const { diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.h b/src/yb/master/xcluster/xcluster_outbound_replication_group.h index f77098f314e2..7719e42462b4 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.h +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.h @@ -25,7 +25,7 @@ namespace yb { class IsOperationDoneResult; namespace client { -class XClusterRemoteClient; +class XClusterRemoteClientHolder; } // namespace client namespace master { @@ -167,7 +167,7 @@ class XClusterOutboundReplicationGroup const LeaderEpoch& epoch, const NamespaceId& namespace_id, const SysXClusterOutboundReplicationGroupEntryPB& pb) REQUIRES(mutex_); - virtual Result> GetRemoteClient( + virtual Result> GetRemoteClient( const std::vector& remote_masters) const; // Checks if the namespace is part of this replication group. Caller must hold the read or write diff --git a/src/yb/master/xcluster/xcluster_replication_group.cc b/src/yb/master/xcluster/xcluster_replication_group.cc index 62d750626ca4..e99b3ea2d6c1 100644 --- a/src/yb/master/xcluster/xcluster_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_replication_group.cc @@ -24,14 +24,11 @@ #include "yb/util/is_operation_done_result.h" #include "yb/master/xcluster_rpc_tasks.h" #include "yb/master/xcluster/xcluster_manager_if.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/master/sys_catalog.h" #include "yb/util/flags/auto_flags_util.h" #include "yb/util/result.h" -DECLARE_int32(cdc_read_rpc_timeout_ms); -DECLARE_string(certs_for_cdc_dir); - DEFINE_RUNTIME_bool(xcluster_skip_health_check_on_replication_setup, false, "Skip health check on xCluster replication setup"); @@ -398,16 +395,13 @@ bool IncludesConsumerNamespace( return opt_namespace_id.has_value(); } -Result> GetXClusterRemoteClient( +Result> GetXClusterRemoteClientHolder( UniverseReplicationInfo& universe) { auto master_addresses = universe.LockForRead()->pb.producer_master_addresses(); std::vector hp; HostPortsFromPBs(master_addresses, &hp); - auto xcluster_client = std::make_shared( - FLAGS_certs_for_cdc_dir, MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)); - RETURN_NOT_OK(xcluster_client->Init(universe.ReplicationGroupId(), hp)); - return xcluster_client; + return client::XClusterRemoteClientHolder::Create(universe.ReplicationGroupId(), hp); } Result IsSetupUniverseReplicationDone( diff --git a/src/yb/master/xcluster/xcluster_replication_group.h b/src/yb/master/xcluster/xcluster_replication_group.h index 84665ad8f6be..d37d9c93a53f 100644 --- a/src/yb/master/xcluster/xcluster_replication_group.h +++ b/src/yb/master/xcluster/xcluster_replication_group.h @@ -23,7 +23,7 @@ class IsOperationDoneResult; class SysCatalogTable; namespace client { -class XClusterRemoteClient; +class XClusterRemoteClientHolder; } // namespace client namespace master { @@ -72,7 +72,7 @@ Result GetProducerNamespaceId( bool IncludesConsumerNamespace( UniverseReplicationInfo& universe, const NamespaceId& consumer_namespace_id); -Result> GetXClusterRemoteClient( +Result> GetXClusterRemoteClientHolder( UniverseReplicationInfo& universe); // Returns (false, Status::OK()) if the universe setup is still in progress. diff --git a/src/yb/master/xcluster/xcluster_source_manager.cc b/src/yb/master/xcluster/xcluster_source_manager.cc index 958a577c2acc..c2b83f0289f4 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.cc +++ b/src/yb/master/xcluster/xcluster_source_manager.cc @@ -18,6 +18,7 @@ #include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" #include "yb/client/xcluster_client.h" +#include "yb/common/xcluster_util.h" #include "yb/master/catalog_manager.h" #include "yb/master/master.h" #include "yb/master/xcluster/master_xcluster_util.h" diff --git a/src/yb/master/xcluster_rpc_tasks.cc b/src/yb/master/xcluster_rpc_tasks.cc index f8902fec3b1c..bf73227186af 100644 --- a/src/yb/master/xcluster_rpc_tasks.cc +++ b/src/yb/master/xcluster_rpc_tasks.cc @@ -16,7 +16,7 @@ #include "yb/client/client.h" #include "yb/client/yb_table_name.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/gutil/callback.h" diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index e97d2e9388df..79720f2045b5 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -12,7 +12,7 @@ #include "yb/cdc/cdc_service.h" #include "yb/cdc/cdc_state_table.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/meta_cache.h" #include "yb/client/schema.h" diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index 348a408c8250..84e00b505888 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -38,7 +38,7 @@ #include #include -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/xcluster_client.h" #include "yb/common/hybrid_time.h" #include "yb/common/json_util.h" diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 5862be7ec695..ec0a3d253637 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -46,7 +46,7 @@ #include #include "yb/cdc/cdc_service.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/common/xcluster_util.h" #include "yb/client/client.h" #include "yb/client/table.h" #include "yb/client/table_creator.h" diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index c5b6e70d5f93..bc7e9080277d 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -1131,14 +1131,12 @@ void TabletServer::SetYsqlDBCatalogVersions( void TabletServer::WriteServerMetaCacheAsJson(JsonWriter* writer) { writer->StartObject(); + DbServerBase::WriteMainMetaCacheAsJson(writer); if (auto xcluster_consumer = GetXClusterConsumer()) { - auto clients = xcluster_consumer->GetYbClientsList(); - for (auto client : clients) { - writer->String(client->client_name()); - client->AddMetaCacheInfo(writer); - } + xcluster_consumer->WriteServerMetaCacheAsJson(*writer); } + writer->EndObject(); } diff --git a/src/yb/tserver/xcluster_consumer.cc b/src/yb/tserver/xcluster_consumer.cc index 5fdbcfeccc3a..0e56228d82a2 100644 --- a/src/yb/tserver/xcluster_consumer.cc +++ b/src/yb/tserver/xcluster_consumer.cc @@ -11,12 +11,11 @@ // under the License. // -#include - #include "yb/cdc/xcluster_types.h" -#include "yb/cdc/xcluster_util.h" +#include "yb/client/error.h" #include "yb/client/session.h" #include "yb/client/table_handle.h" +#include "yb/client/xcluster_client.h" #include "yb/client/yb_op.h" #include "yb/client/yb_table_name.h" @@ -25,9 +24,7 @@ #include "yb/master/master_defaults.h" #include "yb/master/master_heartbeat.pb.h" -#include "yb/rpc/messenger.h" #include "yb/rpc/rpc.h" -#include "yb/rpc/secure_stream.h" #include "yb/tserver/xcluster_consumer.h" #include "yb/tserver/tserver_xcluster_context_if.h" #include "yb/tserver/xcluster_consumer_auto_flags_info.h" @@ -36,18 +33,15 @@ #include "yb/cdc/cdc_consumer.pb.h" -#include "yb/client/error.h" #include "yb/client/client.h" #include "yb/rocksdb/rate_limiter.h" #include "yb/gutil/map-util.h" -#include "yb/rpc/secure.h" #include "yb/util/callsite_profiling.h" #include "yb/util/flags.h" #include "yb/util/logging.h" -#include "yb/util/path_util.h" #include "yb/util/shared_lock.h" #include "yb/util/size_literals.h" #include "yb/util/status_log.h" @@ -103,11 +97,6 @@ DEFINE_test_flag(bool, xcluster_disable_delete_old_pollers, false, DEFINE_test_flag(bool, xcluster_enable_ddl_replication, false, "Enables xCluster automatic DDL replication."); -DECLARE_int32(cdc_read_rpc_timeout_ms); -DECLARE_int32(cdc_write_rpc_timeout_ms); -DECLARE_bool(use_node_to_node_encryption); -DECLARE_string(certs_for_cdc_dir); - using namespace std::chrono_literals; #define ACQUIRE_SHARED_LOCK_IF_ONLINE \ @@ -122,21 +111,6 @@ namespace yb { namespace tserver { -XClusterClient::~XClusterClient() { - if (messenger) { - messenger->Shutdown(); - } -} - -void XClusterClient::Shutdown() { - if (client) { - client->Shutdown(); - } - if (messenger) { - messenger->Shutdown(); - } -} - Result> CreateXClusterConsumer( std::function get_leader_term, const std::string& ts_uuid, client::YBClient& local_client, ConnectToPostgresFunc connect_to_pg_func, @@ -192,15 +166,14 @@ XClusterConsumer::~XClusterConsumer() { } Status XClusterConsumer::Init() { - // TODO(NIC): Unify xcluster_consumer thread_pool & remote_client_ threadpools RETURN_NOT_OK(yb::Thread::Create( "XClusterConsumer", "Poll", &XClusterConsumer::RunThread, this, &run_trigger_poll_thread_)); - ThreadPoolBuilder cdc_consumer_thread_pool_builder("XClusterConsumerHandler"); + ThreadPoolBuilder thread_pool_builder("XClusterConsumerHandler"); if (FLAGS_xcluster_consumer_thread_pool_size > 0) { - cdc_consumer_thread_pool_builder.set_max_threads(FLAGS_xcluster_consumer_thread_pool_size); + thread_pool_builder.set_max_threads(FLAGS_xcluster_consumer_thread_pool_size); } - return cdc_consumer_thread_pool_builder.Build(&thread_pool_); + return thread_pool_builder.Build(&thread_pool_); } void XClusterConsumer::Shutdown() { @@ -352,13 +325,12 @@ void XClusterConsumer::HandleMasterHeartbeatResponse( std::vector hp; HostPortsFromPBs(producer_entry_pb.master_addrs(), &hp); - auto master_addrs = HostPort::ToCommaSeparatedString(std::move(hp)); if (ContainsKey(old_uuid_master_addrs, replication_group_id) && - old_uuid_master_addrs[replication_group_id] != master_addrs) { + old_uuid_master_addrs[replication_group_id] != hp) { // If master addresses changed, mark for YBClient update. changed_master_addrs_.insert(replication_group_id); } - uuid_master_addrs_[replication_group_id] = std::move(master_addrs); + uuid_master_addrs_[replication_group_id] = std::move(hp); UpdateReplicationGroupInMemState(replication_group_id, producer_entry_pb); } @@ -495,7 +467,7 @@ void XClusterConsumer::TriggerPollForNewTablets() { // Update the Master Addresses, if altered after setup. if (ContainsKey(remote_clients_, replication_group_id) && changed_master_addrs_.count(replication_group_id) > 0) { - auto status = remote_clients_[replication_group_id]->client->SetMasterAddresses( + auto status = remote_clients_[replication_group_id]->SetMasterAddresses( uuid_master_addrs_[replication_group_id]); if (status.ok()) { changed_master_addrs_.erase(replication_group_id); @@ -515,53 +487,20 @@ void XClusterConsumer::TriggerPollForNewTablets() { if (start_polling) { // This is a new tablet, trigger a poll. // See if we need to create a new client connection - if (!ContainsKey(remote_clients_, replication_group_id)) { - CHECK(ContainsKey(uuid_master_addrs_, replication_group_id)); - - auto remote_client = std::make_unique(); - std::string dir; - if (FLAGS_use_node_to_node_encryption) { - rpc::MessengerBuilder messenger_builder("xcluster-consumer"); - if (!FLAGS_certs_for_cdc_dir.empty()) { - dir = JoinPathSegments( - FLAGS_certs_for_cdc_dir, - xcluster::GetOriginalReplicationGroupId(replication_group_id).ToString()); - } - - auto secure_context_result = rpc::SetupSecureContext( - dir, /*root_dir=*/"", /*name=*/"", rpc::SecureContextType::kInternal, - &messenger_builder); - if (!secure_context_result.ok()) { - LOG(WARNING) << "Could not create secure context for " << replication_group_id << ": " - << secure_context_result.status().ToString(); - return; // Don't finish creation. Try again on the next heartbeat. - } - remote_client->secure_context = std::move(*secure_context_result); - - auto messenger_result = messenger_builder.Build(); - if (!messenger_result.ok()) { - LOG(WARNING) << "Could not build messenger for " << replication_group_id << ": " - << secure_context_result.status().ToString(); - return; // Don't finish creation. Try again on the next heartbeat. - } - remote_client->messenger = std::move(*messenger_result); + if (!remote_clients_.contains(replication_group_id)) { + if (!uuid_master_addrs_.contains(replication_group_id)) { + LOG(DFATAL) << "Master address not found for " << replication_group_id; + return; // Don't finish creation. Try again on the next heartbeat. } - auto client_result = - yb::client::YBClientBuilder() - .set_client_name("XClusterConsumerRemote") - .add_master_server_addr(uuid_master_addrs_[replication_group_id]) - .skip_master_flagfile() - .default_rpc_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)) - .Build(remote_client->messenger.get()); - if (!client_result.ok()) { - LOG(WARNING) << "Could not create a new YBClient for " << replication_group_id << ": " - << client_result.status().ToString(); + auto remote_client = client::XClusterRemoteClientHolder::Create( + replication_group_id, uuid_master_addrs_[replication_group_id]); + if (!remote_client) { + LOG(WARNING) << "Could not build messenger for " << replication_group_id << ": " + << remote_client.status(); return; // Don't finish creation. Try again on the next heartbeat. } - - remote_client->client = std::move(*client_result); - remote_clients_[replication_group_id] = std::move(remote_client); + remote_clients_[replication_group_id] = std::move(*remote_client); } SchemaVersion last_compatible_consumer_schema_version = cdc::kInvalidSchemaVersion; @@ -644,7 +583,7 @@ void XClusterConsumer::UpdatePollerSchemaVersionMaps( void XClusterConsumer::TriggerDeletionOfOldPollers() { // Shutdown outside of master_data_mutex_ lock, to not block any heartbeats. - std::vector> clients_to_delete; + std::vector> clients_to_delete; std::vector> pollers_to_shutdown; { ACQUIRE_SHARED_LOCK_IF_ONLINE; @@ -736,19 +675,8 @@ int32_t XClusterConsumer::cluster_config_version() const { Status XClusterConsumer::ReloadCertificates() { SharedLock read_lock(pollers_map_mutex_); - for (const auto& [replication_group_id, client] : remote_clients_) { - if (!client->secure_context) { - continue; - } - - std::string cert_dir; - if (!FLAGS_certs_for_cdc_dir.empty()) { - cert_dir = JoinPathSegments( - FLAGS_certs_for_cdc_dir, - xcluster::GetOriginalReplicationGroupId(replication_group_id).ToString()); - } - RETURN_NOT_OK(rpc::ReloadSecureContextKeysAndCertificates( - client->secure_context.get(), cert_dir, "" /* node_name */)); + for (const auto& [_, client] : remote_clients_) { + RETURN_NOT_OK(client->ReloadCertificates()); } return Status::OK(); @@ -870,17 +798,17 @@ Status XClusterConsumer::ReportNewAutoFlagConfigVersion( void XClusterConsumer::ClearAllClientMetaCaches() const { std::lock_guard write_lock_pollers(pollers_map_mutex_); for (auto& [group_id, xcluster_client] : remote_clients_) { - xcluster_client->client->ClearAllMetaCachesOnServer(); + xcluster_client->GetYbClient().ClearAllMetaCachesOnServer(); } } -std::vector> XClusterConsumer::GetYbClientsList() const { +void XClusterConsumer::WriteServerMetaCacheAsJson(JsonWriter& writer) const { SharedLock read_lock(pollers_map_mutex_); - std::vector> result; - for (auto& [_, remote_client] : remote_clients_) { - result.push_back(remote_client->client); + for (const auto& [_, remote_client] : remote_clients_) { + const auto& client = remote_client->GetYbClient(); + writer.String(client.client_name()); + client.AddMetaCacheInfo(&writer); } - return result; } } // namespace tserver diff --git a/src/yb/tserver/xcluster_consumer.h b/src/yb/tserver/xcluster_consumer.h index f3b43468a4ab..0a4af3849eaf 100644 --- a/src/yb/tserver/xcluster_consumer.h +++ b/src/yb/tserver/xcluster_consumer.h @@ -43,22 +43,18 @@ class RateLimiter; } // namespace rocksdb namespace yb { +class HostPort; class Thread; class ThreadPool; namespace rpc { class Messenger; class Rpcs; -class SecureContext; } // namespace rpc -namespace cdc { -class ConsumerRegistryPB; -} // namespace cdc - -namespace master { -class TSHeartbeatRequestPB; -} // namespace master +namespace client { +class XClusterRemoteClientHolder; +} // namespace client namespace tserver { class AutoFlagsVersionHandler; @@ -66,15 +62,6 @@ class XClusterPoller; class TabletServer; class TserverXClusterContextIf; -struct XClusterClient { - std::unique_ptr messenger; - std::unique_ptr secure_context; - std::shared_ptr client; - - ~XClusterClient(); - void Shutdown(); -}; - class XClusterConsumer : public XClusterConsumerIf { public: XClusterConsumer( @@ -110,7 +97,7 @@ class XClusterConsumer : public XClusterConsumerIf { return TEST_num_successful_write_rpcs_.load(std::memory_order_acquire); } - std::vector> GetYbClientsList() const override; + void WriteServerMetaCacheAsJson(JsonWriter& writer) const override; Status ReloadCertificates() override; @@ -235,9 +222,10 @@ class XClusterConsumer : public XClusterConsumerIf { client::YBClient& local_client_; // map: {replication_group_id : ...}. - std::unordered_map> remote_clients_ - GUARDED_BY(pollers_map_mutex_); - std::unordered_map uuid_master_addrs_ + std::unordered_map< + xcluster::ReplicationGroupId, std::shared_ptr> + remote_clients_ GUARDED_BY(pollers_map_mutex_); + std::unordered_map> uuid_master_addrs_ GUARDED_BY(master_data_mutex_); std::unordered_set changed_master_addrs_ GUARDED_BY(master_data_mutex_); diff --git a/src/yb/tserver/xcluster_consumer_if.h b/src/yb/tserver/xcluster_consumer_if.h index 37de9ceee113..bcc70d85bd9a 100644 --- a/src/yb/tserver/xcluster_consumer_if.h +++ b/src/yb/tserver/xcluster_consumer_if.h @@ -67,7 +67,7 @@ class XClusterConsumerIf { virtual std::vector TEST_producer_tablets_running() const = 0; virtual uint32_t TEST_GetNumSuccessfulWriteRpcs() = 0; virtual std::vector> TEST_ListPollers() const = 0; - virtual std::vector> GetYbClientsList() const = 0; + virtual void WriteServerMetaCacheAsJson(JsonWriter& writer) const = 0; virtual void ClearAllClientMetaCaches() const = 0; virtual scoped_refptr TEST_metric_replication_error_count() const = 0; virtual scoped_refptr TEST_metric_apply_failure_count() const = 0; diff --git a/src/yb/tserver/xcluster_poller.cc b/src/yb/tserver/xcluster_poller.cc index f38bfd05c3c9..5a35bfaf46b0 100644 --- a/src/yb/tserver/xcluster_poller.cc +++ b/src/yb/tserver/xcluster_poller.cc @@ -13,6 +13,7 @@ #include "yb/tserver/xcluster_poller.h" #include "yb/client/client_fwd.h" +#include "yb/client/xcluster_client.h" #include "yb/common/wire_protocol.h" #include "yb/gutil/strings/split.h" #include "yb/tserver/xcluster_consumer.h" @@ -112,9 +113,9 @@ XClusterPoller::XClusterPoller( const NamespaceId& consumer_namespace_id, std::shared_ptr auto_flags_version, ThreadPool* thread_pool, rpc::Rpcs* rpcs, client::YBClient& local_client, - const std::shared_ptr& producer_client, XClusterConsumer* xcluster_consumer, - SchemaVersion last_compatible_consumer_schema_version, int64_t leader_term, - std::function get_leader_term) + const std::shared_ptr& source_client, + XClusterConsumer* xcluster_consumer, SchemaVersion last_compatible_consumer_schema_version, + int64_t leader_term, std::function get_leader_term) : XClusterAsyncExecutor(thread_pool, local_client.messenger(), rpcs), producer_tablet_info_(producer_tablet_info), consumer_tablet_info_(consumer_tablet_info), @@ -128,7 +129,7 @@ XClusterPoller::XClusterPoller( last_compatible_consumer_schema_version_(last_compatible_consumer_schema_version), get_leader_term_(std::move(get_leader_term)), local_client_(local_client), - producer_client_(producer_client), + source_client_(source_client), xcluster_consumer_(xcluster_consumer), producer_safe_time_(HybridTime::kInvalid) { DCHECK_NE(GetLeaderTerm(), yb::OpId::kUnknownTerm); @@ -377,7 +378,7 @@ void XClusterPoller::DoPoll() { *handle = rpc::xcluster::CreateGetChangesRpc( CoarseMonoClock::now() + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms), nullptr, /* RemoteTablet: will get this from 'req' */ - producer_client_->client.get(), &req, + &source_client_->GetYbClient(), &req, [weak_ptr = weak_from_this(), this, handle, rpcs = rpcs_]( const Status& status, cdc::GetChangesResponsePB&& resp) { RpcCallback( diff --git a/src/yb/tserver/xcluster_poller.h b/src/yb/tserver/xcluster_poller.h index c0021d0e2e48..dd9e7b8b00c4 100644 --- a/src/yb/tserver/xcluster_poller.h +++ b/src/yb/tserver/xcluster_poller.h @@ -41,6 +41,10 @@ class CDCServiceProxy; } // namespace cdc +namespace client { +class XClusterRemoteClientHolder; +} // namespace client + namespace tserver { class AutoFlagsCompatibleVersion; @@ -55,9 +59,9 @@ class XClusterPoller : public XClusterAsyncExecutor { const NamespaceId& consumer_namespace_id, std::shared_ptr auto_flags_version, ThreadPool* thread_pool, rpc::Rpcs* rpcs, client::YBClient& local_client, - const std::shared_ptr& producer_client, XClusterConsumer* xcluster_consumer, - SchemaVersion last_compatible_consumer_schema_version, int64_t leader_term, - std::function get_leader_term); + const std::shared_ptr& source_client, + XClusterConsumer* xcluster_consumer, SchemaVersion last_compatible_consumer_schema_version, + int64_t leader_term, std::function get_leader_term); ~XClusterPoller(); void Init(bool use_local_tserver, rocksdb::RateLimiter* rate_limiter); @@ -161,7 +165,7 @@ class XClusterPoller : public XClusterAsyncExecutor { client::YBClient& local_client_; std::shared_ptr output_client_; - std::shared_ptr producer_client_; + std::shared_ptr source_client_; std::shared_ptr ddl_queue_handler_; // Unsafe to use after shutdown.