From 3a375a11a4036f322787c73ef85f38a10d23f8eb Mon Sep 17 00:00:00 2001
From: Siddharth Shah
Date: Sat, 29 Jun 2024 02:28:43 +0530
Subject: [PATCH] [BACKPORT 2.20][#22876][#22835][#22773] CDCSDK: Remove user and non-eligible tables from CDCSDK stream

Summary:
**Backport description:**
Faced merge conflicts because of a missing macro and missing utility methods. Fixed some of them and added some of the missing methods.

**Original description:**
Original commits:
92827b32fdc8499ecc1c948465dd143d5bf04982 / D35870
4e9a81c58461c6e4dd16e64d307f40b2f1f19b29 / D36031
af49a1e080348abe8a8819c1594146666ca1fc47 / D36240

Please refer to the original commits for the full summary of each diff.

**Diff-1 (D35870): Add new yb-admin command to remove user table from CDCSDK stream**
This diff introduces three new yb-admin commands required to remove a **user table** from a CDCSDK stream.

**`NOTE: All three commands are only meant to be used on CDC streams that are not associated with a replication slot.`**

**Command-1**: yb-admin command to disable dynamic table addition in a CDC stream. It only works when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set to true.
**Note: after this command is executed, no dynamic tables (user or non-user) will be added to the CDC stream. Additionally, there is no option to re-enable dynamic table addition for the stream.**
```
yb-admin \
    -master_addresses <master_addresses> \
    disable_dynamic_table_addition_on_change_data_stream <stream_id>
```
The command works with a single stream_id.

**Command-2**: yb-admin command to remove a particular **user** table from the CDC stream metadata and to update the checkpoint of the corresponding state table entries to OpId max. Since the checkpoint is set to max, these entries will later be deleted from the cdc state table by a separate thread (UpdatePeersAndMetrics).
```
yb-admin \
    -master_addresses <master_addresses> \
    remove_user_table_from_change_data_stream <stream_id> <table_id>
```
The command works with a single stream_id & table_id.
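The two-phase behaviour of Command-2 (mark now, delete later) can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `SketchStream`, `RemoveTable`, `DeleteMaxCheckpointEntries` and `kMaxCheckpoint` are hypothetical names invented for illustration, and a single integer stands in for the OpId. It is not the catalog manager code in this patch.

```cpp
#include <cstddef>
#include <cstdint>
#include <cassert>
#include <limits>
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical sentinel standing in for "OpId max".
constexpr int64_t kMaxCheckpoint = std::numeric_limits<int64_t>::max();

// Hypothetical, simplified view of one CDC stream.
struct SketchStream {
  // Tables listed in the stream metadata.
  std::set<std::string> table_ids;
  // cdc_state rows of this stream: tablet_id -> (table_id, checkpoint).
  std::map<std::string, std::pair<std::string, int64_t>> state_entries;
};

// Phase 1 (models the command): drop the table from the stream metadata and
// move the checkpoint of its state entries to the max sentinel.
// Returns false if the table was not part of the stream.
bool RemoveTable(SketchStream& stream, const std::string& table_id) {
  if (stream.table_ids.erase(table_id) == 0) {
    return false;
  }
  for (auto& entry : stream.state_entries) {
    if (entry.second.first == table_id) {
      entry.second.second = kMaxCheckpoint;
    }
  }
  return true;
}

// Phase 2 (models the separate cleanup thread): delete every state entry
// whose checkpoint is the max sentinel. Returns the number of deleted rows.
std::size_t DeleteMaxCheckpointEntries(SketchStream& stream) {
  std::size_t deleted = 0;
  for (auto it = stream.state_entries.begin(); it != stream.state_entries.end();) {
    if (it->second.second == kMaxCheckpoint) {
      it = stream.state_entries.erase(it);
      ++deleted;
    } else {
      ++it;
    }
  }
  return deleted;
}
```

In this model the state entries survive the removal call itself and only disappear once the cleanup pass runs, which is why the tests in this patch sleep before checking the cdc state table.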

**Command-3**: yb-admin command to validate the cdc state table entries for a particular stream. As part of validation, if the table of any cdc state table entry is not present in the CDC stream metadata, the checkpoint of such entries will be updated to OpId max, and they will later be deleted by a separate thread (UpdatePeersAndMetrics).
```
yb-admin \
    -master_addresses <master_addresses> \
    validate_and_sync_cdc_state_table_entries_on_change_data_stream <stream_id>
```
The command works with a single stream_id.

**Diff-2 (D36031): Remove non-eligible tables for CDC from existing CDCSDK stream**
Some non-eligible tables, such as indexes, that were created after the creation of a CDC stream were getting added to the stream because the dynamic table addition codepath was missing the logic to filter them out. We do not hold retention barriers on tablets of such tables until and unless they split, in which case we start holding retention barriers on the children tablets. This leads to heavy resource usage over time, and since the CDC stream never polls on tablets of such tables, retention barriers are not lifted until the active time of these tablets exceeds cdc_intent_retention_ms.

Therefore, to prevent resource consumption from such tables, we want to achieve the following:
1. Remove these non-eligible tables from the stream metadata so that any further tablet splitting on these tables does not lead to addition of children tablets in the cdc state table.
2. Release retention barriers on the existing tablets that are part of the cdc state table and finally remove these state table entries.

**Diff-3 (D36240): Add new auto flag to identify non-eligible tables in CDC stream**
This diff is an extension of D36031, which introduced the cleanup mechanism for non-eligible tables. The mechanism involves two steps:
1. Identification of indexes -> happens during loading of CDC streams into memory on a master restart/leadership change.
2. Removal of these identified indexes by the bg thread.
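
The identify-then-remove mechanism listed above can be sketched as a small standalone model. Everything here is an assumption made for illustration: `SketchTable`, `SketchCleanupState`, `IdentifyNonEligibleTables` and `RemoveIdentifiedTables` are hypothetical names, eligibility is reduced to an `is_index` bit, and the `identification_enabled` parameter merely stands in for whatever flag guards the identification step. It is not the code added in xrepl_catalog_manager.cc.

```cpp
#include <cstddef>
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical catalog entry; only indexes are treated as non-eligible here.
struct SketchTable {
  std::string id;
  bool is_index;
};

// Hypothetical in-memory state of one stream kept by the master.
struct SketchCleanupState {
  std::set<std::string> stream_table_ids;  // tables in the stream metadata
  std::set<std::string> pending_removal;   // tables identified in step 1
};

// Step 1: runs while streams are loaded into memory. Marks every index that
// is part of the stream, but only when identification is enabled.
void IdentifyNonEligibleTables(const std::vector<SketchTable>& catalog,
                               bool identification_enabled,
                               SketchCleanupState* state) {
  if (!identification_enabled) {
    return;
  }
  for (const auto& table : catalog) {
    if (table.is_index && state->stream_table_ids.count(table.id) > 0) {
      state->pending_removal.insert(table.id);
    }
  }
}

// Step 2: runs periodically in the background. Removes the marked tables
// from the stream metadata and returns how many were removed.
std::size_t RemoveIdentifiedTables(SketchCleanupState* state) {
  std::size_t removed = 0;
  for (const auto& table_id : state->pending_removal) {
    removed += state->stream_table_ids.erase(table_id);
  }
  state->pending_removal.clear();
  return removed;
}
```

Splitting the work this way means the background pass has nothing to do until the identification pass has run at least once.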

Without this diff, both of these steps were guarded by a non-auto flag, `cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream`. Therefore, post upgrade, step 1 requires the user to set that flag and explicitly perform a master restart/leader change. To avoid this explicit master restart/leader change and still give users control over the cleanup, we are introducing a new auto flag `cdcsdk_enable_identification_of_non_eligible_tables` that will guard the identification step.

**Upgrade/Rollback safety:**
//cdcsdk_disable_dynamic_table_addition// - added a new optional field in the existing protos SysCDCStreamEntryPB and CDCStreamInfoPB. This field is protected and will only be read when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set.

Introduced request and response protos for the new RPCs:
- DisableDynamicTableAdditionOnCDCSDKStream
  - DisableDynamicTableAdditionOnCDCSDKStreamRequestPB, DisableDynamicTableAdditionOnCDCSDKStreamResponsePB
- RemoveUserTableFromCDCSDKStream
  - RemoveUserTableFromCDCSDKStreamRequestPB, RemoveUserTableFromCDCSDKStreamResponsePB
- ValidateAndSyncCDCStateEntriesForCDCSDKStream
  - ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB, ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB

Jira: DB-11778, DB-11733, DB-11676

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromNonConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter
CDCSDKYsqlTest.TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream

Reviewers: skumar, asrinivasan, stiwary, xCluster, hsunder

Reviewed By: asrinivasan

Subscribers: ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36321
---
 src/yb/integration-tests/cdcsdk_ysql-test.cc  | 687 ++++++++++++++++-
 .../cdcsdk_ysql_test_base.cc                  |  50 +-
 .../integration-tests/cdcsdk_ysql_test_base.h |  27 +-
 src/yb/master/catalog_entity_info.cc          |  16 +
 src/yb/master/catalog_entity_info.h           |   4 +
 src/yb/master/catalog_entity_info.proto       |   5 +
 src/yb/master/catalog_manager.h               |  53 +-
 src/yb/master/catalog_manager_bg_tasks.cc     |  26 +
 src/yb/master/master_replication.proto        |  42 ++
 src/yb/master/master_replication_service.cc   |   3 +
 src/yb/master/xrepl_catalog_manager.cc        | 692 +++++++++++++++++-
 src/yb/tools/yb-admin_cli.cc                  |  47 ++
 src/yb/tools/yb-admin_client.cc               |  78 ++
 src/yb/tools/yb-admin_client.h                |   6 +
 14 files changed, 1707 insertions(+), 29 deletions(-)

diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc
index 6da49d8db960..4aa6680b78a6 100644
---
a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -7915,7 +7915,7 @@ TEST_F(CDCSDKYsqlTest, TestUpdateOnNonExistingEntry) { ASSERT_EQ(change_resp.cdc_sdk_proto_records().Get(2).row_message().op(), RowMessage::COMMIT); } -void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( +void CDCSDKYsqlTest::TestNonEligibleTableShouldNotGetAddedToCDCStream( bool create_consistent_snapshot_stream) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = create_consistent_snapshot_stream; @@ -7937,7 +7937,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); ASSERT_EQ(table1_tablets.size(), 3); - // Create non-user tables like index, mat views BEFORE the stream has been created + // Create non-eligible tables like index, mat views BEFORE the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(a ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv1 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -7946,7 +7946,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ? 
ASSERT_RESULT(CreateConsistentSnapshotStream()) : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); - // // Create non-user tables AFTER the stream has been created + // // Create non-eligible tables AFTER the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx2 ON $0(b ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv2 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -8006,12 +8006,687 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ASSERT_EQ(expected_tablets, actual_tablets); } -TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); } TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestDisableOfDynamicTableAdditionOnCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2", "_3", "_4"}; + const int kNumTables = 5; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the first two tables. 
+ for (idx = 0; idx < 2; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id1 = use_consistent_snapshot_stream ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + auto stream_id2 = use_consistent_snapshot_stream ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + + std::unordered_set expected_table_ids = {table[0].table_id(), table[1].table_id()}; + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for stream metadata after stream creation."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for stream metadata after stream creation."); + + // Since dynamic table addition is not yet disabled, create a new table and verify that it gets + // added to stream metadata of both the streams. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + expected_table_ids.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + + // Disable dynamic table addition on stream1 via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id1)); + + // Create a new table and verify that it only gets added to stream2's metadata. 
+ table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_4 as dynamic table addition is disabled. Therefore, + // the expected set of tables remains same as before. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Stream2's metadata should contain table_4 as dynamic table addition is not disabled. + auto expected_table_ids_for_stream2 = expected_table_ids; + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Verify tablets of table_4 have only been added to cdc_state table for stream2. + std::unordered_set expected_tablets_for_stream1; + std::unordered_set expected_tablets_for_stream2; + for (int i = 0; i < idx; i++) { + if (i < 3) { + expected_tablets_for_stream1.insert(tablets[i].Get(0).tablet_id()); + } + expected_tablets_for_stream2.insert(tablets[i].Get(0).tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); + + // Even on a master restart, table_4 should not get added to the stream1. 
+ auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Any newly created table after master restart should not get added to stream1. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_5 as dynamic table addition is disabled. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // Stream2's metadata should contain table_5 as dynamic table addition is not disabled. + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // verify tablets of table_4 & table_5 have not been added to cdc_state table for stream1. + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + + // Tablets of table_5 should be added to cdc state table for stream2. 
+ expected_tablets_for_stream2.insert(tablets[idx - 1].Get(0).tablet_id()); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(1, 1, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. 
+ std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata as well as update its corresponding state table tablet entries with checkpoint as max. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. 
+ expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + ASSERT_OK(test_client()->FlushTables( + {table[0].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split table_1's tablet. + WaitUntilSplitIsSuccesful(tablets[0].Get(0).tablet_id(), table[0], 2); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table[0], 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 2); + + // Wait for sometime so that tablet split codepath has completed adding new cdc state entries. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Children tablets of table_1 shouldnt get added to cdc state table since the table no longer + // exists in stream metadata. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // Even after a restart, we shouldn't see table_1 in stream metadata as well as cdc state table + // entries shouldnt contain any of the table_1 tablets. 
+ VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetBStreamInfo after master restart."); + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromNonConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) = + true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 3, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? 
ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. + std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata but skip updating cdc state entries because the test flag + // skip_updating_cdc_state_entries_on_table_removal is set. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Verify that cdc state table still contains entries for the table that was removed. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Now, validate the cdc state entries using the yb-admin command + // 'validate_cdc_state_table_entries_on_change_data_stream'. It will find state table entries for + // table_1 and update their checkpoints to max. 
+ ASSERT_OK(ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id)); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ true); +} + +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Restart master -> i1 will be marked for removal and bg thread will actually remove it from CDC +// stream metadata and update the checkpoint for state entries to max. +// 6. Verify i1 no longer exists in stream metadata and state entries have been deleted. +// 7. Create a table t2 +// 8. Verify it gets added to stream metadata and cdc state table. 
+void CDCSDKYsqlTest::TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + const auto tableName2 = "test_table_2"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + xrepl::StreamId stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + xrepl::StreamId stream_id2 = use_consistent_snapshot_stream + ? 
ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + const vector index_list_suffix = {"_0", "_1", "_2", "_3"}; + const int kNumIndexes = 4; + vector indexes(kNumIndexes); + int i = 0; + vector> idx_tablets(kNumIndexes); + + while (i < kNumIndexes) { + // Create an index AFTER the stream has been created + ASSERT_OK( + conn.ExecuteFormat("CREATE INDEX $0_idx$1 ON $0(b ASC)", tableName1, index_list_suffix[i])); + indexes[i] = ASSERT_RESULT(GetTable( + &test_cluster_, kNamespaceName, Format("$0_idx$1", tableName1, index_list_suffix[i]))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + ASSERT_OK(test_client()->GetTablets( + indexes[i], 0, &idx_tablets[i], /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx_tablets[i].size(), 1); + i++; + } + + // Verify CDC stream metadata contains both table1 and the index table. + std::unordered_set expected_tables = {table1.table_id()}; + for (const auto& idx : indexes) { + expected_tables.insert(idx.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. 
+ std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablets : idx_tablets) { + for (const auto& tablet : tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains the user table as well as indexes"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + // Non-eligible tables like the index will be removed from stream on a master restart. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // wait for the bg thread to remove the index from stream metadata and update the checkpoint for + // corresponding state table entries to max. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Stream metadata should no longer contain the index. + expected_tables.clear(); + expected_tables.insert(table1.table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + + // Since checkpoint will be set to max for index's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. 
+ expected_tablets.clear(); + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream, after master restart, only contains the user table."; + + // Create a dynamic table and create non eligible tables on this dynamic table. + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName2)); + auto table2 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName2)); + google::protobuf::RepeatedPtrField table2_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table2, 0, &table2_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table2_tablets.size(), 3); + + expected_tables.insert(table2.table_id()); + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + + for (const auto& tablet : table2_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains both the user tables."; +} + +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, 
TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Split one tablet each of index i1 and table t1. +// 6. Verify none of the children tablets of i1 are added to cdc state table. +// 7. Verify both children tablets of table t1 have been added to cdc state table. +void CDCSDKYsqlTest::TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_cdcsdk_streamed_tables) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cleanup_split_tablets_interval_sec) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. 
+ ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + int num_inserts = 10; + for (int i = 0; i < num_inserts; i++) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2)", tableName1, i, i + 1)); + } + + xrepl::StreamId stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Create an index AFTER the stream has been created + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(b ASC)", tableName1)); + auto idx1 = + ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, Format("$0_idx1", tableName1))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + google::protobuf::RepeatedPtrField idx1_tablets; + ASSERT_OK(test_client()->GetTablets(idx1, 0, &idx1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx1_tablets.size(), 1); + + // Verify CDC stream metadata contains both table1 and the index table. 
+ std::unordered_set expected_tables = {table1.table_id(), idx1.table_id()}; + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index creation after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. + std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablet : idx1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + LOG(INFO) << "Stream contains the user table as well as index"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + + ASSERT_OK(test_client()->FlushTables( + {idx1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the index's tablet. + WaitUntilSplitIsSuccesful(idx1_tablets.Get(0).tablet_id(), idx1, 2); + google::protobuf::RepeatedPtrField idx1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + idx1, 0, &idx1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(idx1_tablets_after_split.size(), 2); + + ASSERT_OK(test_client()->FlushTables( + {table1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the table1's tablet. + WaitUntilSplitIsSuccesful(table1_tablets.Get(0).tablet_id(), table1, 4); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table1, 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 4); + + // wait for sometime so that tablet split codepath has completed adding new cdc state entries. 
+ SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + std::unordered_set new_expected_tablets_in_state_table; + for (const auto& tablet : table1_tablets_after_split) { + new_expected_tablets_in_state_table.insert(tablet.tablet_id()); + } + + std::unordered_set tablets_not_expected_in_state_table; + for (const auto& tablet : idx1_tablets_after_split) { + tablets_not_expected_in_state_table.insert(tablet.tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + bool seen_unexpected_tablets = false; + Status s; + auto table_range = + ASSERT_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); + for (auto row_result : table_range) { + ASSERT_OK(row_result); + auto& row = *row_result; + + if (row.key.stream_id == stream_id1) { + LOG(INFO) << "Read cdc_state table with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id; + if (new_expected_tablets_in_state_table.contains(row.key.tablet_id)) { + new_expected_tablets_in_state_table.erase(row.key.tablet_id); + } + + if (tablets_not_expected_in_state_table.contains(row.key.tablet_id)) { + seen_unexpected_tablets = true; + break; + } + } + } + + bool seen_all_expected_tablets = new_expected_tablets_in_state_table.size() == 0 ? 
true : false; + ASSERT_FALSE(seen_unexpected_tablets); + ASSERT_TRUE(seen_all_expected_tablets); + LOG(INFO) << "CDC State table does not contain the children tablets of index's split tablet"; +} + +TEST_F( + CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ true); } } // namespace cdc diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index a2134ea6f191..4e74ebb707b2 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -2283,7 +2283,7 @@ namespace cdc { return (tablets_after_split.size() == expected_num_tablets); }, - MonoDelta::FromSeconds(120), "Tabelt Split not succesful")); + MonoDelta::FromSeconds(120), "Tablet Split not succesful")); } void CDCSDKYsqlTest::CheckTabletsInCDCStateTable( @@ -3695,5 +3695,53 @@ namespace cdc { } + Status CDCSDKYsqlTest::ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args) { + string tool_path = GetToolPath("../bin", "yb-admin"); + vector argv; + argv.push_back(tool_path); + argv.push_back("--master_addresses"); + argv.push_back(AsString(test_cluster_.mini_cluster_->GetMasterAddresses())); + argv.push_back(command_name); + for (const auto& command_arg : command_args) { + argv.push_back(command_arg); + } + + RETURN_NOT_OK(Subprocess::Call(argv)); + + return Status::OK(); + } + + Status CDCSDKYsqlTest::DisableDynamicTableAdditionOnCDCSDKStream( + const xrepl::StreamId& stream_id) { + std::string yb_admin_command = "disable_dynamic_table_addition_on_change_data_stream"; + vector command_args; + 
command_args.push_back(stream_id.ToString()); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + return Status::OK(); + } + + Status CDCSDKYsqlTest::RemoveUserTableFromCDCSDKStream( + const xrepl::StreamId& stream_id, const TableId& table_id) { + std::string yb_admin_command = "remove_user_table_from_change_data_stream"; + vector command_args; + command_args.push_back(stream_id.ToString()); + command_args.push_back(table_id); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + + return Status::OK(); + } + + Status CDCSDKYsqlTest::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const xrepl::StreamId& stream_id) { + std::string yb_admin_command = + "validate_and_sync_cdc_state_table_entries_on_change_data_stream"; + vector command_args; + command_args.push_back(stream_id.ToString()); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + + return Status::OK(); + } + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 163819e7b01c..1ff1646cf6b7 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -136,6 +136,10 @@ DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams); DECLARE_uint32(cdcsdk_tablet_not_of_interest_timeout_secs); DECLARE_uint32(cdcsdk_retention_barrier_no_revision_interval_secs); DECLARE_bool(TEST_cdcsdk_skip_processing_dynamic_table_addition); +DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); +DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal); +DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { @@ -647,7 +651,28 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { google::protobuf::RepeatedPtrField tablets, CDCSDKCheckpointPB checkpoint, GetChangesResponsePB* change_resp); - void 
TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream); + void TestNonEligibleTableShouldNotGetAddedToCDCStream(bool create_consistent_snapshot_stream); + + Status ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args); + + Status DisableDynamicTableAdditionOnCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestDisableOfDynamicTableAdditionOnCDCStream(bool use_consistent_snapshot_stream); + + Status RemoveUserTableFromCDCSDKStream(const xrepl::StreamId& stream_id, const TableId& table_id); + + void TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream); + + void TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + void TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream); }; } // namespace cdc diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index fccad8905341..bc490e87e5ce 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -59,6 +59,7 @@ using std::string; using strings::Substitute; DECLARE_int32(tserver_unresponsive_timeout_ms); +DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); DEFINE_RUNTIME_AUTO_bool( use_parent_table_id_field, kLocalPersisted, false, true, @@ -1282,6 +1283,11 @@ const NamespaceId CDCStreamInfo::namespace_id() const { return LockForRead()->pb.namespace_id(); } +bool CDCStreamInfo::IsCDCSDKStream() const { + auto l = LockForRead(); + return l->pb.has_namespace_id() && !l->pb.namespace_id().empty(); +} + const ReplicationSlotName CDCStreamInfo::GetCdcsdkYsqlReplicationSlotName() const { auto l = LockForRead(); return ReplicationSlotName(l->pb.cdcsdk_ysql_replication_slot_name()); @@ -1293,6 
+1299,16 @@ bool CDCStreamInfo::IsConsistentSnapshotStream() const { l->pb.cdcsdk_stream_metadata().has_consistent_snapshot_option(); } +bool CDCStreamInfo::IsDynamicTableAdditionDisabled() const { + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + return false; + } + + auto l = LockForRead(); + return l->pb.has_cdcsdk_disable_dynamic_table_addition() && + l->pb.cdcsdk_disable_dynamic_table_addition(); +} + std::string CDCStreamInfo::ToString() const { auto l = LockForRead(); if (l->pb.has_namespace_id()) { diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 03aefb35f834..2655438f79d4 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -1258,6 +1258,10 @@ class CDCStreamInfo : public RefCountedThreadSafe, bool IsConsistentSnapshotStream() const; + bool IsCDCSDKStream() const; + + bool IsDynamicTableAdditionDisabled() const; + std::string ToString() const override; private: diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index 4783cbfdff69..af4db43db75c 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -519,6 +519,11 @@ message SysCDCStreamEntryPB { optional string cdcsdk_ysql_replication_slot_name = 6; optional CDCSDKStreamEntryPB cdcsdk_stream_metadata = 7; optional uint64 stream_creation_time = 8; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. + // If set to true, dynamic tables won't get added to the CDC stream.
+ optional bool cdcsdk_disable_dynamic_table_addition = 11; } diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index b9cf2d7520c7..7715eccc064e 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -138,6 +138,7 @@ enum RaftGroupStatePB; namespace cdc { class CDCStateTable; +struct CDCStateTableEntry; } // namespace cdc namespace master { @@ -1311,6 +1312,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status UpdateCDCStream( const UpdateCDCStreamRequestPB* req, UpdateCDCStreamResponsePB* resp, rpc::RpcContext* rpc); + Status DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + // Query if Bootstrapping is required for a CDC stream (e.g. Are we missing logs). Status IsBootstrapRequired( const IsBootstrapRequiredRequestPB* req, @@ -1439,15 +1452,27 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all CDCSDK streams which do not have metadata for the newly added tables. Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map); - bool CanTableBeAddedToCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_); + // Find all CDCSDK streams that contain non eligible tables like indexes, mat views etc. in + // their metadata. 
+ Status FindCDCSDKStreamsForNonEligibleTables(TableStreamIdsMap* non_user_tables_to_streams_map); + + bool IsTableEligibleForCDCSDKStream( + const TableInfoPtr& table_info, const std::optional& schema) const + REQUIRES_SHARED(mutex_); // This method compares all tables in the namespace to all the tables added to a CDCSDK stream, // to find tables which are not yet processed by the CDCSDK streams. void FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) - REQUIRES(mutex_); + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); + + // This method compares all tables in the namespace eligible for a CDCSDK stream to all the tables + // added to a CDCSDK stream, to find indexes / mat views that are part of the CDCSDK streams. + void FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); Status ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, @@ -1463,6 +1488,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all the CDC streams that have been marked as DELETED. Status FindCDCStreamsMarkedAsDeleting(std::vector* streams); + Status RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch); + // Find all the CDC streams that have been marked as provided state. 
Status FindCDCStreamsMarkedForMetadataDeletion( std::vector* streams, SysCDCStreamEntryPB::State state); @@ -3018,6 +3046,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf, void RemoveTableFromCDCSDKUnprocessedMap(const TableId& table_id, const NamespaceId& ns_id); + void RemoveTableFromCDCSDKNonEligibleTableMap(const TableId& table_id, const NamespaceId& ns_id); + void ClearXReplState() REQUIRES(mutex_); Status LoadXReplStream() REQUIRES(mutex_); Status LoadUniverseReplication() REQUIRES(mutex_); @@ -3103,6 +3133,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf, void ValidateIndexTablesPostLoad(std::unordered_map&& indexes_map, TableIdSet* tables_to_persist) EXCLUDES(mutex_); + Result> UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, + const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed = ""); + + Status RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id); + // Should be bumped up when tablet locations are changed. std::atomic tablet_locations_version_{0}; @@ -3165,6 +3203,13 @@ class CatalogManager : public tserver::TabletPeerLookupIf, std::unordered_map> namespace_to_cdcsdk_unprocessed_table_map_ GUARDED_BY(cdcsdk_unprocessed_table_mutex_); + mutable MutexType cdcsdk_non_eligible_table_mutex_; + // In-memory map containing non-eligible tables like indexes/materialized views which got added to + // CDCSDK stream's metadata. Will be refreshed on master restart / leadership change through the + // function: 'FindAllNonEligibleTablesInCDCSDKStream'. + std::unordered_map> + namespace_to_cdcsdk_non_eligible_table_map_ GUARDED_BY(cdcsdk_non_eligible_table_mutex_); + // Map of tables -> set of cdc streams they are producers for.
std::unordered_map> xcluster_producer_tables_to_stream_map_ GUARDED_BY(mutex_); diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index 944826929b1a..12a49e4f734f 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -83,6 +83,7 @@ DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false, DECLARE_bool(enable_ysql); DECLARE_bool(TEST_echo_service_enabled); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { namespace master { @@ -289,6 +290,31 @@ void CatalogManagerBgTasks::Run() { } } + { + if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + // Find if there are any non eligible tables (indexes, mat views) present in cdcsdk + // stream that are not associated with a replication slot. + TableStreamIdsMap non_user_tables_to_streams_map; + // In case of master leader restart or leadership changes, we would have scanned all + // streams (without replication slot) in ACTIVE/DELETING METADATA state for non eligible + // tables and marked such tables for removal in + // namespace_to_cdcsdk_non_eligible_table_map_. + Status s = catalog_manager_->FindCDCSDKStreamsForNonEligibleTables( + &non_user_tables_to_streams_map); + + if (s.ok() && !non_user_tables_to_streams_map.empty()) { + s = catalog_manager_->RemoveNonEligibleTablesFromCDCSDKStreams( + non_user_tables_to_streams_map, l.epoch()); + } + if (!s.ok()) { + YB_LOG_EVERY_N(WARNING, 10) + << "Encountered failure while trying to remove non eligible " + "tables from cdc_state table: " + << s.ToString(); + } + } + } + // Ensure the master sys catalog tablet follows the cluster's affinity specification. 
if (FLAGS_sys_catalog_respect_affinity_task) { Status s = catalog_manager_->SysCatalogRespectLeaderAffinity(); diff --git a/src/yb/master/master_replication.proto b/src/yb/master/master_replication.proto index bd9347f747d9..8fa75e49061a 100644 --- a/src/yb/master/master_replication.proto +++ b/src/yb/master/master_replication.proto @@ -36,6 +36,11 @@ message CDCStreamInfoPB { optional uint64 cdcsdk_consistent_snapshot_time = 7; optional CDCSDKSnapshotOption cdcsdk_consistent_snapshot_option = 8; optional uint64 stream_creation_time = 9; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. + // If set to true, dynamic tables won't get added to the CDC stream. + optional bool cdcsdk_disable_dynamic_table_addition = 12; } message ValidateReplicationInfoRequestPB { @@ -498,6 +503,32 @@ message XClusterReportNewAutoFlagConfigVersionResponsePB { optional MasterErrorPB error = 1; } +message DisableDynamicTableAdditionOnCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message DisableDynamicTableAdditionOnCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message RemoveUserTableFromCDCSDKStreamRequestPB { + optional string stream_id = 1; + optional string table_id = 2; +} + +message RemoveUserTableFromCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; + repeated string updated_tablet_entries = 2; +} + service MasterReplication { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -563,4 +594,15 @@ service MasterReplication { rpc XClusterReportNewAutoFlagConfigVersion( XClusterReportNewAutoFlagConfigVersionRequestPB) returns (XClusterReportNewAutoFlagConfigVersionResponsePB); + +
// Introduced for bug (#22876, #22773) + rpc DisableDynamicTableAdditionOnCDCSDKStream (DisableDynamicTableAdditionOnCDCSDKStreamRequestPB) + returns (DisableDynamicTableAdditionOnCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc RemoveUserTableFromCDCSDKStream (RemoveUserTableFromCDCSDKStreamRequestPB) + returns (RemoveUserTableFromCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc ValidateAndSyncCDCStateEntriesForCDCSDKStream( + ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB) + returns (ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB); } diff --git a/src/yb/master/master_replication_service.cc b/src/yb/master/master_replication_service.cc index b152f6c9149c..f3bcc73cc929 100644 --- a/src/yb/master/master_replication_service.cc +++ b/src/yb/master/master_replication_service.cc @@ -56,6 +56,9 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl (GetTableSchemaFromSysCatalog) (ChangeXClusterRole) (BootstrapProducer) + (DisableDynamicTableAdditionOnCDCSDKStream) + (RemoveUserTableFromCDCSDKStream) + (ValidateAndSyncCDCStateEntriesForCDCSDKStream) ) MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK( diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index bd3ebc15f2e7..94b9361b8f42 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -135,6 +135,40 @@ DEFINE_test_flag(bool, fail_universe_replication_merge, false, "Causes MergeUniv DEFINE_test_flag(bool, xcluster_fail_setup_stream_update, false, "Fail UpdateCDCStream RPC call"); +DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_dynamic_tables_disable_option, + kLocalPersisted, + false, + true, + "This flag needs to be true in order to disable addition of dynamic tables " + "to CDC stream. 
This flag is required to be true for execution of " + "yb-admin commands - " + "\'disable_dynamic_table_addition_on_change_data_stream\', " + "\'remove_user_table_from_change_data_stream\'"); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, advanced); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, hidden); + +DEFINE_test_flag(bool, cdcsdk_skip_updating_cdc_state_entries_on_table_removal, false, + "Skip updating checkpoint to max for cdc state table entries while removing a user table from " + "CDCSDK stream."); + +DEFINE_test_flag(bool, cdcsdk_add_indexes_to_stream, false, "Allows addition of index to a stream"); + +DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, false, + "When enabled, all CDCSDK streams will be scanned for non-eligible tables like indexes, " + "materialised view etc. in their stream metadata and these tables will be marked for removal " + "by catalog manager background thread."); + +DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_identification_of_non_eligible_tables, + kLocalPersisted, + false, + true, + "This flag, when true, identifies all non-eligible tables that are part of" + " a CDC stream metadata while loading the CDC streams on a master " + "restart/leadership change. 
This identification happens on all CDC " + "streams in the universe"); +TAG_FLAG(cdcsdk_enable_identification_of_non_eligible_tables, advanced); +TAG_FLAG(cdcsdk_enable_identification_of_non_eligible_tables, hidden); + DECLARE_bool(xcluster_wait_on_ddl_alter); DECLARE_int32(master_rpc_timeout_ms); DECLARE_bool(enable_xcluster_auto_flag_validation); @@ -287,8 +321,17 @@ class CDCStreamLoader : public Visitor { if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE || metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) && ns && ns->state() == SysNamespaceEntryPB::RUNNING) { + auto eligible_tables_info = catalog_manager_->FindAllTablesForCDCSDK(metadata.namespace_id()); catalog_manager_->FindAllTablesMissingInCDCSDKStream( - stream_id, metadata.table_id(), metadata.namespace_id()); + stream_id, metadata.table_id(), eligible_tables_info); + + // Check for any non-eligible tables like indexes, matview etc in CDC stream only if the + // stream is not associated with a replication slot. + if (FLAGS_cdcsdk_enable_identification_of_non_eligible_tables && + stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + catalog_manager_->FindAllNonEligibleTablesInCDCSDKStream( + stream_id, metadata.table_id(), eligible_tables_info); + } } LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": " @@ -1544,6 +1587,11 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } + // skip streams on which dynamic table addition is disabled. 
+ if(stream_info->IsDynamicTableAdditionDisabled()) { + continue; + } + auto const unprocessed_tables = FindOrNull(namespace_to_unprocessed_table_map, stream_info->namespace_id()); if (!unprocessed_tables) { @@ -1567,7 +1615,7 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } - if (!CanTableBeAddedToCDCSDKStream(table, schema)) { + if (!IsTableEligibleForCDCSDKStream(table, schema)) { RemoveTableFromCDCSDKUnprocessedMap(unprocessed_table_id, stream_info->namespace_id()); continue; } @@ -1597,7 +1645,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( void CatalogManager::FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) { + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { std::unordered_set stream_table_ids; // Store all table_ids associated with the stram in 'stream_table_ids'. for (const auto& table_id : table_ids) { @@ -1607,7 +1656,7 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( // Get all the tables associated with the namespace. // If we find any table present only in the namespace, but not in the stream, we add the table // id to 'cdcsdk_unprocessed_tables'. 
- for (const auto& table_info : FindAllTablesForCDCSDK(ns_id)) { + for (const auto& table_info : eligible_tables_info) { auto ltm = table_info->LockForRead(); if (!stream_table_ids.contains(table_info->id())) { LOG(INFO) << "Found unprocessed table: " << table_info->id() @@ -1619,6 +1668,118 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( } } +Status CatalogManager::FindCDCSDKStreamsForNonEligibleTables( + TableStreamIdsMap* non_user_tables_to_streams_map) { + std::unordered_map> namespace_to_non_user_table_map; + { + SharedLock lock(cdcsdk_non_eligible_table_mutex_); + int32_t found_non_user_tables = 0; + for (const auto& [ns_id, table_ids] : namespace_to_cdcsdk_non_eligible_table_map_) { + for (const auto& table_id : table_ids) { + namespace_to_non_user_table_map[ns_id].insert(table_id); + if (++found_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + break; + } + } + } + } + + { + SharedLock lock(mutex_); + for (const auto& [stream_id, stream_info] : cdc_stream_map_) { + if (stream_info->namespace_id().empty()) { + continue; + } + + // Removal of non-eligible tables will only be done on CDC stream that are not associated with + // a replication slot. 
+ if (!stream_info->GetCdcsdkYsqlReplicationSlotName().empty()) { + continue; + } + + const auto non_user_tables = + FindOrNull(namespace_to_non_user_table_map, stream_info->namespace_id()); + if (!non_user_tables) { + continue; + } + + auto ltm = stream_info->LockForRead(); + if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE || + ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) { + for (const auto& non_user_table_id : *non_user_tables) { + auto table = tables_->FindTableOrNull(non_user_table_id); + if (!table) { + LOG_WITH_FUNC(WARNING) + << "Table " << non_user_table_id << " deleted before it could be removed"; + continue; + } + + if (std::find(ltm->table_id().begin(), ltm->table_id().end(), non_user_table_id) != + ltm->table_id().end()) { + (*non_user_tables_to_streams_map)[non_user_table_id].push_back(stream_info); + VLOG(1) << "Will try and remove table: " << non_user_table_id + << ", from stream: " << stream_info->id(); + } + } + } + } + } + + for (const auto& [ns_id, non_user_table_ids] : namespace_to_non_user_table_map) { + for (const auto& non_user_table_id : non_user_table_ids) { + if (!non_user_tables_to_streams_map->contains(non_user_table_id)) { + // This means we found no active CDCSDK stream where this table was present, hence we can + // remove this table from 'namespace_to_cdcsdk_non_eligible_table_map_'. + RemoveTableFromCDCSDKNonEligibleTableMap(non_user_table_id, ns_id); + } + } + } + + return Status::OK(); +} + +void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { + // If we find any table present only in the stream, but not in the list of eligible tables in + // namespace for CDC, we add the table id to 'namespace_to_cdcsdk_non_eligible_table_map_'.
+ std::unordered_set user_table_ids; + for (const auto& table_info : eligible_tables_info) { + user_table_ids.insert(table_info->id()); + } + + std::unordered_set stream_table_ids; + // Store all table_ids associated with the stream in 'stream_table_ids'. + for (const auto& table_id : table_ids) { + if (!user_table_ids.contains(table_id)) { + auto table_info = GetTableInfoUnlocked(table_id); + Schema schema; + Status status = table_info->GetSchema(&schema); + if (!status.ok()) { + LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name(); + // Skip this table for now, it will be revisited for removal on master restart/master leader + // change. + continue; + } + + // Re-confirm this table is not meant to be part of a CDC stream. + if (!IsTableEligibleForCDCSDKStream(table_info, schema)) { + LOG(INFO) << "Found a non-eligible table: " << table_info->id() + << ", for stream: " << stream_id; + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert( + table_info->id()); + } else { + // Ideally we are not expected to enter the else clause. 
+ LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id + << " that is not present in the eligible list of tables " + "from the namespace for CDC"; + } + } + } +} + Status CatalogManager::ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, const std::string& id_type_option_value) { @@ -1662,7 +1823,7 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace } } - if (!CanTableBeAddedToCDCSDKStream(table_info.get(), schema)) { + if (!IsTableEligibleForCDCSDKStream(table_info.get(), schema)) { continue; } @@ -1672,20 +1833,29 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace return tables; } -bool CatalogManager::CanTableBeAddedToCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const { - bool has_pk = true; - for (const auto& col : schema.columns()) { - if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { - // ybrowid column is added for tables that don't have user-specified primary key. - VLOG(1) << "Table: " << table_info->id() - << ", will not be added to CDCSDK stream, since it does not have a primary key"; - has_pk = false; - break; +bool CatalogManager::IsTableEligibleForCDCSDKStream( + const TableInfoPtr& table_info, const std::optional& schema) const { + if (schema.has_value()) { + bool has_pk = true; + for (const auto& col : schema->columns()) { + if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { + // ybrowid column is added for tables that don't have user-specified primary key. + VLOG(1) << "Table: " << table_info->id() + << ", will not be added to CDCSDK stream, since it does not have a primary key"; + has_pk = false; + break; + } + } + if (!has_pk) { + return false; } } - if (!has_pk) { - return false; + + if (FLAGS_TEST_cdcsdk_add_indexes_to_stream) { + // allow adding user created indexes to CDC stream. 
+ if (IsUserIndexUnlocked(*table_info)) { + return true; + } } if (IsMatviewTable(*table_info)) { @@ -1829,6 +1999,82 @@ Status CatalogManager::ProcessNewTablesForCDCSDKStreams( return Status::OK(); } +Status CatalogManager::RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch) { + int32_t removed_non_user_tables = 0; + for (const auto& [table_id, streams] : non_user_tables_to_streams_map) { + if (removed_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + VLOG(1) + << "Reached the limit of number of non-eligible tables to be removed per iteration. Will " + "remove the remaining tables in the next iteration."; + break; + } + + // Delete the table from all streams now. + NamespaceId namespace_id; + bool stream_pending = false; + Status status; + for (const auto& stream : streams) { + if PREDICT_FALSE (stream == nullptr) { + LOG(WARNING) << "Found a null CDC stream while removing table: " << table_id; + continue; + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table.
+ tables_in_stream_metadata.erase(table_id); + auto result = UpdateCheckpointForTabletEntriesInCDCState( + stream->StreamId(), tables_in_stream_metadata, table_id); + + if (!result.ok()) { + LOG(WARNING) << "Encountered error while trying to update/delete tablets entries of table: " + << table_id << ", from cdc_state table for stream " << stream->id() << ": " + << result.status(); + stream_pending = true; + continue; + } + + { + auto stream_lock = stream->LockForWrite(); + if (stream_lock->is_deleting()) { + continue; + } + } + + Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id); + if (!status.ok()) { + LOG(WARNING) << "Encountered error while trying to remove non-eligible table " << table_id + << " from metadata of stream " << stream->StreamId() << " and maps. "; + stream_pending = true; + continue; + } + + LOG(INFO) + << "Successfully removed non-eligible table " << table_id + << " from stream metadata and updated corresponding cdc_state table entries for stream: " + << stream->id(); + + namespace_id = stream->namespace_id(); + } + + // Remove non-eligible tables from 'namespace_to_cdcsdk_non_eligible_table_map_'.
+ if (!stream_pending) { + RemoveTableFromCDCSDKNonEligibleTableMap(table_id, namespace_id); + } + ++removed_non_user_tables; + } + + return Status::OK(); +} + void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( const TableId& table_id, const NamespaceId& ns_id) { LockGuard lock(cdcsdk_unprocessed_table_mutex_); @@ -1841,6 +2087,20 @@ void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( } } +void CatalogManager::RemoveTableFromCDCSDKNonEligibleTableMap( + const TableId& table_id, const NamespaceId& ns_id) { + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + auto non_user_tables = FindOrNull(namespace_to_cdcsdk_non_eligible_table_map_, ns_id); + if (!non_user_tables) { + return; + } + + non_user_tables->erase(table_id); + if (non_user_tables->empty()) { + namespace_to_cdcsdk_non_eligible_table_map_.erase(ns_id); + } +} + Status CatalogManager::FindCDCStreamsMarkedAsDeleting(std::vector* streams) { return FindCDCStreamsMarkedForMetadataDeletion(streams, SysCDCStreamEntryPB::DELETING); } @@ -2214,6 +2474,12 @@ Status CatalogManager::GetCDCStream( stream_info->set_stream_creation_time(stream_lock->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + stream_lock->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream_info->set_cdcsdk_disable_dynamic_table_addition( + stream_lock->pb.cdcsdk_disable_dynamic_table_addition()); + } + return Status::OK(); } @@ -2339,6 +2605,11 @@ Status CatalogManager::ListCDCStreams( stream->set_stream_creation_time(ltm->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + ltm->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream->set_cdcsdk_disable_dynamic_table_addition( + ltm->pb.cdcsdk_disable_dynamic_table_addition()); + } } return Status::OK(); } @@ -4017,6 +4288,24 @@ Status CatalogManager::UpdateCDCProducerOnTabletSplit( std::vector streams; std::vector entries; for (const auto stream_type : {cdc::XCLUSTER, cdc::CDCSDK}) { + if 
(stream_type == cdc::CDCSDK && + FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + const auto& table_info = GetTableInfo(producer_table_id); + // Skip adding children tablet entries in cdc state if the table is an index or a mat view. + // These tables, if present in CDC stream, are anyway going to be removed by a bg thread. This + // check ensures even if there is a race condition where a tablet of a non-eligible table + // splits and concurrently we are removing such tables from stream, the child tablets do not + // get added. + { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table_info, std::nullopt)) { + LOG(INFO) << "Skipping adding children tablets to cdc state for table " + << producer_table_id << " as it is not meant to be part of a CDC stream"; + continue; + } + } + } + { SharedLock lock(mutex_); streams = FindCDCStreamsForTableUnlocked(producer_table_id, stream_type); @@ -5869,6 +6158,231 @@ Status CatalogManager::GetReplicationStatus( return Status::OK(); } +Status CatalogManager::DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing DisableDynamicTableAdditionOnCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided", (*req)); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Disabling addition of dynamic tables to CDC stream is disallowed in the middle of an " + "upgrade.
Finalize the upgrade and try again", (*req)); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream", (*req)); + } + + // We only want to allow disabling dynamic table addition on older streams that are not associated + // with a replication slot. + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot disable dynamic table addition on CDC streams associated with a replication slot", + (*req)); + } + + if (stream->IsDynamicTableAdditionDisabled()) { + return STATUS(AlreadyPresent, "Dynamic table addition already disabled on the CDC stream"); + } + + // Disable dynamic table addition by setting the stream metadata field to true. 
+ { + auto stream_lock = stream->LockForWrite(); + auto& pb = stream_lock.mutable_data()->pb; + + pb.set_cdcsdk_disable_dynamic_table_addition(true); + + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), "Updating CDC stream in system catalog"); + + stream_lock.Commit(); + } + + LOG_WITH_FUNC(INFO) << "Successfully disabled dynamic table addition on CDC stream: " + << stream_id; + + return Status::OK(); +} + +Status CatalogManager::RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing RemoveUserTableFromCDCSDKStream request from " << RequestorString(rpc) + << ": " << req->ShortDebugString(); + + if (!req->has_stream_id() || !req->has_table_id()) { + RETURN_INVALID_REQUEST_STATUS("Both CDC Stream ID and table ID must be provided", (*req)); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Removal of user table from CDC stream is disallowed in the middle of an " + "upgrade. 
Finalize the upgrade and try again", (*req)); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + auto table_id = req->table_id(); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream", (*req)); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table from CDC streams that are associated with a replication slot", (*req)); + } + + if (!stream->IsDynamicTableAdditionDisabled()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table unless dynamic table addition is disabled for the stream. Please use " + "the yb-admin command \"disable_dynamic_table_addition_on_change_data_stream\" to disable " + "dynamic table addition on the stream.", (*req)); + } + + auto stream_ns_id = stream->LockForRead()->namespace_id(); + + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_id); + } + + if (table == nullptr || table->LockForRead()->is_deleting()) { + return STATUS(NotFound, "Could not find table", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + Schema schema; + Status status = table->GetSchema(&schema); + if (!status.ok()) { + return STATUS(InternalError, Format("Error while getting schema for table: $0", table->name())); + } + + { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table, schema)) { + RETURN_INVALID_REQUEST_STATUS( + "Only allowed to remove user tables from CDC streams via this command.", (*req)); + } + } + + auto table_ns_id = table->LockForRead()->namespace_id(); + if (table_ns_id != stream_ns_id) { + RETURN_INVALID_REQUEST_STATUS("Stream and Table are not under the 
same namespace", (*req)); + } + + if (!FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) { + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table. + tables_in_stream_metadata.erase(table_id); + RETURN_NOT_OK_PREPEND( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata, table_id), + "Error updating/deleting tablet entries from cdc state table"); + } + + // Now remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. + RETURN_NOT_OK_PREPEND( + RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id), + "Error removing table from stream metadata and maps"); + + LOG_WITH_FUNC(INFO) << "Successfully removed table " << table_id + << " from CDC stream: " << stream_id + << " and updated/deleted corresponding cdc state table entries."; + + return Status::OK(); +} + +Status CatalogManager::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing ValidateAndSyncCDCStateEntriesForCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided", (*req)); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", 
MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream", (*req)); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot validate and sync cdc state table entries for CDC streams that are associated with " + "a replication slot", (*req)); + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + tables_in_stream_metadata.reserve(stream_lock->table_id().size()); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + auto updated_state_table_entries = VERIFY_RESULT( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata)); + + for (const auto& entry : updated_state_table_entries) { + resp->add_updated_tablet_entries(entry.key.tablet_id); + } + + LOG_WITH_FUNC(INFO) + << "Successfully validated and synced cdc state table entries for CDC stream: " << stream_id; + + return Status::OK(); +} + Status CatalogManager::TEST_CDCSDKFailCreateStreamRequestIfNeeded(const std::string& sync_point) { bool fail_create_cdc_stream_request = false; TEST_SYNC_POINT_CALLBACK(sync_point, &fail_create_cdc_stream_request); @@ -7434,5 +7948,149 @@ Status CatalogManager::XClusterRefreshLocalAutoFlagConfig(const LeaderEpoch& epo return Status::OK(); } +Result> +CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed) { + std::unordered_set tablet_entries_to_be_removed; + + // This will only contain entries for colocated tables that have a composite value in the + // stream_id column i.e. in the form of stream_id_table_id. Such entries will have to be directly + // deleted as UpdatePeersAndMetrics ignores these entries. 
+ std::vector cdc_state_entries_to_be_deleted; + + // If the table_id to be removed is provided, we will only find out cdc state table entries + // corresponding to this table and update their checkpoints. Otherwise, we'll consider all state + // table entries for checkpoint update. + if (!table_to_be_removed.empty()) { + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_to_be_removed); + } + + // First we'll update the checkpoint to OpId max for all the cdc state entries corresponding to + // the table. Therefore, get all the tablets for the table to be removed. + TabletInfos tablets; + tablets = table->GetTablets(IncludeInactive::kTrue); + + for (const auto& tablet : tablets) { + tablet_entries_to_be_removed.insert(tablet->tablet_id()); + } + } + + Status iteration_status; + auto all_entry_keys = + VERIFY_RESULT(cdc_state_table_->GetTableRange({} /* just key columns */, &iteration_status)); + std::vector entries_to_update; + // Get all the tablet, stream pairs from cdc_state for the given stream. + std::vector cdc_state_tablet_entries; + for (const auto& entry_result : all_entry_keys) { + RETURN_NOT_OK(entry_result); + const auto& entry = *entry_result; + + if (entry.key.stream_id == stream_id) { + // For updating the checkpoint, only consider entries that do not have a colocated table_id as + // these will be manually deleted. + if (entry.key.colocated_table_id.empty()) { + // If table_id is provided, filter out state entries belonging to tablets of the table. + if (table_to_be_removed.empty() || + tablet_entries_to_be_removed.contains(entry.key.tablet_id)) { + cdc_state_tablet_entries.push_back(entry.key.tablet_id); + } + } else if (entry.key.colocated_table_id == table_to_be_removed) { + // If the entry contains a colocated_table_id, it belongs to one of the colocated + // tables on that tablet. If this colocated_table_id matches with the table being removed, + // then we'll delete this entry directly.
+ cdc_state_entries_to_be_deleted.push_back(entry.key); + } + } + } + RETURN_NOT_OK(iteration_status); + + // Get the tablet info for state table entries of the stream. + auto tablet_infos = GetTabletInfos(cdc_state_tablet_entries); + + // For each state table entry present in cdc_state_tablet_entries, verify that the tablet's table + // is present in the CDC stream metadata. If not, update checkpoint of such tablet entries to + // OpId::Max. For colocated tables, even if one of the colocated tables is present in the CDC + // stream metadata, skip updating the checkpoint for that tablet, stream pair. + for (const auto& tablet_info : tablet_infos) { + bool table_found = false; + for (const auto& table_id : tablet_info->GetTableIds()) { + if (tables_in_stream_metadata.contains(table_id)) { + table_found = true; + } + } + + if (!table_found) { + cdc::CDCStateTableEntry update_entry(tablet_info->tablet_id(), stream_id); + update_entry.checkpoint = OpId::Max(); + LOG_WITH_FUNC(INFO) + << "Setting checkpoint to OpId::Max() for cdc state table entry (tablet,stream) - " + << update_entry.ToString(); + entries_to_update.emplace_back(std::move(update_entry)); + } + } + + if (!entries_to_update.empty()) { + LOG_WITH_FUNC(INFO) << "Setting checkpoint to max for " << entries_to_update.size() + << " cdc state entries for CDC stream: " << stream_id; + RETURN_NOT_OK_PREPEND( + cdc_state_table_->UpdateEntries(entries_to_update), + "Error setting checkpoint to OpId::Max() in cdc_state table"); + } + + if (!cdc_state_entries_to_be_deleted.empty()) { + // Only 1 entry is expected for a colocated table that is being removed.
+ RSTATUS_DCHECK( + cdc_state_entries_to_be_deleted.size() == 1, IllegalState, + "Found more than one cdc state table entry that needs to be deleted for removing table $0 " + "from CDC stream $1", + table_to_be_removed, stream_id); + + LOG_WITH_FUNC(INFO) << "Deleting cdc state table entry (tablet, stream, table) - " + << cdc_state_entries_to_be_deleted[0].ToString(); + RETURN_NOT_OK_PREPEND( + cdc_state_table_->DeleteEntries(cdc_state_entries_to_be_deleted), + "Error deleting entries from cdc_state table"); + } + + return entries_to_update; +} + +Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id) { + // Remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. + { + auto ltm = stream->LockForWrite(); + bool need_to_update_stream = false; + + auto table_id_iter = std::find(ltm->table_id().begin(), ltm->table_id().end(), table_id); + if (table_id_iter != ltm->table_id().end()) { + need_to_update_stream = true; + ltm.mutable_data()->pb.mutable_table_id()->erase(table_id_iter); + } + + if (need_to_update_stream) { + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), + "Updating CDC streams in system catalog"); + } + + ltm.Commit(); + + if (need_to_update_stream) { + { + LockGuard lock(mutex_); + cdcsdk_tables_to_stream_map_[table_id].erase(stream->StreamId()); + } + } + } + + return Status::OK(); +} + } // namespace master } // namespace yb diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index e6e4d16c651c..53cceb4d2ffc 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -1846,6 +1846,50 @@ Status get_change_data_stream_info_action( return Status::OK(); } +const auto disable_dynamic_table_addition_on_change_data_stream_args = ""; +Status disable_dynamic_table_addition_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* 
client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = Format("Failed to disable dynamic table addition on CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->DisableDynamicTableAdditionOnCDCSDKStream(stream_id), msg); + return Status::OK(); +} + +const auto remove_user_table_from_change_data_stream_args = " "; +Status remove_user_table_from_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 2) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + const string table_id = args[1]; + string msg = Format("Failed to remove table $0 from CDC stream $1", table_id, stream_id); + + RETURN_NOT_OK_PREPEND(client->RemoveUserTableFromCDCSDKStream(stream_id, table_id), msg); + return Status::OK(); +} + +const auto validate_and_sync_cdc_state_table_entries_on_change_data_stream_args = ""; +Status validate_and_sync_cdc_state_table_entries_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = + Format("Failed to validate and sync cdc state table entries for CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id), msg); + return Status::OK(); +} + const auto setup_universe_replication_args = " " " [] " @@ -2251,6 +2295,9 @@ void ClusterAdminCli::RegisterCommandHandlers() { REGISTER_COMMAND(list_cdc_streams); REGISTER_COMMAND(list_change_data_streams); REGISTER_COMMAND(get_change_data_stream_info); + REGISTER_COMMAND(disable_dynamic_table_addition_on_change_data_stream); + REGISTER_COMMAND(remove_user_table_from_change_data_stream); + REGISTER_COMMAND(validate_and_sync_cdc_state_table_entries_on_change_data_stream); 
REGISTER_COMMAND(setup_universe_replication); REGISTER_COMMAND(delete_universe_replication); REGISTER_COMMAND(alter_universe_replication); diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 158b5f557c4a..97fe6bc32a2c 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -3915,6 +3915,84 @@ Status ClusterAdminClient::GetCDCDBStreamInfo(const std::string& db_stream_id) { return Status::OK(); } +Status ClusterAdminClient::DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id) { + master::DisableDynamicTableAdditionOnCDCSDKStreamRequestPB req; + master::DisableDynamicTableAdditionOnCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(timeout_); + RETURN_NOT_OK( + master_replication_proxy_->DisableDynamicTableAdditionOnCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error disabling dynamic table addition from CDC stream: " + << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully disabled dynamic table addition on CDC stream: " << stream_id << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::RemoveUserTableFromCDCSDKStream( + const std::string& stream_id, const std::string& table_id) { + master::RemoveUserTableFromCDCSDKStreamRequestPB req; + master::RemoveUserTableFromCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + req.set_table_id(table_id); + + RpcController rpc; + // Set a higher timeout since this RPC verifies that each cdc state table entry for the stream + // belongs to one of the tables in the stream metadata.
+ rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); + RETURN_NOT_OK(master_replication_proxy_->RemoveUserTableFromCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error removing user table from CDC stream: " << resp.error().status().message() + << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully removed user table: " << table_id << " from CDC stream: " << stream_id + << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const std::string& stream_id) { + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB req; + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); + RETURN_NOT_OK( + master_replication_proxy_->ValidateAndSyncCDCStateEntriesForCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error validating CDC state table entries on CDC stream: " + << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully validated and synced CDC state table entries on CDC stream: " << stream_id + << "\n"; + if (resp.updated_tablet_entries().size() > 0) { + cout << "Updated checkpoint for the stream's cdc state table entries for following tablet_ids: " + << AsString(resp.updated_tablet_entries()) << "\n"; + } else { + cout << "No additional entries found in cdc state table that requires update. 
\n"; + } + + return Status::OK(); +} + Status ClusterAdminClient::WaitForSetupUniverseReplicationToFinish( const string& replication_group_id) { master::IsSetupUniverseReplicationDoneRequestPB req; diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index e5f9cce51956..0f63f621a829 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -400,6 +400,12 @@ class ClusterAdminClient { Status GetCDCDBStreamInfo(const std::string& db_stream_id); + Status DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id); + + Status RemoveUserTableFromCDCSDKStream(const std::string& stream_id, const std::string& table_id); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const std::string& stream_id); + Status SetupNamespaceReplicationWithBootstrap(const std::string& replication_id, const std::vector& producer_addresses, const TypedNamespaceName& ns,