Skip to content

Commit

Permalink
[BACKPORT 2024.1][#22996] xCluster: Add SOURCE_UNREACHABLE and SYSTEM…
Browse files Browse the repository at this point in the history
…_ERROR enums

Summary:
Original commit: 2f267ca / D36097
Prior to this change the Poller only reports certain types of errors. This change adds two new enums `SOURCE_UNREACHABLE`, and `SYSTEM_ERROR`. `SOURCE_UNREACHABLE` is reported if the Poller (xCluster Target) cannot communicate with the Producer (xCluster Source). `SYSTEM_ERROR` is reported if the Poller encounters an other error.

**Upgrade/Rollback safety:**
New enum values are in-memory only. They are sent from tserver to master. Since master is always upgraded before the tserver and rolled back after the tserver rollback this is safe to upgrade and rollback.

Fixes #22996
Jira: DB-11919

Test Plan:
XClusterTest, NetworkReplicationError
XClusterTest, ApplyReplicationError

Reviewers: jhe, slingam, xCluster

Reviewed By: slingam

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36183
  • Loading branch information
hari90 committed Jun 26, 2024
1 parent 410f110 commit 1cfb4f6
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/yb/common/common_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ enum ReplicationErrorPb {
// The AutoFlags config has changed and the new version is not compatible, or
// has not yet been validated.
REPLICATION_AUTO_FLAG_CONFIG_VERSION_MISMATCH = 6;

// Error connecting to the source universe.
REPLICATION_SOURCE_UNREACHABLE = 7;

// There was a generic system error.
REPLICATION_SYSTEM_ERROR = 8;
}

// Stateful services.
Expand Down
55 changes: 55 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3883,6 +3883,61 @@ TEST_F_EX(XClusterTest, ReplicationErrorAfterMasterFailover, XClusterTestNoParam
ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
}

// Make sure we get SOURCE_UNREACHABLE when the source universe is not reachable.
TEST_F_EX(XClusterTest, NetworkReplicationError, XClusterTestNoParam) {
// Send metrics report including the xCluster status in every heartbeat.
const auto short_metric_report_interval_ms = 250;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_heartbeat_interval_ms) = short_metric_report_interval_ms * 2;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_tserver_heartbeat_metrics_interval_ms) =
short_metric_report_interval_ms;

ASSERT_OK(SetUpWithParams(
{1}, {1}, /* replication_factor */ 1, /* num_masters */ 1, /* num_tservers */ 1));
ASSERT_OK(SetupReplication());
ASSERT_OK(CorrectlyPollingAllTablets(1));
const auto stream_id = ASSERT_RESULT(GetCDCStreamID(producer_table_->id()));

ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_read_rpc_timeout_ms) = 5 * 1000;

producer_cluster()->mini_tablet_server(0)->Shutdown();

ASSERT_OK(VerifyReplicationError(
consumer_table_->id(), stream_id, ReplicationErrorPb::REPLICATION_SOURCE_UNREACHABLE,
kRpcTimeout));

ASSERT_OK(producer_cluster()->mini_tablet_server(0)->Start());

ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
}

// Make sure we get SYSTEM_ERROR when apply fails.
TEST_F_EX(XClusterTest, ApplyReplicationError, XClusterTestNoParam) {
// Send metrics report including the xCluster status in every heartbeat.
const auto short_metric_report_interval_ms = 250;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_heartbeat_interval_ms) = short_metric_report_interval_ms * 2;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_tserver_heartbeat_metrics_interval_ms) =
short_metric_report_interval_ms;

ASSERT_OK(SetUpWithParams(
{1}, {1}, /* replication_factor */ 1, /* num_masters */ 1, /* num_tservers */ 1));
ASSERT_OK(SetupReplication());
ASSERT_OK(CorrectlyPollingAllTablets(1));
const auto stream_id = ASSERT_RESULT(GetCDCStreamID(producer_table_->id()));

ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulate_random_failure_after_apply) = 1;

ASSERT_OK(VerifyReplicationError(
consumer_table_->id(), stream_id, ReplicationErrorPb::REPLICATION_SYSTEM_ERROR));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulate_random_failure_after_apply) = 0;

ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
}

