From 7605b1a996c5e32c9e92f089ce6b68d6cc3f4c95 Mon Sep 17 00:00:00 2001 From: Siddharth Shah Date: Wed, 26 Jun 2024 17:44:09 +0530 Subject: [PATCH] [BACKPORT 2024.1][#22876][#22835][#22773] CDCSDK: Remove non-eligible tables for CDC from existing CDCSDK stream Summary: **Backport Description:** No merge conflicts. **Original Description:** Original commit: 4e9a81c58461c6e4dd16e64d307f40b2f1f19b29 / D36031 Some non-eligible tables like indexes etc. created after creation of a CDC stream were getting added to the CDC stream due to [[ https://phorge.dev.yugabyte.com/D35856 | this missing logic in addition of dynamic tables codepath ]]. We do not hold retention barriers on tablets of such tables until and unless they split, in which case we start holding retention barriers on the children tablets. This leads to heavy resource usage over time, and since the CDC stream never polls on tablets of such tables, retention barriers are not lifted until the active time of these tablets exceeds `cdc_intent_retention_ms`. Therefore, to prevent resource consumption from such tables, we want to achieve the following: 1. Remove these non-eligible tables from the stream metadata so that any further tablet splitting on these tables does not lead to addition of children tablets in the cdc state table. 2. Release retention barriers on the existing tablets that are part of the cdc state table and finally remove these state table entries. We have followed the same pattern of achieving the above tasks via a background thread, exactly like the addition of dynamic tables to CDC streams. **Working:** - //FindAllNonEligibleTablesInCDCSDKStream// - On a master restart/leadership change, while loading CDCSDK streams into memory, we will compute the set difference between tables present in stream metadata and tables in the namespace that are eligible for a CDC stream. 
This set difference will give us the set of non-eligible tables that were not supposed to get added to the CDC stream, but got added because of the above mentioned bug. These non-eligible tables will be added to `namespace_to_cdcsdk_non_eligible_table_map_`, which is further processed in the catalog manager background thread by //FindCDCSDKStreamsForNonEligibleTables//. The bg thread of catalog manager (CatalogManagerBgTasks) handles the actual table removal with the following methods: - //FindCDCSDKStreamsForNonEligibleTables//: This method is run in every subsequent iteration of the bg thread of catalog manager. It scans the cdc_stream_map_ and finds all streams in ACTIVE/DELETING_METADATA state which have the non-eligible table entry in stream metadata, and collects the details to be further processed by //RemoveNonEligibleTablesFromCDCSDKStreams//. - //RemoveNonEligibleTablesFromCDCSDKStreams//: This method is run after FindCDCSDKStreamsForNonEligibleTables and does the following for each stream that contains the non-eligible table entry: 1. Updates the checkpoint of cdc state entries related to the non-eligible table to OpId max. In case of colocated tables, entries with a colocated_table_id will be deleted. 2. Removes the table from stream metadata and cdcsdk_tables_to_stream_map_. Once the table is removed from all relevant CDC streams, we remove the table entry from `namespace_to_cdcsdk_non_eligible_table_map_`. Note: 1. To enable this cleanup of non-eligible tables, the user has to set the master flag `cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream`. 2. In a single iteration of the bg thread, we only process two non-eligible tables across all namespaces. This processing limit is configurable and we are reusing the existing flag `cdcsdk_table_processing_limit_per_run`. Additionally, in the tablet split codepath, before adding cdc state entries for children tablets, we will now check whether the table is a non-eligible table for the CDC stream or not. 
This also helps in preventing a race condition when a tablet of a non-eligible is split and concurrently, there was a master restart/leadership changes and we are trying to remove the table from stream metadata. Jira: DB-11778, DB-11733, DB-11676 Test Plan: Jenkins: urgent ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream Reviewers: asrinivasan, stiwary, skumar, xCluster, hsunder Reviewed By: asrinivasan Subscribers: ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36172 --- src/yb/integration-tests/cdcsdk_ysql-test.cc | 334 ++++++++++++++++- .../cdcsdk_ysql_test_base.cc | 2 +- .../integration-tests/cdcsdk_ysql_test_base.h | 9 +- src/yb/master/catalog_manager.h | 32 +- src/yb/master/catalog_manager_bg_tasks.cc | 26 ++ src/yb/master/xrepl_catalog_manager.cc | 337 ++++++++++++++++-- src/yb/tools/yb-admin_client.cc | 2 +- 7 files changed, 695 insertions(+), 47 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 6d67c097f3fe..4c680f5956d5 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8522,7 +8522,7 @@ TEST_F(CDCSDKYsqlTest, TestGetChangesResponseSize) { ASSERT_TRUE(seen_resp_greater_than_limit); } -void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( +void 
CDCSDKYsqlTest::TestNonEligibleTableShouldNotGetAddedToCDCStream( bool create_consistent_snapshot_stream) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = create_consistent_snapshot_stream; @@ -8544,7 +8544,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); ASSERT_EQ(table1_tablets.size(), 3); - // Create non-user tables like index, mat views BEFORE the stream has been created + // Create non-eligible tables like index, mat views BEFORE the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(a ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv1 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -8554,7 +8554,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ? ASSERT_RESULT(CreateConsistentSnapshotStream()) : ASSERT_RESULT(CreateDBStream()); - // // Create non-user tables AFTER the stream has been created + // // Create non-eligible tables AFTER the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx2 ON $0(b ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv2 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -8614,12 +8614,12 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ASSERT_EQ(expected_tablets, actual_tablets); } -TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); } -TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ 
true); +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToConsistentSnapshotCDCStream) { + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); } TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) { @@ -8991,5 +8991,325 @@ TEST_F( /* use_consistent_snapshot_stream */ true); } +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Restart master -> i1 will be marked for removal and bg thread will actually remove it from CDC +// stream metadata and update the checkpoint for state entries to max. +// 6. Verify i1 no longer exists in stream metadata and state entries have been deleted. +// 7. Create a table t2 +// 8. Verify it gets added to stream metadata and cdc state table. +void CDCSDKYsqlTest::TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_enable_replica_identity) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. 
+ ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + const auto tableName2 = "test_table_2"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + xrepl::StreamId stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + xrepl::StreamId stream_id2 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + const vector index_list_suffix = {"_0", "_1", "_2", "_3"}; + const int kNumIndexes = 4; + vector indexes(kNumIndexes); + int i = 0; + vector> idx_tablets(kNumIndexes); + + while (i < kNumIndexes) { + // Create an index AFTER the stream has been created + ASSERT_OK( + conn.ExecuteFormat("CREATE INDEX $0_idx$1 ON $0(b ASC)", tableName1, index_list_suffix[i])); + indexes[i] = ASSERT_RESULT(GetTable( + &test_cluster_, kNamespaceName, Format("$0_idx$1", tableName1, index_list_suffix[i]))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. 
+ SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + ASSERT_OK(test_client()->GetTablets( + indexes[i], 0, &idx_tablets[i], /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx_tablets[i].size(), 1); + i++; + } + + // Verify CDC stream metadata contains both table1 and the index table. + std::unordered_set expected_tables = {table1.table_id()}; + for (const auto& idx : indexes) { + expected_tables.insert(idx.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. + std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablets : idx_tablets) { + for (const auto& tablet : tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains the user table as well as indexes"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + // Non-eligible tables like the index will be removed from stream on a master restart. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // wait for the bg thread to remove the index from stream metadata and update the checkpoint for + // corresponding state table entries to max. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Stream metadata should no longer contain the index. 
+ expected_tables.clear(); + expected_tables.insert(table1.table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + + // Since checkpoint will be set to max for index's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream, after master restart, only contains the user table."; + + // Create a dynamic table and create non eligible tables on this dynamic table. 
+ ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName2)); + auto table2 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName2)); + google::protobuf::RepeatedPtrField table2_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table2, 0, &table2_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table2_tablets.size(), 3); + + expected_tables.insert(table2.table_id()); + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + + for (const auto& tablet : table2_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains both the user tables."; +} + +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Split one tablet each of index i1 and table t1. +// 6. 
Verify none of the children tablets of i1 are added to cdc state table. +// 7. Verify both children tablets of table t1 have been added to cdc state table. +void CDCSDKYsqlTest::TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_cdcsdk_streamed_tables) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cleanup_split_tablets_interval_sec) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + int num_inserts = 10; + for (int i = 0; i < num_inserts; i++) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2)", tableName1, i, i + 1)); + } + + xrepl::StreamId 
stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Create an index AFTER the stream has been created + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(b ASC)", tableName1)); + auto idx1 = + ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, Format("$0_idx1", tableName1))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + google::protobuf::RepeatedPtrField idx1_tablets; + ASSERT_OK(test_client()->GetTablets(idx1, 0, &idx1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx1_tablets.size(), 1); + + // Verify CDC stream metadata contains both table1 and the index table. + std::unordered_set expected_tables = {table1.table_id(), idx1.table_id()}; + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index creation after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. + std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablet : idx1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + LOG(INFO) << "Stream contains the user table as well as index"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + + ASSERT_OK(WaitForFlushTables( + {idx1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the index's tablet. 
+ WaitUntilSplitIsSuccesful(idx1_tablets.Get(0).tablet_id(), idx1, 2); + google::protobuf::RepeatedPtrField idx1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + idx1, 0, &idx1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(idx1_tablets_after_split.size(), 2); + + ASSERT_OK(WaitForFlushTables( + {table1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the table1's tablet. + WaitUntilSplitIsSuccesful(table1_tablets.Get(0).tablet_id(), table1, 4); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table1, 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 4); + + // wait for sometime so that tablet split codepath has completed adding new cdc state entries. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + std::unordered_set new_expected_tablets_in_state_table; + for (const auto& tablet : table1_tablets_after_split) { + new_expected_tablets_in_state_table.insert(tablet.tablet_id()); + } + + std::unordered_set tablets_not_expected_in_state_table; + for (const auto& tablet : idx1_tablets_after_split) { + tablets_not_expected_in_state_table.insert(tablet.tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + bool seen_unexpected_tablets = false; + Status s; + auto table_range = + ASSERT_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); + for (auto row_result : table_range) { + ASSERT_OK(row_result); + auto& row = *row_result; + + if (row.key.stream_id == stream_id1) { + LOG(INFO) << "Read cdc_state table with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id; + if (new_expected_tablets_in_state_table.contains(row.key.tablet_id)) { + new_expected_tablets_in_state_table.erase(row.key.tablet_id); + } + + if 
(tablets_not_expected_in_state_table.contains(row.key.tablet_id)) { + seen_unexpected_tablets = true; + break; + } + } + } + + bool seen_all_expected_tablets = new_expected_tablets_in_state_table.size() == 0 ? true : false; + ASSERT_FALSE(seen_unexpected_tablets); + ASSERT_TRUE(seen_all_expected_tablets); + LOG(INFO) << "CDC State table does not contain the children tablets of index's split tablet"; +} + +TEST_F( + CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ true); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index f981085788ff..8952c7396043 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -2742,7 +2742,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { return (tablets_after_split.size() == expected_num_tablets); }, - MonoDelta::FromSeconds(120), "Tabelt Split not succesful")); + MonoDelta::FromSeconds(120), "Tablet Split not succesful")); } void CDCSDKYsqlTest::CheckTabletsInCDCStateTable( diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 4e4fa39760c0..96eb4492100e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -119,6 +119,8 @@ DECLARE_bool(enable_cdcsdk_setting_get_changes_response_byte_limit); DECLARE_uint64(cdcsdk_vwal_getchanges_resp_max_size_bytes); DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); 
DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal); +DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { @@ -782,7 +784,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { std::string GetPubRefreshTimesString(vector pub_refresh_times); - void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream); + void TestNonEligibleTableShouldNotGetAddedToCDCStream(bool create_consistent_snapshot_stream); Status ExecuteYBAdminCommand( const std::string& command_name, const std::vector& command_args); @@ -799,6 +801,11 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( bool use_consistent_snapshot_stream); + + void TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + void TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream); }; } // namespace cdc diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 325fd397d039..2b1b2aea0aaf 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -45,7 +45,6 @@ #include #include "yb/cdc/cdc_service.pb.h" -#include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" #include "yb/common/constants.h" #include "yb/common/entity_ids.h" @@ -144,6 +143,7 @@ enum RaftGroupStatePB; namespace cdc { class CDCStateTable; +struct CDCStateTableEntry; } // namespace cdc namespace master { @@ -1485,15 +1485,27 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all CDCSDK streams which do not have metadata for the newly added tables. Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map); + // Find all CDCSDK streams that contain non eligible tables like indexes, mat views etc. in + // their metadata. 
+ Status FindCDCSDKStreamsForNonEligibleTables(TableStreamIdsMap* non_user_tables_to_streams_map); + bool IsTableEligibleForCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_); + const TableInfoPtr& table_info, const std::optional& schema) const + REQUIRES_SHARED(mutex_); // This method compares all tables in the namespace to all the tables added to a CDCSDK stream, // to find tables which are not yet processed by the CDCSDK streams. void FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) - REQUIRES(mutex_); + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); + + // This method compares all tables in the namespace eligible for a CDCSDK stream to all the tables + // added to a CDCSDK stream, to find indexes / mat views that are part of the CDCSDK streams. + void FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); Status ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, @@ -1506,6 +1518,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status ProcessNewTablesForCDCSDKStreams( const TableStreamIdsMap& table_to_unprocessed_streams_map, const LeaderEpoch& epoch); + Status RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch); + // Find all the CDC streams that have been marked as provided state. 
Result> FindXReplStreamsMarkedForDeletion( SysCDCStreamEntryPB::State deletion_state); @@ -3112,6 +3127,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf, void RemoveTableFromCDCSDKUnprocessedMap(const TableId& table_id, const NamespaceId& ns_id); + void RemoveTableFromCDCSDKNonEligibleTableMap(const TableId& table_id, const NamespaceId& ns_id); + void ClearXReplState() REQUIRES(mutex_); Status LoadXReplStream() REQUIRES(mutex_); Status LoadUniverseReplication() REQUIRES(mutex_); @@ -3292,6 +3309,13 @@ class CatalogManager : public tserver::TabletPeerLookupIf, std::unordered_map> namespace_to_cdcsdk_unprocessed_table_map_ GUARDED_BY(cdcsdk_unprocessed_table_mutex_); + mutable MutexType cdcsdk_non_eligible_table_mutex_; + // In-memory map containing non-eligble tables like indexes/ materialized views which got added to + // CDCSDK stream's metadata. Will be refreshed on master restart / leadership change through the + // function: 'FindAllNonEligibleTablesInCDCSDKStream'. + std::unordered_map> + namespace_to_cdcsdk_non_eligible_table_map_ GUARDED_BY(cdcsdk_non_eligible_table_mutex_); + // Map of all consumer tables that are part of xcluster replication, to a map of the stream infos. 
std::unordered_map xcluster_consumer_table_stream_ids_map_ GUARDED_BY(mutex_); diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index 4ba1e3acda6b..7ddd5cbea56c 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -82,6 +82,7 @@ DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false, DECLARE_bool(enable_ysql); DECLARE_bool(TEST_echo_service_enabled); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { namespace master { @@ -297,6 +298,31 @@ void CatalogManagerBgTasks::Run() { } } + { + if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + // Find if there are any non eligible tables (indexes, mat views) present in cdcsdk + // stream that are not associated with a replication slot. + TableStreamIdsMap non_user_tables_to_streams_map; + // In case of master leader restart or leadership changes, we would have scanned all + // streams (without replication slot) in ACTIVE/DELETING METADATA state for non eligible + // tables and marked such tables for removal in + // namespace_to_cdcsdk_non_eligible_table_map_. + Status s = catalog_manager_->FindCDCSDKStreamsForNonEligibleTables( + &non_user_tables_to_streams_map); + + if (s.ok() && !non_user_tables_to_streams_map.empty()) { + s = catalog_manager_->RemoveNonEligibleTablesFromCDCSDKStreams( + non_user_tables_to_streams_map, l.epoch()); + } + if (!s.ok()) { + YB_LOG_EVERY_N(WARNING, 10) + << "Encountered failure while trying to remove non eligible " + "tables from cdc_state table: " + << s.ToString(); + } + } + } + // Ensure the master sys catalog tablet follows the cluster's affinity specification. 
if (FLAGS_sys_catalog_respect_affinity_task) { Status s = catalog_manager_->SysCatalogRespectLeaderAffinity(); diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index ea8c1d8622ad..f55787e37339 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -153,6 +153,13 @@ DEFINE_test_flag(bool, cdcsdk_skip_updating_cdc_state_entries_on_table_removal, "Skip updating checkpoint to max for cdc state table entries while removing a user table from " "CDCSDK stream."); +DEFINE_test_flag(bool, cdcsdk_add_indexes_to_stream, false, "Allows addition of index to a stream"); + +DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, false, + "When enabled, all CDCSDK streams will be scanned for non-eligible tables like indexes, " + "materialised view etc. in their stream metadata and these tables will be marked for removal " + "by catalog manager background thread."); + DECLARE_bool(xcluster_wait_on_ddl_alter); DECLARE_int32(master_rpc_timeout_ms); DECLARE_bool(ysql_yb_enable_replication_commands); @@ -329,8 +336,17 @@ class CDCStreamLoader : public Visitor { if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE || metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) && ns && ns->state() == SysNamespaceEntryPB::RUNNING) { + auto eligible_tables_info = catalog_manager_->FindAllTablesForCDCSDK(metadata.namespace_id()); catalog_manager_->FindAllTablesMissingInCDCSDKStream( - stream_id, metadata.table_id(), metadata.namespace_id()); + stream_id, metadata.table_id(), eligible_tables_info); + + // Check for any non-eligible tables like indexes, matview etc in CDC stream only if the + // stream is not associated with a replication slot. 
+ if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream && + stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + catalog_manager_->FindAllNonEligibleTablesInCDCSDKStream( + stream_id, metadata.table_id(), eligible_tables_info); + } } LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": " @@ -1781,7 +1797,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( void CatalogManager::FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) { + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { std::unordered_set stream_table_ids; // Store all table_ids associated with the stram in 'stream_table_ids'. for (const auto& table_id : table_ids) { @@ -1791,7 +1808,7 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( // Get all the tables associated with the namespace. // If we find any table present only in the namespace, but not in the stream, we add the table // id to 'cdcsdk_unprocessed_tables'. 
- for (const auto& table_info : FindAllTablesForCDCSDK(ns_id)) { + for (const auto& table_info : eligible_tables_info) { auto ltm = table_info->LockForRead(); if (!stream_table_ids.contains(table_info->id())) { LOG(INFO) << "Found unprocessed table: " << table_info->id() @@ -1803,6 +1820,134 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( } } +// Picks up to FLAGS_cdcsdk_table_processing_limit_per_run tables queued in +// 'namespace_to_cdcsdk_non_eligible_table_map_' and records in 'non_user_tables_to_streams_map' +// every ACTIVE/DELETING_METADATA stream without a replication slot whose metadata lists them. +// Picked tables that no such stream lists are dropped from the queue. Always returns OK. +Status CatalogManager::FindCDCSDKStreamsForNonEligibleTables( + TableStreamIdsMap* non_user_tables_to_streams_map) { + std::unordered_map> namespace_to_non_user_table_map; + { + SharedLock lock(cdcsdk_non_eligible_table_mutex_); + int32_t found_non_user_tables = 0; + for (const auto& [ns_id, table_ids] : namespace_to_cdcsdk_non_eligible_table_map_) { + for (const auto& table_id : table_ids) { + namespace_to_non_user_table_map[ns_id].insert(table_id); + if (++found_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + break; + } + } + // The break above only leaves the current namespace. Stop scanning further namespaces too, + // so that the per-run limit applies across all namespaces. + if (found_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + break; + } + } + } + + { + SharedLock lock(mutex_); + for (const auto& [stream_id, stream_info] : cdc_stream_map_) { + if (stream_info->namespace_id().empty()) { + continue; + } + + // Removal of non-eligible tables will only be done on CDC stream that are not associated with + // a replication slot. 
+ if (!stream_info->GetCdcsdkYsqlReplicationSlotName().empty()) { + continue; + } + + const auto non_user_tables = + FindOrNull(namespace_to_non_user_table_map, stream_info->namespace_id()); + if (!non_user_tables) { + continue; + } + + auto ltm = stream_info->LockForRead(); + if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE || + ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) { + for (const auto& non_user_table_id : *non_user_tables) { + auto table = tables_->FindTableOrNull(non_user_table_id); + if (!table) { + LOG_WITH_FUNC(WARNING) + << "Table " << non_user_table_id << " deleted before it could be removed"; + continue; + } + + if (std::find(ltm->table_id().begin(), ltm->table_id().end(), non_user_table_id) != + ltm->table_id().end()) { + (*non_user_tables_to_streams_map)[non_user_table_id].push_back(stream_info); + VLOG(1) << "Will try and remove table: " << non_user_table_id + << ", from stream: " << stream_info->id(); + } + } + } + } + } + + for (const auto& [ns_id, non_user_table_ids] : namespace_to_non_user_table_map) { + for (const auto& non_user_table_id : non_user_table_ids) { + if (!non_user_tables_to_streams_map->contains(non_user_table_id)) { + // This means we found no active CDCSDK stream where this table was present, hence we can + // remove this table from 'namespace_to_cdcsdk_non_eligible_table_map_'. + RemoveTableFromCDCSDKNonEligibleTableMap(non_user_table_id, ns_id); + } + } + } + + return Status::OK(); +} + +// Called while loading a CDCSDK stream: queues every table of the stream that is not in +// 'eligible_tables_info' and is confirmed non-eligible into +// 'namespace_to_cdcsdk_non_eligible_table_map_' for removal by the background task. +void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { + // If we find any table present only in the stream, but not in the list of eligible tables in + // namespace for CDC, we add the table id to 'namespace_to_cdcsdk_non_eligible_table_map_'. 
+ std::unordered_set user_table_ids; + for (const auto& table_info : eligible_tables_info) { + user_table_ids.insert(table_info->id()); + } + + // Examine only the stream's tables that are missing from the eligible set. + for (const auto& table_id : table_ids) { + if (!user_table_ids.contains(table_id)) { + auto table_info = GetTableInfoUnlocked(table_id); + if (!table_info) { + LOG_WITH_FUNC(WARNING) << "Table " << table_id << " not found, skipping it"; + continue; + } + Schema schema; + Status status = table_info->GetSchema(&schema); + if (!status.ok()) { + LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name() + << ": " << status; + // Skip this table for now, it will be revisited for removal on master restart/master leader + // change. + continue; + } + + // Re-confirm this table is not meant to be part of a CDC stream. + if (!IsTableEligibleForCDCSDKStream(table_info, schema)) { + LOG(INFO) << "Found a non-eligible table: " << table_info->id() + << ", for stream: " << stream_id; + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert( + table_info->id()); + } else { + // Ideally we are not expected to enter the else clause. 
+ LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id + << " that is not present in the eligible list of tables " + "from the namespace for CDC"; + } + } + } +} + Status CatalogManager::ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, const std::string& record_type_option_value, const std::string& id_type_option_value) { @@ -1899,24 +2028,32 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace } bool CatalogManager::IsTableEligibleForCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const { - bool has_pk = true; - bool has_invalid_pg_typeoid = false; - for (const auto& col : schema.columns()) { - if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { - // ybrowid column is added for tables that don't have user-specified primary key. - VLOG(1) << "Table: " << table_info->id() - << ", will not be added to CDCSDK stream, since it does not have a primary key"; - has_pk = false; - break; + const TableInfoPtr& table_info, const std::optional& schema) const { + if (schema.has_value()) { + bool has_pk = true; + bool has_invalid_pg_typeoid = false; + for (const auto& col : schema->columns()) { + if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { + // ybrowid column is added for tables that don't have user-specified primary key. + VLOG(1) << "Table: " << table_info->id() + << ", will not be added to CDCSDK stream, since it does not have a primary key"; + has_pk = false; + break; + } + if (col.pg_type_oid() == 0) { + has_invalid_pg_typeoid = true; + } } - if (col.pg_type_oid() == 0) { - has_invalid_pg_typeoid = true; + if (!has_pk || has_invalid_pg_typeoid) { + if (FLAGS_TEST_cdcsdk_add_indexes_to_stream) { + // allow adding user created indexes to CDC stream. 
+ if (IsUserIndexUnlocked(*table_info)) { + return true; + } + } + return false; } } - if (!has_pk || has_invalid_pg_typeoid) { - return false; - } if (IsMatviewTable(*table_info)) { // Materialized view should not be added as they are not supported for streaming. @@ -2100,6 +2237,86 @@ Status CatalogManager::ProcessNewTablesForCDCSDKStreams( return Status::OK(); } +// For each table in 'non_user_tables_to_streams_map', updates/deletes its cdc_state entries and +// removes it from the metadata and maps of every listed stream. At most +// FLAGS_cdcsdk_table_processing_limit_per_run tables are handled per call. Per-stream failures +// are logged and leave the table queued; this method itself always returns OK. +Status CatalogManager::RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch) { + int32_t removed_non_user_tables = 0; + for (const auto& [table_id, streams] : non_user_tables_to_streams_map) { + if (removed_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + VLOG(1) + << "Reached the limit of number of non-eligible tables to be removed per iteration. Will " + "remove the remaining tables in the next iteration."; + break; + } + + // Delete the table from all streams now. + NamespaceId namespace_id; + // Set when any stream could not be fully processed; the table then stays queued for a retry. + bool stream_pending = false; + for (const auto& stream : streams) { + if PREDICT_FALSE (stream == nullptr) { + LOG(WARNING) << "Found a null CDC stream entry for non-eligible table: " << table_id; + continue; + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for (const auto& stream_table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(stream_table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table. 
+ tables_in_stream_metadata.erase(table_id); + auto result = UpdateCheckpointForTabletEntriesInCDCState( + stream->StreamId(), tables_in_stream_metadata, table_id); + + if (!result.ok()) { + LOG(WARNING) << "Encountered error while trying to update/delete tablets entries of table: " + << table_id << ", from cdc_state table for stream " << stream->id() << ": " + << result.status(); + stream_pending = true; + continue; + } + + { + auto stream_lock = stream->LockForWrite(); + if (stream_lock->is_deleting()) { + continue; + } + } + + Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id); + if (!status.ok()) { + LOG(WARNING) << "Encountered error while trying to remove non-eligible table " << table_id + << " from metadata of stream " << stream->StreamId() << " and maps: " << status; + stream_pending = true; + continue; + } + + LOG(INFO) + << "Successfully removed non-eligible table " << table_id + << " from stream metadata and updated corresponding cdc_state table entries for stream: " + << stream->id(); + + namespace_id = stream->namespace_id(); + } + + // Remove the non-eligible table from 'namespace_to_cdcsdk_non_eligible_table_map_'. 
+ if (!stream_pending) { + RemoveTableFromCDCSDKNonEligibleTableMap(table_id, namespace_id); + } + ++removed_non_user_tables; + } + + return Status::OK(); +} + void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( const TableId& table_id, const NamespaceId& ns_id) { LockGuard lock(cdcsdk_unprocessed_table_mutex_); @@ -2112,6 +2325,22 @@ void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( } } +// Drops 'table_id' from the set queued under 'ns_id' in +// 'namespace_to_cdcsdk_non_eligible_table_map_', erasing the namespace entry once it is empty. +void CatalogManager::RemoveTableFromCDCSDKNonEligibleTableMap( + const TableId& table_id, const NamespaceId& ns_id) { + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + auto non_user_tables = FindOrNull(namespace_to_cdcsdk_non_eligible_table_map_, ns_id); + if (!non_user_tables) { + return; + } + + non_user_tables->erase(table_id); + if (non_user_tables->empty()) { + namespace_to_cdcsdk_non_eligible_table_map_.erase(ns_id); + } +} + Result> CatalogManager::FindXReplStreamsMarkedForDeletion( SysCDCStreamEntryPB::State deletion_state) { std::vector streams; @@ -4318,6 +4545,24 @@ Status CatalogManager::UpdateCDCProducerOnTabletSplit( std::vector streams; std::vector entries; for (const auto stream_type : {cdc::XCLUSTER, cdc::CDCSDK}) { + if (stream_type == cdc::CDCSDK && + FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + const auto& table_info = GetTableInfo(producer_table_id); + // Skip adding children tablet entries in cdc state if the table is an index or a mat view. + // These tables, if present in CDC stream, are anyway going to be removed by a bg thread. This + // check ensures even if there is a race condition where a tablet of a non-eligible table + // splits and concurrently we are removing such tables from stream, the child tables do not + // get added. 
+ { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table_info, std::nullopt)) { + LOG(INFO) << "Skipping adding children tablets to cdc state for table " + << producer_table_id << " as it is not meant to part of a CDC stream"; + continue; + } + } + } + { SharedLock lock(mutex_); streams = GetXReplStreamsForTable(producer_table_id, stream_type); @@ -6217,7 +6462,7 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( if (!stream->IsDynamicTableAdditionDisabled()) { RETURN_INVALID_REQUEST_STATUS( "Cannot remove table unless dynamic table addition is disabled for the stream. Please use " - "the yb-admin command \"disable_dynamic_table_addition_in_change_data_stream\" to disable " + "the yb-admin command \"disable_dynamic_table_addition_on_change_data_stream\" to disable " "dynamic table addition on the stream."); } @@ -6265,8 +6510,8 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( // table from the cdc state table. tables_in_stream_metadata.erase(table_id); RETURN_NOT_OK_PREPEND( - UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata), - "Error updating tablet entries from cdc state table"); + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata, table_id), + "Error updating/deleting tablet entries from cdc state table"); } // Now remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist @@ -6275,9 +6520,9 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id), "Error removing table from stream metadata and maps"); - LOG_WITH_FUNC(INFO) - << "Successfully removed table " << table_id << " from CDC stream: " << stream_id - << " and updated the checkpoint to max for corresponding cdc state table entries."; + LOG_WITH_FUNC(INFO) << "Successfully removed table " << table_id + << " from CDC stream: " << stream_id + << " and updated/deleted corresponding cdc state table entries."; return 
Status::OK(); } @@ -7667,6 +7912,11 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( const TableId& table_to_be_removed) { std::unordered_set tablet_entries_to_be_removed; + // This will only contain entries for colocated tables that have a composite value in the + // stream_id column i.e. in the form of stream_id_table_id. Such entries will have to be directly + // deleted as UpdatePeersAndMetrics ignores these entries. + std::vector cdc_state_entries_to_be_deleted; + // If the table_id to be removed is provided, we will only find out cdc state table entries // corresponding to this table and update their checkpoints. Otherwise, we'll consider all state // table entries for checkpoint update. @@ -7698,11 +7948,19 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( const auto& entry = *entry_result; if (entry.key.stream_id == stream_id) { - // If table_id is provided, filter out state entries belonging to tablets of the table. - if (table_to_be_removed.empty() || - (!table_to_be_removed.empty() && - tablet_entries_to_be_removed.contains(entry.key.tablet_id))) { - cdc_state_tablet_entries.push_back(entry.key.tablet_id); + // For updating the checkpoint, only consider entries that do not have a colocated table_id as + // these will be manually deleted. + if (entry.key.colocated_table_id.empty()) { + // If table_id is provided, filter out state entries belonging to tablets of the table. + if (table_to_be_removed.empty() || + tablet_entries_to_be_removed.contains(entry.key.tablet_id)) { + cdc_state_tablet_entries.push_back(entry.key.tablet_id); + } + } else if (entry.key.colocated_table_id == table_to_be_removed) { + // If the entry contain a colocated_table_id, it belongs to one of the colocated + // tables on that tablet. If this colocated_table_id matches with the table being removed, + // then we'll delete this entry directly. 
+ cdc_state_entries_to_be_deleted.push_back(entry.key); } } } @@ -7734,15 +7992,28 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( } if (!entries_to_update.empty()) { - LOG_WITH_FUNC(INFO) - << "Updating checkpoint to max for " << entries_to_update.size() - << " cdc state entries as part of validating cdc state table entries for CDC stream: " - << stream_id; + LOG_WITH_FUNC(INFO) << "Setting checkpoint to max for " << entries_to_update.size() + << " cdc state entries for CDC stream: " << stream_id; RETURN_NOT_OK_PREPEND( cdc_state_table_->UpdateEntries(entries_to_update), "Error setting checkpoint to OpId::Max() in cdc_state table"); } + if (!cdc_state_entries_to_be_deleted.empty()) { + // Only 1 entry is expected for a colocated table that is being removed. + RSTATUS_DCHECK( + cdc_state_entries_to_be_deleted.size() == 1, IllegalState, + "Found more than one cdc state table entry that needs to be deleted for removing table $0 " + "from CDC stream $1", + table_to_be_removed, stream_id); + + LOG_WITH_FUNC(INFO) << "Deleting cdc state table entry (tablet,stream) - " + << cdc_state_entries_to_be_deleted[0].ToString(); + RETURN_NOT_OK_PREPEND( + cdc_state_table_->DeleteEntries(cdc_state_entries_to_be_deleted), + "Error deleting entries from cdc_state table"); + } + return entries_to_update; } diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 98d97205a870..b07878c66a77 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -4063,7 +4063,7 @@ Status ClusterAdminClient::ValidateAndSyncCDCStateEntriesForCDCSDKStream( req.set_stream_id(stream_id); RpcController rpc; - rpc.set_timeout(timeout_); + rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); RETURN_NOT_OK( master_replication_proxy_->ValidateAndSyncCDCStateEntriesForCDCSDKStream(req, &resp, &rpc));