Skip to content

Commit

Permalink
[BACKPORT 2024.1.1][#23260] CDCSDK: Propagate stream expiry & GC erro…
Browse files Browse the repository at this point in the history
…r to walsender

Summary:
**Backport Description:**
Minor conflict due to missing test

**Original Description:**
Original commit: 5ed864d / D36756
At the moment, the GetConsistentChanges RPC does not propagate any error to the walsender. With this diff, we will propagate the following errors to the walsender, since we cannot recover from them and the only solution is to create a new CDC stream:

  - Stream has expired
  - Intents have been GC'ed

Sending these errors to the walsender causes the walsender to crash, which in turn propagates the error to the actual CDC client (e.g., the YugabyteDB connector). CDC clients must not retry on such errors because, as mentioned above, they are non-recoverable.
Jira: DB-12188

Test Plan:
Jenkins: urgent, test regex: .*CDC.*

./yb_build.sh --cxx-test cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestIntentGC

./yb_build.sh --cxx-test cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestStreamExpiry

Reviewers: asrinivasan, skumar, stiwary

Reviewed By: asrinivasan

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36758
  • Loading branch information
siddharth2411 committed Jul 25, 2024
1 parent 9c1af4b commit 2f211c3
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 6 deletions.
38 changes: 34 additions & 4 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ DEFINE_test_flag(bool, cdc_force_destroy_virtual_wal_failure, false,
DEFINE_RUNTIME_PREVIEW_bool(enable_cdcsdk_setting_get_changes_response_byte_limit, false,
"When enabled, we'll consider the proto field getchanges_resp_max_size_bytes in "
"GetChangesRequestPB to limit the size of GetChanges response.");
// Test-only flag. When set, GetChanges bypasses both the CheckStreamActive() validation and the
// UpdateActiveTime() refresh for the requested tablet.
DEFINE_test_flag(bool, cdcsdk_skip_stream_active_check, false,
"When enabled, GetChanges will skip checking if stream is active as well as skip "
"updating the active time.");

DECLARE_bool(enable_log_retention_by_op_idx);

Expand Down Expand Up @@ -1613,10 +1616,13 @@ void CDCServiceImpl::GetChanges(
RPC_STATUS_RETURN_ERROR(
CheckTabletNotOfInterest(producer_tablet), resp->mutable_error(),
CDCErrorPB::INTERNAL_ERROR, context);
RPC_STATUS_RETURN_ERROR(
CheckStreamActive(producer_tablet), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR,
context);
impl_->UpdateActiveTime(producer_tablet);

if (!FLAGS_TEST_cdcsdk_skip_stream_active_check) {
RPC_STATUS_RETURN_ERROR(
CheckStreamActive(producer_tablet), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR,
context);
impl_->UpdateActiveTime(producer_tablet);
}

if (IsCDCSDKSnapshotDone(*req)) {
// Remove 'kCDCSDKSnapshotKey' from the colocated snapshot row, to indicate that the snapshot
Expand Down Expand Up @@ -4493,6 +4499,25 @@ Result<tablet::TabletPeerPtr> CDCServiceImpl::GetServingTablet(const TabletId& t
return context_->GetServingTablet(tablet_id);
}

// Returns true when `status` is a non-OK InternalError whose message contains the
// "expired for Tablet" marker, i.e. the error reported for an expired stream.
bool IsStreamInactiveError(Status status) {
  static const char* const kExpiredMarker = "expired for Tablet";
  return !status.ok() && status.IsInternalError() &&
         status.message().ToBuffer().find(kExpiredMarker) != std::string::npos;
}

// Returns true when `status` is a non-OK InternalError whose message contains the
// "CDCSDK Trying to fetch already GCed intents" marker.
bool IsIntentGCError(Status status) {
  static const char* const kIntentGCMarker = "CDCSDK Trying to fetch already GCed intents";
  return !status.ok() && status.IsInternalError() &&
         status.message().ToBuffer().find(kIntentGCMarker) != std::string::npos;
}

void CDCServiceImpl::InitVirtualWALForCDC(
const InitVirtualWALForCDCRequestPB* req, InitVirtualWALForCDCResponsePB* resp,
rpc::RpcContext context) {
Expand Down Expand Up @@ -4600,6 +4625,11 @@ void CDCServiceImpl::GetConsistentChanges(
Format("GetConsistentChanges failed for stream_id: $0 with error: $1", stream_id, s);
if (!s.IsTryAgain()) {
LOG(WARNING) << msg;
// Propagate the error to the client only when the stream has expired or the intents have been
// garbage collected.
if (IsStreamInactiveError(s) || IsIntentGCError(s)) {
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
}
} else {
YB_LOG_EVERY_N_SECS(WARNING, 300) << msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3378,5 +3378,100 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestFailureCreatingStreamsOfDiffe
kNamespaceName_2));
}

// Checks that consuming through a Virtual WAL fails with the "expired for Tablet" InternalError,
// and that a second Virtual WAL session on the same stream hits the same failure.
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestStreamExpiry) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_vwal_getchanges_resp_max_size_bytes) = 100_KB;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
  // NOTE(review): the 1 ms retention is presumably what makes the stream count as expired by the
  // time it is consumed below -- confirm against CheckStreamActive.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_intent_retention_ms) = 1;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_max_consistent_records) = 100;
  ASSERT_OK(SetUpWithParams(3, 1, false, true));

  // Create a three-tablet table and a replication-slot backed stream on it.
  auto ddl_conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  ASSERT_OK(ddl_conn.Execute(
      "CREATE TABLE test1 (id int PRIMARY KEY, value_1 int) SPLIT INTO 3 TABLETS"));
  auto test_table = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, "test1"));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablet_locations;
  ASSERT_OK(test_client()->GetTablets(test_table, 0, &tablet_locations, nullptr));
  ASSERT_EQ(tablet_locations.size(), 3);
  xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

  // Write the rows in explicit multi-statement transactions, one transaction per batch.
  auto dml_conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  const int batch_count = 10;
  const int rows_per_batch = 100;
  for (int batch = 0; batch < batch_count; ++batch) {
    const int first_id = batch * rows_per_batch;
    const int end_id = first_id + rows_per_batch;
    ASSERT_OK(dml_conn.Execute("BEGIN"));
    for (int id = first_id; id < end_id; ++id) {
      ASSERT_OK(dml_conn.ExecuteFormat("INSERT INTO test1 VALUES ($0, $1)", id, id + 1));
    }
    ASSERT_OK(dml_conn.Execute("COMMIT"));
  }

  const int expected_dml_records = batch_count * rows_per_batch;

  // First Virtual WAL session: consumption must fail with the stream-expired error.
  auto first_attempt = GetAllPendingTxnsFromVirtualWAL(
      stream_id, {test_table.table_id()}, expected_dml_records, true /* init_virtual_wal */);
  ASSERT_NOK(first_attempt);
  ASSERT_TRUE(first_attempt.status().IsInternalError());
  ASSERT_STR_CONTAINS(first_attempt.status().message().ToBuffer(), "expired for Tablet");

  // A new VWAL on the same stream should again receive the stream expired error.
  auto second_attempt = GetAllPendingTxnsFromVirtualWAL(
      stream_id, {test_table.table_id()}, expected_dml_records, true /* init_virtual_wal */,
      kVWALSessionId2);
  ASSERT_NOK(second_attempt);
  ASSERT_TRUE(second_attempt.status().IsInternalError());
  ASSERT_STR_CONTAINS(second_attempt.status().message().ToBuffer(), "expired for Tablet");
}

