Skip to content

Commit

Permalink
[BACKPORT 2.20][#23278] CDCSDK: Handle non-eligible tables cleanup wi…
Browse files Browse the repository at this point in the history
…th drop table while loading CDC stream

Summary:
**Backport description:**
Faced minor merge conflicts as some code is refactored in latest master.

**Original description:**
Original commit: 64e1bf8 / D37053
When a table present under a CDC stream is dropped, it is removed from the CDC stream metadata by a background thread.
Suppose before the background thread could cleanup, there was a master restart or a master leadership change. On either of these scenarios, while loading the CDC streams, we check all tables present in the CDC stream metadata for ineligibility. Table schema is one of the objects that is scanned while checking for ineligibility. To get the table schema, we fetch the `TableInfo` object from master. This step was leading to a master crash as we receive a nullptr while fetching TableInfo since the table has been dropped.
Jira: DB-12205

Test Plan: ./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled

Reviewers: asrinivasan, stiwary, skumar

Reviewed By: stiwary

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37091
  • Loading branch information
siddharth2411 committed Aug 14, 2024
1 parent 71fd6b9 commit 862dde4
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
56 changes: 56 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8689,5 +8689,61 @@ TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsiste
/* use_consistent_snapshot_stream */ true);
}

TEST_F(
CDCSDKYsqlTest,
YB_DISABLE_TEST_IN_TSAN(TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = true;
// Setup cluster.
ASSERT_OK(SetUpWithParams(3, 3, false));
const vector<string> table_list_suffix = {"_1", "_2", "_3"};
const int kNumTables = 3;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}

auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());
std::unordered_set<std::string> expected_table_ids = {
table[0].table_id(), table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(stream_id, expected_table_ids, "Waiting for stream metadata.");

LOG(INFO) << "Dropping table: " << Format("$0$1", kTableName, table_list_suffix[0]);
DropTable(&test_cluster_, Format("$0$1", kTableName, table_list_suffix[0]).c_str());
// Stream metadata wouldnt be cleaned up since the codepath is disabled via
// 'TEST_cdcsdk_disable_drop_table_cleanup' flag. Therefore all 3 tables are expected to be
// present in stream metadata.
SleepFor(MonoDelta::FromSeconds(3));
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids, "Waiting for stream metadata after drop table.");

// On loading of CDC stream after a master leader restart, presence of non-eligible tables in CDC
// stream will be checked.
auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster());
ASSERT_OK(leader_master->Restart());
LOG(INFO) << "Master Restarted";
SleepFor(MonoDelta::FromSeconds(5));

// Enable bg threads to cleanup CDC stream metadata for dropped tables.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = false;

// Verify the dropped table has been removed from stream metadata after enabling the cleanup.
expected_table_ids.erase(table[0].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids,
"Waiting for GetDBStreamInfo post metadata cleanup after restart.");
}

} // namespace cdc
} // namespace yb
1 change: 1 addition & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);
DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);
DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream);
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);
DECLARE_bool(TEST_cdcsdk_disable_drop_table_cleanup);

namespace yb {

Expand Down
60 changes: 35 additions & 25 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, fa
"materialised view etc. in their stream metadata and these tables will be marked for removal "
"by catalog manager background thread.");

DEFINE_test_flag(bool, cdcsdk_disable_drop_table_cleanup, false,
"When enabled, cleanup of dropped tables from CDC streams will be skipped.");

DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_identification_of_non_eligible_tables,
kLocalPersisted,
false,
Expand Down Expand Up @@ -1754,27 +1757,32 @@ void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream(
for (const auto& table_id : table_ids) {
if (!user_table_ids.contains(table_id)) {
auto table_info = GetTableInfoUnlocked(table_id);
Schema schema;
Status status = table_info->GetSchema(&schema);
if (!status.ok()) {
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
// Skip this table for now, it will be revisited for removal on master restart/master leader
// change.
continue;
}
if (table_info) {
Schema schema;
Status status = table_info->GetSchema(&schema);
if (!status.ok()) {
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
// Skip this table for now, it will be revisited for removal on master restart/master
// leader change.
continue;
}

// Re-confirm this table is not meant to be part of a CDC stream.
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
<< ", for stream: " << stream_id;
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
table_info->id());
// Re-confirm this table is not meant to be part of a CDC stream.
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
<< ", for stream: " << stream_id;
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
table_info->id());
} else {
// Ideally we are not expected to enter the else clause.
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
<< " that is not present in the eligible list of tables "
"from the namespace for CDC";
}
} else {
// Ideally we are not expected to enter the else clause.
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
<< " that is not present in the eligible list of tables "
"from the namespace for CDC";
LOG(INFO) << "Found table " << table_id << " in stream " << stream_id
<< " metadata that is not present in master.";
}
}
}
Expand Down Expand Up @@ -6682,12 +6690,14 @@ void CatalogManager::RunXClusterBgTasks(const LeaderEpoch& epoch) {

// DELETING_METADATA special state is used by CDC, to do CDC streams metadata cleanup from
// cache as well as from the system catalog for the drop table scenario.
std::vector<CDCStreamInfoPtr> cdcsdk_streams;
WARN_NOT_OK(
FindCDCStreamsMarkedForMetadataDeletion(
&cdcsdk_streams, SysCDCStreamEntryPB::DELETING_METADATA),
"Failed CDC Stream Metadata Deletion");
WARN_NOT_OK(CleanUpCDCStreamsMetadata(cdcsdk_streams), "Failed Cleanup CDC Streams Metadata");
if (!FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) {
std::vector<CDCStreamInfoPtr> cdcsdk_streams;
WARN_NOT_OK(
FindCDCStreamsMarkedForMetadataDeletion(
&cdcsdk_streams, SysCDCStreamEntryPB::DELETING_METADATA),
"Failed CDC Stream Metadata Deletion");
WARN_NOT_OK(CleanUpCDCStreamsMetadata(cdcsdk_streams), "Failed Cleanup CDC Streams Metadata");
}

// Restart xCluster and CDCSDK parent tablet deletion bg task.
StartCDCParentTabletDeletionTaskIfStopped();
Expand Down

0 comments on commit 862dde4

Please sign in to comment.