Skip to content

Commit

Permalink
[#22908] xCluster: Use XClusterRemoteClient across XCluster
Browse files Browse the repository at this point in the history
Summary:
Use `XClusterRemoteClient` in `XClsuterConsumer`. This helps unify the client usage in xCluster, and move all client creation logic out of `XClusterConsumer`.

- Renamed `XClusterRemoteClient` to `XClusterRemoteClientHolder`.
- Moved Client functions `SetupDbScopedUniverseReplication`, `IsSetupUniverseReplicationDone`, and `AddNamespaceToDbScopedUniverseReplication` from `XClusterRemoteClient` to `XClusterClient`.
- Moved `xcluster_util` from `/src/cdc` to `/src/common` so that it can be used in the `client` library.
- Moved `MockXClusterRemoteClientHolder` and `MockXClusterClient` to `xcluster_client_mock`.
- Moved flags `certs_for_cdc_dir` and `cdc_read_rpc_timeout_ms` to `common_flags`.

Fixes #22908
Jira: DB-11816

Test Plan: Jenkins

Reviewers: jhe, xCluster

Reviewed By: jhe

Subscribers: esheng, ycdcxcluster, xCluster, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D35908
  • Loading branch information
hari90 committed Jul 1, 2024
1 parent 37912f1 commit 059b855
Show file tree
Hide file tree
Showing 38 changed files with 446 additions and 381 deletions.
1 change: 0 additions & 1 deletion src/yb/cdc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ ADD_YB_LIBRARY(xcluster_producer_proto
set(CDC_UTIL_SRCS
cdc_util.cc
xcluster_types.cc
xcluster_util.cc
)

ADD_YB_LIBRARY(
Expand Down
10 changes: 1 addition & 9 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ using std::vector;

constexpr uint32_t kUpdateIntervalMs = 15 * 1000;

DEFINE_NON_RUNTIME_int32(cdc_read_rpc_timeout_ms, 30 * 1000,
"Timeout used for CDC read rpc calls. Reads normally occur cross-cluster.");
TAG_FLAG(cdc_read_rpc_timeout_ms, advanced);
DECLARE_int32(cdc_read_rpc_timeout_ms);

DEFINE_NON_RUNTIME_int32(cdc_write_rpc_timeout_ms, 30 * 1000,
"Timeout used for CDC write rpc calls. Writes normally occur intra-cluster.");
Expand All @@ -109,12 +107,6 @@ DEPRECATE_FLAG(int32, cdc_ybclient_reactor_threads, "09_2023");
DEFINE_RUNTIME_int32(cdc_state_checkpoint_update_interval_ms, kUpdateIntervalMs,
"Rate at which CDC state's checkpoint is updated.");

DEFINE_NON_RUNTIME_string(certs_for_cdc_dir, "",
"The parent directory of where all certificates for xCluster producer universes will "
"be stored, for when the producer and consumer clusters use different certificates. "
"Place the certificates for each producer cluster in "
"<certs_for_cdc_dir>/<producer_cluster_id>/*.");

DEFINE_RUNTIME_int32(update_min_cdc_indices_interval_secs, 60,
"How often to read cdc_state table to get the minimum applied index for each tablet "
"across all streams. This information is used to correctly keep log files that "
Expand Down
15 changes: 14 additions & 1 deletion src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ set(CLIENT_LIBS
tserver_proto
tserver_service_proto
pg_auto_analyze_service_proto
gutil
test_echo_service_proto
tserver_util
yb_ql_expr
yb_ash
yb_util
gutil
yrpc
yb_dockv)

Expand All @@ -96,6 +96,19 @@ ADD_YB_LIBRARY(yb_client
SRCS ${CLIENT_SRCS}
DEPS ${CLIENT_LIBS})

set(CLIENT_MOCK_SRCS
xcluster_client_mock.cc
)

set(CLIENT_MOCK_LIBS
yb_client
gmock
gtest)

ADD_YB_LIBRARY(yb_client_mock
SRCS ${CLIENT_MOCK_SRCS}
DEPS ${CLIENT_MOCK_LIBS})

if(NOT APPLE)
# Localize thirdparty symbols using a linker version script. This hides them
# from the client application. The OS X linker does not support the
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2346,7 +2346,7 @@ std::pair<RetryableRequestId, RetryableRequestId> YBClient::NextRequestIdAndMinR
return std::make_pair(id, *requests.running_requests.begin());
}

void YBClient::AddMetaCacheInfo(JsonWriter* writer) {
void YBClient::AddMetaCacheInfo(JsonWriter* writer) const {
data_->meta_cache_->AddAllTabletInfo(writer);
}

Expand Down
16 changes: 13 additions & 3 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class YBClientBuilder {
// This class is thread-safe.
class YBClient {
public:
~YBClient();
virtual ~YBClient();

std::unique_ptr<YBTableCreator> NewTableCreator();

Expand Down Expand Up @@ -1007,7 +1007,7 @@ class YBClient {

std::pair<RetryableRequestId, RetryableRequestId> NextRequestIdAndMinRunningRequestId();

void AddMetaCacheInfo(JsonWriter* writer);
void AddMetaCacheInfo(JsonWriter* writer) const;

void RequestsFinished(const RetryableRequestIdRange& request_id_range);

Expand All @@ -1032,6 +1032,7 @@ class YBClient {
private:
class Data;

friend class MockYBClient;
friend class YBClientBuilder;
friend class YBNoOp;
friend class YBTable;
Expand All @@ -1051,7 +1052,7 @@ class YBClient {
friend class internal::ClientMasterRpcBase;
friend class PlacementInfoTest;
friend class XClusterClient;
friend class XClusterRemoteClient;
friend class XClusterRemoteClientHolder;

FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
FRIEND_TEST(ClientTest, TestMasterDown);
Expand Down Expand Up @@ -1095,6 +1096,15 @@ class YBClient {
DISALLOW_COPY_AND_ASSIGN(YBClient);
};

// A mock YBClient that can be used for testing.
// Currently it only allows us to create a MockYBClient object , and does not mock any member
// functions.
class MockYBClient : public YBClient {
public:
MockYBClient() = default;
virtual ~MockYBClient() = default;
};

Result<TableId> GetTableId(YBClient* client, const YBTableName& table_name);

} // namespace client
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ void MetaCache::InvalidateTableCache(const YBTable& table) {
}
}

void MetaCache::AddAllTabletInfo(JsonWriter* writer) {
void MetaCache::AddAllTabletInfo(JsonWriter* writer) const {
SharedLock lock(mutex_);
writer->StartObject();
writer->String("tablets");
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/meta_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {

void InvalidateTableCache(const YBTable& table);

void AddAllTabletInfo(JsonWriter* writer);
void AddAllTabletInfo(JsonWriter* writer) const;

const std::string& LogPrefix() const { return log_prefix_; }

Expand Down Expand Up @@ -736,7 +736,7 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {

YBClient* const client_;

std::shared_timed_mutex mutex_;
mutable std::shared_timed_mutex mutex_;

// Cache of Tablet Server locations: TS UUID -> RemoteTabletServer*.
//
Expand Down
Loading

0 comments on commit 059b855

Please sign in to comment.