// Test deleting inbound replication group without performing source stream cleanup.
TEST_F_EX(XClusterTest, DeleteWithoutStreamCleanup, XClusterTestNoParam) {
constexpr int kNTabletsPerTable = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/yb/integration-tests/xcluster/xcluster_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ Status XClusterTestBase::WaitForReplicationDrain(

Status XClusterTestBase::VerifyReplicationError(
const std::string& consumer_table_id, const xrepl::StreamId& stream_id,
const std::optional<ReplicationErrorPb> expected_replication_error) {
const std::optional<ReplicationErrorPb> expected_replication_error, int timeout_secs) {
// 1. Verify that the RPC contains the expected error.
master::GetReplicationStatusRequestPB req;
master::GetReplicationStatusResponsePB resp;
Expand Down Expand Up @@ -858,7 +858,7 @@ Status XClusterTestBase::VerifyReplicationError(
return resp.statuses()[0].errors_size() == 0;
}
},
MonoDelta::FromSeconds(30), "Waiting for replication error"));
MonoDelta::FromSeconds(timeout_secs), "Waiting for replication error"));

// 2. Verify that the yb-admin output contains the expected error.
auto admin_out =
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/xcluster/xcluster_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ class XClusterTestBase : public YBTest {

Status VerifyReplicationError(
const std::string& consumer_table_id, const xrepl::StreamId& stream_id,
const std::optional<ReplicationErrorPb> expected_replication_error);
const std::optional<ReplicationErrorPb> expected_replication_error, int timeout_secs = 30);

Result<xrepl::StreamId> GetCDCStreamID(const TableId& producer_table_id);

Expand Down
22 changes: 22 additions & 0 deletions src/yb/tserver/xcluster_poller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ void XClusterPoller::HandleGetChangesResponse(
if (!status.ok()) {
LOG_WITH_PREFIX(WARNING) << "XClusterPoller GetChanges failure: " << status.ToString();

if (status.IsTimedOut() || status.IsNetworkError()) {
StoreReplicationError(ReplicationErrorPb::REPLICATION_SOURCE_UNREACHABLE);
} else {
StoreNOKReplicationError();
}

if (FLAGS_enable_xcluster_stat_collection) {
poll_stats_history_.SetError(std::move(status));
}
Expand Down Expand Up @@ -479,6 +485,8 @@ void XClusterPoller::VerifyApplyChangesResponse(XClusterOutputClientResponse res
RandomActWithProbability(FLAGS_TEST_xcluster_simulate_random_failure_after_apply)) {
LOG_WITH_PREFIX(WARNING) << "ApplyChanges failure: " << response.status;

StoreNOKReplicationError();

if (FLAGS_enable_xcluster_stat_collection) {
poll_stats_history_.SetError(std::move(response.status));
}
Expand All @@ -502,6 +510,9 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
if (!s.ok()) {
// If processing ddl_queue table fails, then retry just this part (don't repeat ApplyChanges).
YB_LOG_EVERY_N(WARNING, 30) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;

StoreNOKReplicationError();

if (FLAGS_enable_xcluster_stat_collection) {
poll_stats_history_.SetError(std::move(s));
}
Expand Down Expand Up @@ -659,6 +670,17 @@ void XClusterPoller::StoreReplicationError(ReplicationErrorPb error) {
}
}

void XClusterPoller::StoreNOKReplicationError() {
{
std::lock_guard l(replication_error_mutex_);
if (previous_replication_error_ != ReplicationErrorPb::REPLICATION_OK) {
return;
}
}

StoreReplicationError(ReplicationErrorPb::REPLICATION_SYSTEM_ERROR);
}

void XClusterPoller::ClearReplicationError() {
StoreReplicationError(ReplicationErrorPb::REPLICATION_OK);
}
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tserver/xcluster_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class XClusterPoller : public XClusterAsyncExecutor {
void MarkFailed(const std::string& reason, const Status& status = Status::OK()) override;
// Stores a replication error and detail. This overwrites a previously stored 'error'.
void StoreReplicationError(ReplicationErrorPb error) EXCLUDES(replication_error_mutex_);
// Stores a non OK replication error if one has not already been set.
void StoreNOKReplicationError() EXCLUDES(replication_error_mutex_);
void ClearReplicationError() EXCLUDES(replication_error_mutex_);
void TEST_IncrementNumSuccessfulWriteRpcs();
void ApplyChangesCallback(XClusterOutputClientResponse&& response);
Expand Down

0 comments on commit 1cfb4f6

Please sign in to comment.