// Checks that GetConsistentChanges eventually surfaces the
// "CDCSDK Trying to fetch already GCed intents" InternalError to the caller.
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestIntentGC) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_vwal_getchanges_resp_max_size_bytes) = 1_KB;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_intent_retention_ms) = 0;
  // Bypass the stream-active check in GetChanges.
  // NOTE(review): presumably so that the stream-expiry error does not mask the GC error -- confirm.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_stream_active_check) = true;
  ASSERT_OK(SetUpWithParams(1, 1, false, true));

  // Create a two-tablet table and a replication-slot backed stream on it.
  auto ddl_conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  ASSERT_OK(ddl_conn.Execute(
      "CREATE TABLE test1 (id int PRIMARY KEY, value_1 int) SPLIT INTO 2 TABLETS"));
  auto test_table = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, "test1"));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablet_locations;
  ASSERT_OK(test_client()->GetTablets(test_table, 0, &tablet_locations, nullptr));
  ASSERT_EQ(tablet_locations.size(), 2);
  xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

  // Write the rows in explicit multi-statement transactions, one transaction per batch.
  auto dml_conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  const int batch_count = 50;
  const int rows_per_batch = 2;
  for (int batch = 0; batch < batch_count; ++batch) {
    const int first_id = batch * rows_per_batch;
    const int end_id = first_id + rows_per_batch;
    ASSERT_OK(dml_conn.Execute("BEGIN"));
    for (int id = first_id; id < end_id; ++id) {
      ASSERT_OK(dml_conn.ExecuteFormat("INSERT INTO test1 VALUES ($0, $1)", id, id + 1));
    }
    ASSERT_OK(dml_conn.Execute("COMMIT"));
  }

  // Sleep for UpdatePeersAndMetrics to move retention barriers that will lead to garbage collection
  // of intents.
  SleepFor(MonoDelta::FromSeconds(30 * kTimeMultiplier));
  ASSERT_OK(InitVirtualWAL(stream_id, {test_table.table_id()}));

  // Poll until a call fails with the intent-GC error; any other outcome keeps polling.
  bool saw_gc_error = false;
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        auto changes = GetConsistentChangesFromCDC(stream_id);
        if (changes.ok()) {
          return false;
        }
        if (!changes.status().IsInternalError()) {
          return false;
        }
        const auto error_text = changes.status().message().ToBuffer();
        if (error_text.find("CDCSDK Trying to fetch already GCed intents") ==
            std::string::npos) {
          return false;
        }
        saw_gc_error = true;
        return true;
      },
      MonoDelta::FromSeconds(60), "Did not see Intents GC error"));

  ASSERT_TRUE(saw_gc_error);
}

} // namespace cdc
} // namespace yb
7 changes: 6 additions & 1 deletion src/yb/integration-tests/cdcsdk_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,11 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
status = StatusFromPB(change_resp.error().status());
if (status.IsNotFound() || status.IsInvalidArgument()) {
RETURN_NOT_OK(status);
} else if (status.IsInternalError()) {
// Surface only the two non-recoverable CDC errors: stream expiry and already-GCed intents.
// std::string::find returns an index (npos when absent), so each result must be compared
// against npos; used directly as a boolean, npos is truthy and a match at index 0 is falsy.
const auto err_msg = status.message().ToBuffer();
const bool is_stream_expired = err_msg.find("expired for Tablet") != std::string::npos;
const bool is_intent_gced =
    err_msg.find("CDCSDK Trying to fetch already GCed intents") != std::string::npos;
if (is_stream_expired || is_intent_gced) {
  RETURN_NOT_OK(status);
}
}
}

Expand Down Expand Up @@ -1981,7 +1986,7 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
if (init_virtual_wal) {
Status s = InitVirtualWAL(stream_id, table_ids, session_id);
if (!s.ok()) {
LOG(ERROR) << "Error while trying to initialize virtual WAL";
LOG(ERROR) << "Error while trying to initialize virtual WAL: " << s;
RETURN_NOT_OK(s);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);
DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);
DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream);
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);

DECLARE_bool(TEST_cdcsdk_skip_stream_active_check);
namespace yb {

using client::YBClient;
Expand Down

0 comments on commit 2f211c3

Please sign in to comment.