From 7c99ff90bc2502ebe86617516fdff5ffecaa136c Mon Sep 17 00:00:00 2001
From: Siddharth Shah
Date: Tue, 25 Jun 2024 19:29:52 +0530
Subject: [PATCH] [#22876][#22773] CDCSDK: Add new yb-admin command to remove user table from CDCSDK stream

Summary:
This diff introduces three new yb-admin commands required to remove a **user table** from a CDCSDK stream.

**`NOTE: All three commands are only meant to be used on CDC streams that are not associated with a replication slot.`**

**Command-1**: yb-admin command to disable dynamic table addition in a CDC stream. It only works when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set to true. **Note: once this command has been executed, no dynamic tables (user/non-user) will get added to the CDC stream. Additionally, there is no option to re-enable dynamic table addition for the stream.**
```
yb-admin \
    -master_addresses <master_addresses> \
    disable_dynamic_table_addition_on_change_data_stream <stream_id>
```
The command works with a single stream_id.

**Command-2**: yb-admin command to remove a particular **user** table from the CDC stream metadata and to update the checkpoint of the corresponding state table entries to OpId max. Since the checkpoint is set to max, these entries will later be deleted from the cdc state table by a separate thread (UpdatePeersAndMetrics).
```
yb-admin \
    -master_addresses <master_addresses> \
    remove_user_table_from_change_data_stream <stream_id> <table_id>
```
The command works with a single stream_id & table_id.

**Command-3**: yb-admin command to validate the cdc state table entries for a particular stream. As part of validation, if the table of any cdc state table entry is not present in the CDC stream metadata, the checkpoint of that entry is updated to OpId max, and the entry is later deleted by a separate thread (UpdatePeersAndMetrics).
```
yb-admin \
    -master_addresses <master_addresses> \
    validate_and_sync_cdc_state_table_entries_on_change_data_stream <stream_id>
```
The command works with a single stream_id.
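The validation rule behind Command-3 can be illustrated with a small, self-contained sketch. This is not the code from this diff: `StateEntry`, `kMaxCheckpoint` and `ValidateAndSyncEntries` are hypothetical names, the checkpoint is reduced to a single integer instead of an OpId, and each entry is assumed to carry its table id directly.

```cpp
#include <cstdint>
#include <limits>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, simplified stand-in for one cdc state table entry.
struct StateEntry {
  std::string tablet_id;
  std::string table_id;      // Table that owns the tablet (assumed to be known here).
  int64_t checkpoint_index;  // Simplification: the real checkpoint is an OpId.
};

// Stand-in for OpId max.
constexpr int64_t kMaxCheckpoint = std::numeric_limits<int64_t>::max();

// Sets the checkpoint to max for every entry whose table is absent from the
// stream metadata and returns the tablet ids of the entries that were updated.
std::vector<std::string> ValidateAndSyncEntries(
    const std::unordered_set<std::string>& tables_in_stream_metadata,
    std::vector<StateEntry>* entries) {
  std::vector<std::string> updated_tablets;
  for (auto& entry : *entries) {
    if (tables_in_stream_metadata.count(entry.table_id) == 0) {
      entry.checkpoint_index = kMaxCheckpoint;
      updated_tablets.push_back(entry.tablet_id);
    }
  }
  return updated_tablets;
}
```

In this sketch, entries whose table is still in the stream metadata are left untouched, and the returned list plays the role of the `updated_tablet_entries` field in the response proto.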
**Advisory for command-usage:**
General guidelines that need to be strictly followed while executing these commands:
- Ensure no DDLs are performed in the 15 minutes before or after executing these commands.

These yb-admin commands are meant to be used when a user is only interested in polling a subset of the tables in the namespace. The user can therefore remove from the CDC stream the extra tables that are not supposed to be polled. To achieve this, the user needs to first execute command-1, followed by command-2 & command-3.

Example:
Starting state: 5 user tables (t1 to t5) in the CDC stream, including 4 extra tables that are not polled (t1, t2, t3, t4) + 2 indexes (i1, i2)
Target state: Only t5 + 2 indexes (i1, i2) should be present in the CDC stream.
To reach the target state, we need to remove 4 user tables (t1-t4) from the stream metadata & their state entries.

**Perform the following steps to remove user tables from the CDC stream:**
# First, disable dynamic table addition using command-1.
# Confirm that dynamic table addition is disabled by running the `list_change_data_streams` yb-admin command. The output for that stream should contain the string `cdcsdk_disable_dynamic_table_addition: true`.
# Remove the table from stream metadata & update its state table entries using command-2.
# Confirm that the table is removed from stream metadata by re-running the `list_change_data_streams` command.
# Depending on when the user reads the cdc state table (via cqlsh), the state table entries corresponding to this table will either have their checkpoint updated to max or will already have been removed. Note that deletion of state table entries might take some time, as it is done in a separate thread.
# Repeat steps 3-5 for every user table that needs to be removed.
# Finally, once all extra user tables are removed from the stream, execute command-3 as a sanity check to get rid of any cdc state entries that might still be lingering in the state table even though the corresponding table has been removed from stream metadata.
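The order of yb-admin invocations in the steps above can be sketched as a small plan builder. This is an illustration only: `AdminCommand` and `BuildTableRemovalPlan` are hypothetical names, the manual verification steps (`list_change_data_streams`, reading the cdc state table) are not modelled, and only the three command names come from this diff.

```cpp
#include <string>
#include <vector>

// One yb-admin invocation: the command name followed by its arguments.
using AdminCommand = std::vector<std::string>;

// Builds the ordered list of commands for removing the given tables from a
// stream: command-1 once, command-2 once per table, command-3 once at the end.
std::vector<AdminCommand> BuildTableRemovalPlan(
    const std::string& stream_id, const std::vector<std::string>& tables_to_remove) {
  std::vector<AdminCommand> plan;
  plan.push_back({"disable_dynamic_table_addition_on_change_data_stream", stream_id});
  for (const auto& table_id : tables_to_remove) {
    plan.push_back({"remove_user_table_from_change_data_stream", stream_id, table_id});
  }
  plan.push_back(
      {"validate_and_sync_cdc_state_table_entries_on_change_data_stream", stream_id});
  return plan;
}
```

For the example above (removing t1-t4), such a plan would hold six invocations: one disable, four removals and one final validation.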
One scenario where cdc state table entries might still be present even after a table is removed is when a tablet splits while the table is being removed from stream metadata. In this case, the children tablet entries will get added to the cdc state table, and they will be removed when command-3 is executed.

**Working**:
Command-1 internally calls the //DisableDynamicTableAdditionOnCDCSDKStream// RPC, which sets the optional field `cdcsdk_disable_dynamic_table_addition` in stream metadata to true. This prevents any table that is not yet part of the CDC stream from being added to it.

Command-2 internally calls the //RemoveUserTableFromCDCSDKStream// RPC, which performs the following:
# Updates the checkpoint of the tablet entries for the given table in the CDC state table to `OpId::Max()`. This releases the retention barriers on these tables and allows UpdatePeersAndMetrics to delete the state table entries.
# Removes the table from CDC stream metadata and //cdcsdk_tables_to_stream_map_//, and persists the updated metadata in sys catalog.

Command-3 internally calls the //ValidateAndSyncCDCStateEntriesForCDCSDKStream// RPC, which updates the checkpoint to max for cdc state table entries whose table is not found in the CDC stream metadata.

**Upgrade/Rollback safety:**
//cdcsdk_disable_dynamic_table_addition// - a new optional field added to the existing protos SysCDCStreamEntryPB and CDCStreamInfoPB. This field is protected and will only be read when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set.
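The flag protection of the new optional field can be illustrated as follows. This is a minimal sketch under stated assumptions: `StreamMetadata` is a hypothetical stand-in for the stream protobuf, and the auto flag is passed in as a plain boolean instead of being read from a gflag.

```cpp
#include <optional>

// Hypothetical stand-in for the stream metadata; only the new field is modelled.
struct StreamMetadata {
  std::optional<bool> cdcsdk_disable_dynamic_table_addition;
};

// Returns true only when the auto flag is enabled and the optional field is
// both present and set to true. With the flag off, the field is never read.
bool IsDynamicTableAdditionDisabled(bool auto_flag_enabled, const StreamMetadata& metadata) {
  if (!auto_flag_enabled) {
    return false;
  }
  return metadata.cdcsdk_disable_dynamic_table_addition.value_or(false);
}
```

Because an unset field reads as "not disabled", streams created before the upgrade would keep their existing behaviour in this model.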
Introduced request and response protos for the new RPCs:
- DisableDynamicTableAdditionOnCDCSDKStream
  - DisableDynamicTableAdditionOnCDCSDKStreamRequestPB, DisableDynamicTableAdditionOnCDCSDKStreamResponsePB
- RemoveUserTableFromCDCSDKStream
  - RemoveUserTableFromCDCSDKStreamRequestPB, RemoveUserTableFromCDCSDKStreamResponsePB
- ValidateAndSyncCDCStateEntriesForCDCSDKStream
  - ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB, ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB

Jira: DB-11778, DB-11676

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromNonConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream

Reviewers: skumar, asrinivasan, stiwary

Reviewed By: asrinivasan, stiwary

Subscribers: ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D35870
---
 src/yb/integration-tests/cdcsdk_ysql-test.cc  | 356 ++++++++++++++++
 .../cdcsdk_ysql_test_base.cc                  |  48 +++
 .../integration-tests/cdcsdk_ysql_test_base.h |  18 +
 src/yb/master/catalog_entity_info.cc          |  11 +
 src/yb/master/catalog_entity_info.h           |   2 +
 src/yb/master/catalog_entity_info.proto       |   5 +
 src/yb/master/catalog_manager.h               |  23 +-
src/yb/master/master_replication.proto | 42 ++ src/yb/master/master_replication_service.cc | 3 + src/yb/master/xrepl_catalog_manager.cc | 380 +++++++++++++++++- src/yb/tools/yb-admin_cli.cc | 47 +++ src/yb/tools/yb-admin_client.cc | 78 ++++ src/yb/tools/yb-admin_client.h | 6 + 13 files changed, 1015 insertions(+), 4 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 7ca9b43e0d82..dbd965bc39b5 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8624,5 +8624,361 @@ TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) { ASSERT_EQ(stream.stream().table_id_size(), 0); } +void CDCSDKYsqlTest::TestDisableOfDynamicTableAdditionOnCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2", "_3", "_4"}; + const int kNumTables = 5; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the first two tables. + for (idx = 0; idx < 2; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id1 = use_consistent_snapshot_stream ? 
ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + auto stream_id2 = use_consistent_snapshot_stream ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + + std::unordered_set expected_table_ids = {table[0].table_id(), table[1].table_id()}; + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for stream metadata after stream creation."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for stream metadata after stream creation."); + + // Since dynamic table addition is not yet disabled, create a new table and verify that it gets + // added to stream metadata of both the streams. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + expected_table_ids.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + + // Disable dynamic table addition on stream1 via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id1)); + + // Create a new table and verify that it only gets added to stream2's metadata. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. 
+ SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_4 as dynamic table addition is disabled. Therefore, + // the expected set of tables remains same as before. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Stream2's metadata should contain table_4 as dynamic table addition is not disabled. + auto expected_table_ids_for_stream2 = expected_table_ids; + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Verify tablets of table_4 have only been added to cdc_state table for stream2. + std::unordered_set expected_tablets_for_stream1; + std::unordered_set expected_tablets_for_stream2; + for (int i = 0; i < idx; i++) { + if (i < 3) { + expected_tablets_for_stream1.insert(tablets[i].Get(0).tablet_id()); + } + expected_tablets_for_stream2.insert(tablets[i].Get(0).tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); + + // Even on a master restart, table_4 should not get added to the stream1. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Any newly created table after master restart should not get added to stream1. 
+ table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_5 as dynamic table addition is disabled. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // Stream2's metadata should contain table_5 as dynamic table addition is not disabled. + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // verify tablets of table_4 & table_5 have not been added to cdc_state table for stream1. + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + + // Tablets of table_5 should be added to cdc state table for stream2. 
+ expected_tablets_for_stream2.insert(tablets[idx - 1].Get(0).tablet_id()); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(1, 1, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. 
+ std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata as well as update its corresponding state table tablet entries with checkpoint as max. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. 
+ expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + ASSERT_OK(WaitForFlushTables( + {table[0].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split table_1's tablet. + WaitUntilSplitIsSuccesful(tablets[0].Get(0).tablet_id(), table[0], 2); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table[0], 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 2); + + // Wait for sometime so that tablet split codepath has completed adding new cdc state entries. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Children tablets of table_1 shouldnt get added to cdc state table since the table no longer + // exists in stream metadata. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // Even after a restart, we shouldn't see table_1 in stream metadata as well as cdc state table + // entries shouldnt contain any of the table_1 tablets. 
+ VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetBStreamInfo after master restart."); + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromNonConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) = + true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 3, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? 
ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. + std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata but skip updating cdc state entries because the test flag + // skip_updating_cdc_state_entries_on_table_removal is set. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Verify that cdc state table still contains entries for the table that was removed. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Now, validate the cdc state entries using the yb-admin command + // 'validate_cdc_state_table_entries_on_change_data_stream'. It will find state table entries for + // table_1 and update their checkpoints to max. 
+ ASSERT_OK(ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id)); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ true); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index 124730b79e52..43010f34647e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -4503,5 +4503,53 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { return oss.str(); } + Status CDCSDKYsqlTest::ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args) { + string tool_path = GetToolPath("../bin", "yb-admin"); + vector argv; + argv.push_back(tool_path); + argv.push_back("--master_addresses"); + argv.push_back(AsString(test_cluster_.mini_cluster_->GetMasterAddresses())); + argv.push_back(command_name); + for (const auto& command_arg : command_args) { + argv.push_back(command_arg); + } + + RETURN_NOT_OK(Subprocess::Call(argv)); + + return Status::OK(); + } + + Status 
CDCSDKYsqlTest::DisableDynamicTableAdditionOnCDCSDKStream(
+      const xrepl::StreamId& stream_id) {
+    std::string yb_admin_command = "disable_dynamic_table_addition_on_change_data_stream";
+    vector<string> command_args;
+    command_args.push_back(stream_id.ToString());
+    RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));
+    return Status::OK();
+  }
+
+  Status CDCSDKYsqlTest::RemoveUserTableFromCDCSDKStream(
+      const xrepl::StreamId& stream_id, const TableId& table_id) {
+    std::string yb_admin_command = "remove_user_table_from_change_data_stream";
+    vector<string> command_args;
+    command_args.push_back(stream_id.ToString());
+    command_args.push_back(table_id);
+    RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));
+
+    return Status::OK();
+  }
+
+  Status CDCSDKYsqlTest::ValidateAndSyncCDCStateEntriesForCDCSDKStream(
+      const xrepl::StreamId& stream_id) {
+    std::string yb_admin_command =
+        "validate_and_sync_cdc_state_table_entries_on_change_data_stream";
+    vector<string> command_args;
+    command_args.push_back(stream_id.ToString());
+    RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));
+
+    return Status::OK();
+  }
+
 } // namespace cdc
 } // namespace yb
diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h
index 3b8d2eaa965c..4e4fa39760c0 100644
--- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h
+++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h
@@ -117,6 +117,8 @@ DECLARE_uint64(TEST_cdcsdk_publication_list_refresh_interval_micros);
 DECLARE_bool(cdcsdk_enable_dynamic_table_support);
 DECLARE_bool(enable_cdcsdk_setting_get_changes_response_byte_limit);
 DECLARE_uint64(cdcsdk_vwal_getchanges_resp_max_size_bytes);
+DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);
+DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);

 namespace yb {

@@ -781,6 +783,22 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
   std::string GetPubRefreshTimesString(vector
pub_refresh_times); void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream); + + Status ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args); + + Status DisableDynamicTableAdditionOnCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestDisableOfDynamicTableAdditionOnCDCStream(bool use_consistent_snapshot_stream); + + Status RemoveUserTableFromCDCSDKStream(const xrepl::StreamId& stream_id, const TableId& table_id); + + void TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream); }; } // namespace cdc diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index c7acaed05bac..5ce566841e3c 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -61,6 +61,7 @@ using std::string; using strings::Substitute; DECLARE_int32(tserver_unresponsive_timeout_ms); +DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); DEFINE_RUNTIME_AUTO_bool( use_parent_table_id_field, kLocalPersisted, false, true, @@ -1264,6 +1265,16 @@ CDCStreamInfo::GetReplicaIdentityMap() const { return l->pb.replica_identity_map(); } +bool CDCStreamInfo::IsDynamicTableAdditionDisabled() const { + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + return false; + } + + auto l = LockForRead(); + return l->pb.has_cdcsdk_disable_dynamic_table_addition() && + l->pb.cdcsdk_disable_dynamic_table_addition(); +} + std::string CDCStreamInfo::ToString() const { auto l = LockForRead(); if (l->pb.has_namespace_id()) { diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 22d2dfef3370..ca5da4f3cf92 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -1154,6 +1154,8 
@@ class CDCStreamInfo : public RefCountedThreadSafe, const google::protobuf::Map<::std::string, ::yb::PgReplicaIdentity> GetReplicaIdentityMap() const; + bool IsDynamicTableAdditionDisabled() const; + std::string ToString() const override; bool IsXClusterStream() const; diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index f2f278826466..9d34e1334179 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -535,6 +535,11 @@ message SysCDCStreamEntryPB { map replica_identity_map = 9; optional string cdcsdk_ysql_replication_slot_plugin_name = 10; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. + // If set to true, dynamic table wont get added to the CDC stream. + optional bool cdcsdk_disable_dynamic_table_addition = 11; } diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index b4c52a24f22a..c264f244885c 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -45,6 +45,7 @@ #include #include "yb/cdc/cdc_service.pb.h" +#include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" #include "yb/common/constants.h" #include "yb/common/entity_ids.h" @@ -1352,6 +1353,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf, YsqlBackfillReplicationSlotNameToCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + Status DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const 
ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + // Query if Bootstrapping is required for a CDC stream (e.g. Are we missing logs). Status IsBootstrapRequired( const IsBootstrapRequiredRequestPB* req, @@ -1482,7 +1495,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all CDCSDK streams which do not have metadata for the newly added tables. Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map); - bool CanTableBeAddedToCDCSDKStream( + bool IsTableEligibleForCDCSDKStream( const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_); // This method compares all tables in the namespace to all the tables added to a CDCSDK stream, @@ -3102,6 +3115,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf, const TabletInfo& tablet, const ScheduleMinRestoreTime& schedule_to_min_restore_time) EXCLUDES(mutex_); + Result> UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, + const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed = ""); + + Status RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id); + // Should be bumped up when tablet locations are changed. std::atomic tablet_locations_version_{0}; diff --git a/src/yb/master/master_replication.proto b/src/yb/master/master_replication.proto index d0832391f181..d090d61b8a24 100644 --- a/src/yb/master/master_replication.proto +++ b/src/yb/master/master_replication.proto @@ -42,6 +42,11 @@ message CDCStreamInfoPB { map replica_identity_map = 10; optional string cdcsdk_ysql_replication_slot_plugin_name = 11; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. 
+ // If set to true, dynamic tables won't get added to the CDC stream. + optional bool cdcsdk_disable_dynamic_table_addition = 12; } message ValidateReplicationInfoRequestPB { @@ -755,6 +760,32 @@ message GetUniverseReplicationInfoResponsePB { repeated DbScopedInfoPB db_scoped_infos = 5; } +message DisableDynamicTableAdditionOnCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message DisableDynamicTableAdditionOnCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message RemoveUserTableFromCDCSDKStreamRequestPB { + optional string stream_id = 1; + optional string table_id = 2; +} + +message RemoveUserTableFromCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; + repeated string updated_tablet_entries = 2; +} + service MasterReplication { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -871,4 +902,15 @@ service MasterReplication { returns (AddNamespaceToXClusterReplicationResponsePB); rpc IsAlterXClusterReplicationDone(IsAlterXClusterReplicationDoneRequestPB) returns (IsAlterXClusterReplicationDoneResponsePB); + + // Introduced for bug (#22876, #22773) + rpc DisableDynamicTableAdditionOnCDCSDKStream (DisableDynamicTableAdditionOnCDCSDKStreamRequestPB) + returns (DisableDynamicTableAdditionOnCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc RemoveUserTableFromCDCSDKStream (RemoveUserTableFromCDCSDKStreamRequestPB) + returns (RemoveUserTableFromCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc ValidateAndSyncCDCStateEntriesForCDCSDKStream( + ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB) + returns (ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB); } diff --git a/src/yb/master/master_replication_service.cc 
b/src/yb/master/master_replication_service.cc index 54fed364fa6e..b9ce83d56a61 100644 --- a/src/yb/master/master_replication_service.cc +++ b/src/yb/master/master_replication_service.cc @@ -57,6 +57,9 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl (ChangeXClusterRole) (BootstrapProducer) (YsqlBackfillReplicationSlotNameToCDCSDKStream) + (DisableDynamicTableAdditionOnCDCSDKStream) + (RemoveUserTableFromCDCSDKStream) + (ValidateAndSyncCDCStateEntriesForCDCSDKStream) ) MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK( diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 82a638182be5..5febd70735f7 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -138,6 +138,22 @@ DEFINE_test_flag(bool, fail_universe_replication_merge, false, "Causes MergeUniv DEFINE_test_flag(bool, xcluster_fail_setup_stream_update, false, "Fail UpdateCDCStream RPC call"); +DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_dynamic_tables_disable_option, + kLocalPersisted, + false, + true, + "This flag needs to be true in order to disable addition of dynamic tables " + "to CDC stream. This flag is required to be true for execution of " + "yb-admin commands - " + "\'disable_dynamic_table_addition_on_change_data_stream\', " + "\'remove_user_table_from_change_data_stream\'"); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, advanced); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, hidden); + +DEFINE_test_flag(bool, cdcsdk_skip_updating_cdc_state_entries_on_table_removal, false, + "Skip updating checkpoint to max for cdc state table entries while removing a user table from " + "CDCSDK stream."); + DECLARE_bool(xcluster_wait_on_ddl_alter); DECLARE_int32(master_rpc_timeout_ms); DECLARE_bool(ysql_yb_enable_replication_commands); @@ -1718,6 +1734,11 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } + // Skip streams on which dynamic table addition is disabled. 
+ if (stream_info->IsDynamicTableAdditionDisabled()) { + continue; + } + auto const unprocessed_tables = FindOrNull(namespace_to_unprocessed_table_map, stream_info->namespace_id()); if (!unprocessed_tables) { @@ -1741,7 +1762,7 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } - if (!CanTableBeAddedToCDCSDKStream(table, schema)) { + if (!IsTableEligibleForCDCSDKStream(table, schema)) { RemoveTableFromCDCSDKUnprocessedMap(unprocessed_table_id, stream_info->namespace_id()); continue; } @@ -1878,7 +1899,7 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace } } - if (!CanTableBeAddedToCDCSDKStream(table_info.get(), schema)) { + if (!IsTableEligibleForCDCSDKStream(table_info.get(), schema)) { continue; } @@ -1888,7 +1909,7 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace return tables; } -bool CatalogManager::CanTableBeAddedToCDCSDKStream( +bool CatalogManager::IsTableEligibleForCDCSDKStream( const TableInfoPtr& table_info, const Schema& schema) const { bool has_pk = true; bool has_invalid_pg_typeoid = false; @@ -2479,6 +2500,12 @@ Status CatalogManager::GetCDCStream( stream_info->set_stream_creation_time(stream_lock->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + stream_lock->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream_info->set_cdcsdk_disable_dynamic_table_addition( + stream_lock->pb.cdcsdk_disable_dynamic_table_addition()); + } + auto replica_identity_map = stream_lock->pb.replica_identity_map(); stream_info->mutable_replica_identity_map()->swap(replica_identity_map); @@ -2612,6 +2639,11 @@ Status CatalogManager::ListCDCStreams( stream->set_stream_creation_time(ltm->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + ltm->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream->set_cdcsdk_disable_dynamic_table_addition( + ltm->pb.cdcsdk_disable_dynamic_table_addition()); + } } return Status::OK(); } @@ -6102,6 
+6134,230 @@ Status CatalogManager::YsqlBackfillReplicationSlotNameToCDCSDKStream( return Status::OK(); } +Status CatalogManager::DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing DisableDynamicTableAdditionOnCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided"); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Disabling addition of dynamic tables to CDC stream is disallowed in the middle of an " + "upgrade. Finalize the upgrade and try again"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + // We only want to allow disabling dynamic table addition on older streams that are not associated + // with a replication slot. + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot disable dynamic table addition on CDC streams associated with a replication slot"); + } + + if (stream->IsDynamicTableAdditionDisabled()) { + return STATUS(AlreadyPresent, "Dynamic table addition already disabled on the CDC stream"); + } + + // Disable dynamic table addition by setting the stream metadata field to true. 
+ { + auto stream_lock = stream->LockForWrite(); + auto& pb = stream_lock.mutable_data()->pb; + + pb.set_cdcsdk_disable_dynamic_table_addition(true); + + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), "Updating CDC stream in system catalog"); + + stream_lock.Commit(); + } + + LOG_WITH_FUNC(INFO) << "Successfully disabled dynamic table addition on CDC stream: " + << stream_id; + + return Status::OK(); +} + +Status CatalogManager::RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing RemoveUserTableFromCDCSDKStream request from " << RequestorString(rpc) + << ": " << req->ShortDebugString(); + + if (!req->has_stream_id() || !req->has_table_id()) { + RETURN_INVALID_REQUEST_STATUS("Both CDC Stream ID and table ID must be provided"); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Removal of user table from CDC stream is disallowed in the middle of an " + "upgrade. Finalize the upgrade and try again"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + auto table_id = req->table_id(); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table from CDC streams that are associated with a replication slot"); + } + + if (!stream->IsDynamicTableAdditionDisabled()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table unless dynamic table addition is disabled for the stream. 
Please use " + "the yb-admin command \"disable_dynamic_table_addition_on_change_data_stream\" to disable " + "dynamic table addition on the stream."); + } + + auto stream_ns_id = stream->LockForRead()->namespace_id(); + + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_id); + } + + if (table == nullptr || table->LockForRead()->is_deleting()) { + return STATUS(NotFound, "Could not find table", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + Schema schema; + Status status = table->GetSchema(&schema); + if (!status.ok()) { + return STATUS(InternalError, Format("Error while getting schema for table: $0", table->name())); + } + + { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table, schema)) { + RETURN_INVALID_REQUEST_STATUS( + "Only allowed to remove user tables from CDC streams via this command."); + } + } + + auto table_ns_id = table->LockForRead()->namespace_id(); + if (table_ns_id != stream_ns_id) { + RETURN_INVALID_REQUEST_STATUS("Stream and Table are not under the same namespace"); + } + + if (!FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) { + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table. + tables_in_stream_metadata.erase(table_id); + RETURN_NOT_OK_PREPEND( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata), + "Error updating tablet entries from cdc state table"); + } + + // Now remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. 
+ RETURN_NOT_OK_PREPEND( + RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id), + "Error removing table from stream metadata and maps"); + + LOG_WITH_FUNC(INFO) + << "Successfully removed table " << table_id << " from CDC stream: " << stream_id + << " and updated the checkpoint to max for corresponding cdc state table entries."; + + return Status::OK(); +} + +Status CatalogManager::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing ValidateAndSyncCDCStateEntriesForCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot validate and sync cdc state table entries for CDC streams that are associated with " + "a replication slot"); + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + tables_in_stream_metadata.reserve(stream_lock->table_id().size()); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + auto updated_state_table_entries = VERIFY_RESULT( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata)); + + for (const auto& entry : 
updated_state_table_entries) { + resp->add_updated_tablet_entries(entry.key.tablet_id); + } + + LOG_WITH_FUNC(INFO) + << "Successfully validated and synced cdc state table entries for CDC stream: " << stream_id; + + return Status::OK(); +} + std::vector CatalogManager::GetAllXClusterUniverseReplicationInfos() { SharedLock lock(mutex_); @@ -7453,5 +7709,123 @@ void CatalogManager::CDCSDKPopulateDeleteRetainerInfoForTabletDrop( delete_retainer.active_cdcsdk = IsTablePartOfCDCSDK(tablet_info.table()->id()); } +Result> +CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed) { + std::unordered_set tablet_entries_to_be_removed; + + // If the table_id to be removed is provided, we will only find out cdc state table entries + // corresponding to this table and update their checkpoints. Otherwise, we'll consider all state + // table entries for checkpoint update. + if (!table_to_be_removed.empty()) { + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_to_be_removed); + } + + // First we'll update the checkpoint to OpId max for all the cdc state entries corresponding to + // the table. Therefore, get all the tablets for the table to be removed. + TabletInfos tablets; + tablets = VERIFY_RESULT(table->GetTablets(IncludeInactive::kTrue)); + + for (const auto& tablet : tablets) { + tablet_entries_to_be_removed.insert(tablet->tablet_id()); + } + } + + Status iteration_status; + auto all_entry_keys = + VERIFY_RESULT(cdc_state_table_->GetTableRange({} /* just key columns */, &iteration_status)); + std::vector entries_to_update; + // Get all the tablet, stream pairs from cdc_state for the given stream. 
+ std::vector cdc_state_tablet_entries; + for (const auto& entry_result : all_entry_keys) { + RETURN_NOT_OK(entry_result); + const auto& entry = *entry_result; + + if (entry.key.stream_id == stream_id) { + // If table_id is provided, filter out state entries belonging to tablets of the table. + if (table_to_be_removed.empty() || + (!table_to_be_removed.empty() && + tablet_entries_to_be_removed.contains(entry.key.tablet_id))) { + cdc_state_tablet_entries.push_back(entry.key.tablet_id); + } + } + } + RETURN_NOT_OK(iteration_status); + + // Get the tablet info for state table entries of the stream. + auto tablet_infos = GetTabletInfos(cdc_state_tablet_entries); + + // For each state table entry present in cdc_state_tablet_entries, verify that the tablet's table + // is present in the CDC stream metadata. If not, update checkpoint of such tablet entries to + // OpId::Max. For colocated tables, even if one of the colocated table is present in the CDC + // stream metadata, skip updating the checkpoint for that tablet, stream pair. 
+ for (const auto& tablet_info : tablet_infos) { + bool table_found = false; + for (const auto& table_id : tablet_info->GetTableIds()) { + if (tables_in_stream_metadata.contains(table_id)) { + table_found = true; + } + } + + if (!table_found) { + cdc::CDCStateTableEntry update_entry(tablet_info->tablet_id(), stream_id); + update_entry.checkpoint = OpId::Max(); + LOG_WITH_FUNC(INFO) + << "Setting checkpoint to OpId::Max() for cdc state table entry (tablet,stream) - " + << update_entry.ToString(); + entries_to_update.emplace_back(std::move(update_entry)); + } + } + + if (!entries_to_update.empty()) { + LOG_WITH_FUNC(INFO) + << "Updating checkpoint to max for " << entries_to_update.size() + << " cdc state entries as part of validating cdc state table entries for CDC stream: " + << stream_id; + RETURN_NOT_OK_PREPEND( + cdc_state_table_->UpdateEntries(entries_to_update), + "Error setting checkpoint to OpId::Max() in cdc_state table"); + } + + return entries_to_update; +} + +Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id) { + // Remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. 
+ { + auto ltm = stream->LockForWrite(); + bool need_to_update_stream = false; + + auto table_id_iter = std::find(ltm->table_id().begin(), ltm->table_id().end(), table_id); + if (table_id_iter != ltm->table_id().end()) { + need_to_update_stream = true; + ltm.mutable_data()->pb.mutable_table_id()->erase(table_id_iter); + } + + if (need_to_update_stream) { + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), + "Updating CDC streams in system catalog"); + } + + ltm.Commit(); + + if (need_to_update_stream) { + { + LockGuard lock(mutex_); + cdcsdk_tables_to_stream_map_[table_id].erase(stream->StreamId()); + } + } + } + + return Status::OK(); +} + } // namespace master } // namespace yb diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index b47da2e9d029..d54106c19b5b 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -1945,6 +1945,50 @@ Status ysql_backfill_change_data_stream_with_replication_slot_action( return Status::OK(); } +const auto disable_dynamic_table_addition_on_change_data_stream_args = ""; +Status disable_dynamic_table_addition_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = Format("Failed to disable dynamic table addition on CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->DisableDynamicTableAdditionOnCDCSDKStream(stream_id), msg); + return Status::OK(); +} + +const auto remove_user_table_from_change_data_stream_args = " "; +Status remove_user_table_from_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 2) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + const string table_id = args[1]; + string msg = Format("Failed to remove table $0 from CDC stream $1", table_id, 
stream_id); + + RETURN_NOT_OK_PREPEND(client->RemoveUserTableFromCDCSDKStream(stream_id, table_id), msg); + return Status::OK(); +} + +const auto validate_and_sync_cdc_state_table_entries_on_change_data_stream_args = ""; +Status validate_and_sync_cdc_state_table_entries_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = + Format("Failed to validate and sync cdc state table entries for CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id), msg); + return Status::OK(); +} + const auto setup_universe_replication_args = " " " [] " @@ -2731,6 +2775,9 @@ void ClusterAdminCli::RegisterCommandHandlers() { REGISTER_COMMAND(list_change_data_streams); REGISTER_COMMAND(get_change_data_stream_info); REGISTER_COMMAND(ysql_backfill_change_data_stream_with_replication_slot); + REGISTER_COMMAND(disable_dynamic_table_addition_on_change_data_stream); + REGISTER_COMMAND(remove_user_table_from_change_data_stream); + REGISTER_COMMAND(validate_and_sync_cdc_state_table_entries_on_change_data_stream); // xCluster Source commands REGISTER_COMMAND(bootstrap_cdc_producer); REGISTER_COMMAND(list_cdc_streams); diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 7d263f632b57..caacae7ae9c9 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -3792,6 +3792,84 @@ Status ClusterAdminClient::YsqlBackfillReplicationSlotNameToCDCSDKStream( return Status::OK(); } +Status ClusterAdminClient::DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id) { + master::DisableDynamicTableAdditionOnCDCSDKStreamRequestPB req; + master::DisableDynamicTableAdditionOnCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(timeout_); + RETURN_NOT_OK( 
+ master_replication_proxy_->DisableDynamicTableAdditionOnCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error disabling dynamic table addition on CDC stream: " + << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully disabled dynamic table addition on CDC stream: " << stream_id << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::RemoveUserTableFromCDCSDKStream( + const std::string& stream_id, const std::string& table_id) { + master::RemoveUserTableFromCDCSDKStreamRequestPB req; + master::RemoveUserTableFromCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + req.set_table_id(table_id); + + RpcController rpc; + // Set a higher timeout since this RPC verifies that each cdc state table entry for the stream + // belongs to one of the tables in the stream metadata. + rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); + RETURN_NOT_OK(master_replication_proxy_->RemoveUserTableFromCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error removing user table from CDC stream: " << resp.error().status().message() + << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully removed user table: " << table_id << " from CDC stream: " << stream_id + << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const std::string& stream_id) { + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB req; + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(timeout_); + RETURN_NOT_OK( + master_replication_proxy_->ValidateAndSyncCDCStateEntriesForCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error validating CDC state table entries on CDC stream: " + << resp.error().status().message() << endl; + return 
StatusFromPB(resp.error().status()); + } + + cout << "Successfully validated and synced CDC state table entries on CDC stream: " << stream_id + << "\n"; + if (resp.updated_tablet_entries().size() > 0) { + cout << "Updated checkpoint for the stream's cdc state table entries for following tablet_ids: " + << AsString(resp.updated_tablet_entries()) << "\n"; + } else { + cout << "No additional entries found in cdc state table that require an update.\n"; + } + + return Status::OK(); +} + Status ClusterAdminClient::WaitForSetupUniverseReplicationToFinish( const string& replication_group_id) { master::IsSetupUniverseReplicationDoneRequestPB req; diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.h index 28a902f90df1..a8088b91a29d 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -409,6 +409,12 @@ class ClusterAdminClient { Status YsqlBackfillReplicationSlotNameToCDCSDKStream( const std::string& stream_id, const std::string& replication_slot_name); + Status DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id); + + Status RemoveUserTableFromCDCSDKStream(const std::string& stream_id, const std::string& table_id); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const std::string& stream_id); + Status SetupNamespaceReplicationWithBootstrap(const std::string& replication_id, const std::vector& producer_addresses, const TypedNamespaceName& ns,