From 862dde42f147eec23a964f8443e52a70356df340 Mon Sep 17 00:00:00 2001 From: Siddharth Shah Date: Tue, 6 Aug 2024 16:13:50 +0530 Subject: [PATCH] [BACKPORT 2.20][#23278] CDCSDK: Handle non-eligible tables cleanup with drop table while loading CDC stream Summary: **Backport description:** Faced minor merge conflicts because some of this code has been refactored in the latest master. **Original description:** Original commit: 64e1bf8eef10fcf7ee1d33cb67ea9758a56f1346 / D37053 When a table present under a CDC stream is dropped, it is removed from the CDC stream metadata by a background thread. Suppose that, before the background thread could clean up, there was a master restart or a master leadership change. In either of these scenarios, while loading the CDC streams, we check all tables present in the CDC stream metadata for ineligibility. The table schema is one of the objects scanned while checking for ineligibility. To get the table schema, we fetch the `TableInfo` object from the master. This step was leading to a master crash: since the table has been dropped, fetching its `TableInfo` returns a nullptr, which was then dereferenced. The fix checks for a null `TableInfo` while loading the CDC streams and skips such tables with a log message instead of crashing. 
Jira: DB-12205 Test Plan: ./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled Reviewers: asrinivasan, stiwary, skumar Reviewed By: stiwary Subscribers: ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D37091 --- src/yb/integration-tests/cdcsdk_ysql-test.cc | 56 +++++++++++++++++ .../integration-tests/cdcsdk_ysql_test_base.h | 1 + src/yb/master/xrepl_catalog_manager.cc | 60 +++++++++++-------- 3 files changed, 92 insertions(+), 25 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 4aa6680b78a6..61f00a1e4144 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8689,5 +8689,61 @@ TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsiste /* use_consistent_snapshot_stream */ true); } +TEST_F( + CDCSDKYsqlTest, + YB_DISABLE_TEST_IN_TSAN(TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = true; + // Setup cluster. 
+ ASSERT_OK(SetUpWithParams(3, 3, false)); + const vector table_list_suffix = {"_1", "_2", "_3"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + while (idx < 3) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + idx += 1; + } + + auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); + std::unordered_set expected_table_ids = { + table[0].table_id(), table[1].table_id(), table[2].table_id()}; + VerifyTablesInStreamMetadata(stream_id, expected_table_ids, "Waiting for stream metadata."); + + LOG(INFO) << "Dropping table: " << Format("$0$1", kTableName, table_list_suffix[0]); + DropTable(&test_cluster_, Format("$0$1", kTableName, table_list_suffix[0]).c_str()); + // Stream metadata wouldn't be cleaned up since the codepath is disabled via + // 'TEST_cdcsdk_disable_drop_table_cleanup' flag. Therefore all 3 tables are expected to be + // present in stream metadata. + SleepFor(MonoDelta::FromSeconds(3)); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids, "Waiting for stream metadata after drop table."); + + // On loading of CDC stream after a master leader restart, presence of non-eligible tables in CDC + // stream will be checked. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5)); + + // Enable bg threads to clean up CDC stream metadata for dropped tables. 
+ ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = false; + + // Verify the dropped table has been removed from stream metadata after enabling the cleanup. + expected_table_ids.erase(table[0].table_id()); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids, + "Waiting for GetDBStreamInfo post metadata cleanup after restart."); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 1ff1646cf6b7..8e28842c9945 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -140,6 +140,7 @@ DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal); DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream); DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); +DECLARE_bool(TEST_cdcsdk_disable_drop_table_cleanup); namespace yb { diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index a360ac26e349..bec6fc30ad49 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -158,6 +158,9 @@ DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, fa "materialised view etc. 
in their stream metadata and these tables will be marked for removal " "by catalog manager background thread."); +DEFINE_test_flag(bool, cdcsdk_disable_drop_table_cleanup, false, + "When enabled, cleanup of dropped tables from CDC streams will be skipped."); + DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_identification_of_non_eligible_tables, kLocalPersisted, false, @@ -1754,27 +1757,32 @@ void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream( for (const auto& table_id : table_ids) { if (!user_table_ids.contains(table_id)) { auto table_info = GetTableInfoUnlocked(table_id); - Schema schema; - Status status = table_info->GetSchema(&schema); - if (!status.ok()) { - LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name(); - // Skip this table for now, it will be revisited for removal on master restart/master leader - // change. - continue; - } + if (table_info) { + Schema schema; + Status status = table_info->GetSchema(&schema); + if (!status.ok()) { + LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name(); + // Skip this table for now, it will be revisited for removal on master restart/master + // leader change. + continue; + } - // Re-confirm this table is not meant to be part of a CDC stream. - if (!IsTableEligibleForCDCSDKStream(table_info, schema)) { - LOG(INFO) << "Found a non-eligible table: " << table_info->id() - << ", for stream: " << stream_id; - LockGuard lock(cdcsdk_non_eligible_table_mutex_); - namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert( - table_info->id()); + // Re-confirm this table is not meant to be part of a CDC stream. 
+ if (!IsTableEligibleForCDCSDKStream(table_info, schema)) { + LOG(INFO) << "Found a non-eligible table: " << table_info->id() + << ", for stream: " << stream_id; + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert( + table_info->id()); + } else { + // Ideally we are not expected to enter the else clause. + LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id + << " that is not present in the eligible list of tables " + "from the namespace for CDC"; + } } else { - // Ideally we are not expected to enter the else clause. - LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id - << " that is not present in the eligible list of tables " - "from the namespace for CDC"; + LOG(INFO) << "Found table " << table_id << " in stream " << stream_id + << " metadata that is not present in master."; } } } @@ -6682,12 +6690,14 @@ void CatalogManager::RunXClusterBgTasks(const LeaderEpoch& epoch) { // DELETING_METADATA special state is used by CDC, to do CDC streams metadata cleanup from // cache as well as from the system catalog for the drop table scenario. - std::vector cdcsdk_streams; - WARN_NOT_OK( - FindCDCStreamsMarkedForMetadataDeletion( - &cdcsdk_streams, SysCDCStreamEntryPB::DELETING_METADATA), - "Failed CDC Stream Metadata Deletion"); - WARN_NOT_OK(CleanUpCDCStreamsMetadata(cdcsdk_streams), "Failed Cleanup CDC Streams Metadata"); + if (!FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) { + std::vector cdcsdk_streams; + WARN_NOT_OK( + FindCDCStreamsMarkedForMetadataDeletion( + &cdcsdk_streams, SysCDCStreamEntryPB::DELETING_METADATA), + "Failed CDC Stream Metadata Deletion"); + WARN_NOT_OK(CleanUpCDCStreamsMetadata(cdcsdk_streams), "Failed Cleanup CDC Streams Metadata"); + } // Restart xCluster and CDCSDK parent tablet deletion bg task. StartCDCParentTabletDeletionTaskIfStopped();