diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index dbd965bc39b5..bf61a34c2b0e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8511,7 +8511,7 @@ TEST_F(CDCSDKYsqlTest, TestGetChangesResponseSize) { ASSERT_TRUE(seen_resp_greater_than_limit); } -void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( +void CDCSDKYsqlTest::TestNonEligibleTableShouldNotGetAddedToCDCStream( bool create_consistent_snapshot_stream) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = create_consistent_snapshot_stream; @@ -8533,7 +8533,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); ASSERT_EQ(table1_tablets.size(), 3); - // Create non-user tables like index, mat views BEFORE the stream has been created + // Create non-eligible tables like index, mat views BEFORE the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(a ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv1 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -8543,7 +8543,7 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ? ASSERT_RESULT(CreateConsistentSnapshotStream()) : ASSERT_RESULT(CreateDBStream()); - // // Create non-user tables AFTER the stream has been created + // // Create non-eligible tables AFTER the stream has been created ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx2 ON $0(b ASC)", tableName1)); ASSERT_OK( conn.ExecuteFormat("CREATE MATERIALIZED VIEW $0_mv2 AS SELECT COUNT(*) FROM $0", tableName1)); @@ -8603,12 +8603,12 @@ void CDCSDKYsqlTest::TestNonUserTableShouldNotGetAddedToCDCStream( ASSERT_EQ(expected_tablets, actual_tablets); } -TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToNonConsistentSnapshotCDCStream) { + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ false); } -TEST_F(CDCSDKYsqlTest, TestNonUserTableShouldNotGetAddedToConsistentSnapshotCDCStream) { - TestNonUserTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToConsistentSnapshotCDCStream) { + TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); } TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) { @@ -8980,5 +8980,325 @@ TEST_F( /* use_consistent_snapshot_stream */ true); } +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Restart master -> i1 will be marked for removal and bg thread will actually remove it from CDC +// stream metadata and update the checkpoint for state entries to max. +// 6. Verify i1 no longer exists in stream metadata and state entries have been deleted. +// 7. Create a table t2 +// 8. Verify it gets added to stream metadata and cdc state table. +void CDCSDKYsqlTest::TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_enable_replica_identity) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + const auto tableName2 = "test_table_2"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + xrepl::StreamId stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + xrepl::StreamId stream_id2 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + const vector index_list_suffix = {"_0", "_1", "_2", "_3"}; + const int kNumIndexes = 4; + vector indexes(kNumIndexes); + int i = 0; + vector> idx_tablets(kNumIndexes); + + while (i < kNumIndexes) { + // Create an index AFTER the stream has been created + ASSERT_OK( + conn.ExecuteFormat("CREATE INDEX $0_idx$1 ON $0(b ASC)", tableName1, index_list_suffix[i])); + indexes[i] = ASSERT_RESULT(GetTable( + &test_cluster_, kNamespaceName, Format("$0_idx$1", tableName1, index_list_suffix[i]))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + ASSERT_OK(test_client()->GetTablets( + indexes[i], 0, &idx_tablets[i], /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx_tablets[i].size(), 1); + i++; + } + + // Verify CDC stream metadata contains both table1 and the index table. + std::unordered_set expected_tables = {table1.table_id()}; + for (const auto& idx : indexes) { + expected_tables.insert(idx.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating an index after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. + std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablets : idx_tablets) { + for (const auto& tablet : tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains the user table as well as indexes"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + // Non-eligible tables like the index will be removed from stream on a master restart. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // wait for the bg thread to remove the index from stream metadata and update the checkpoint for + // corresponding state table entries to max. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Stream metadata should no longer contain the index. + expected_tables.clear(); + expected_tables.insert(table1.table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after non-user table removal from CDC stream."); + + // Since checkpoint will be set to max for index's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream, after master restart, only contains the user table."; + + // Create a dynamic table and create non eligible tables on this dynamic table. + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName2)); + auto table2 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName2)); + google::protobuf::RepeatedPtrField table2_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table2, 0, &table2_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table2_tablets.size(), 3); + + expected_tables.insert(table2.table_id()); + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + VerifyTablesInStreamMetadata( + stream_id2, expected_tables, + "Waiting for GetDBStreamInfo after creating a new user table post master restart."); + + for (const auto& tablet : table2_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id2); + LOG(INFO) << "Stream contains both the user tables."; +} + +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream) { + TestNonEligibleTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +// This test performs the following: +// 1. Create a table t1 +// 2. Create a CDC stream +// 3. Create an index i1 on t1 - since test flag to add index is enabled, i1 should get added to CDC +// stream. +// 4. Confirm t1 & i1 are part of CDC stream metadata and cdc state table. +// 5. Split one tablet each of index i1 and table t1. +// 6. Verify none of the children tablets of i1 are added to cdc state table. +// 7. Verify both children tablets of table t1 have been added to cdc state table. +void CDCSDKYsqlTest::TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_cdcsdk_streamed_tables) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cleanup_split_tablets_interval_sec) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams( + 1, 1, false /* colocated */, false /* cdc_populate_safepoint_record */, + true /* set_pgsql_proxy_bind_address */)); + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + const auto tableName1 = "test_table_1"; + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(key int PRIMARY KEY, a int, b int) SPLIT INTO 3 TABLETS;", tableName1)); + auto table1 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, tableName1)); + google::protobuf::RepeatedPtrField table1_tablets; + + // Wait for a second for the table to be created and the tablets to be RUNNING + // Only after this will the tablets of this table get entries in cdc_state table + SleepFor(MonoDelta::FromSeconds(1 * kTimeMultiplier)); + ASSERT_OK( + test_client()->GetTablets(table1, 0, &table1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(table1_tablets.size(), 3); + + int num_inserts = 10; + for (int i = 0; i < num_inserts; i++) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2)", tableName1, i, i + 1)); + } + + xrepl::StreamId stream_id1 = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Create an index AFTER the stream has been created + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx1 ON $0(b ASC)", tableName1)); + auto idx1 = + ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, Format("$0_idx1", tableName1))); + // Wait for the bg thread to complete finding out new tables added in the namespace and add + // them to CDC stream if relevant. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + google::protobuf::RepeatedPtrField idx1_tablets; + ASSERT_OK(test_client()->GetTablets(idx1, 0, &idx1_tablets, /* partition_list_version=*/nullptr)); + ASSERT_EQ(idx1_tablets.size(), 1); + + // Verify CDC stream metadata contains both table1 and the index table. + std::unordered_set expected_tables = {table1.table_id(), idx1.table_id()}; + + VerifyTablesInStreamMetadata( + stream_id1, expected_tables, + "Waiting for GetDBStreamInfo after creating an index creation after stream creation"); + + // Verify cdc state table contains entries from both table1 & index table. + std::unordered_set expected_tablets; + for (const auto& tablet : table1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + for (const auto& tablet : idx1_tablets) { + expected_tablets.insert(tablet.tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id1); + LOG(INFO) << "Stream contains the user table as well as index"; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_add_indexes_to_stream) = false; + + ASSERT_OK(WaitForFlushTables( + {idx1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the index's tablet. + WaitUntilSplitIsSuccesful(idx1_tablets.Get(0).tablet_id(), idx1, 2); + google::protobuf::RepeatedPtrField idx1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + idx1, 0, &idx1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(idx1_tablets_after_split.size(), 2); + + ASSERT_OK(WaitForFlushTables( + {table1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split the table1's tablet. + WaitUntilSplitIsSuccesful(table1_tablets.Get(0).tablet_id(), table1, 4); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table1, 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 4); + + // wait for sometime so that tablet split codepath has completed adding new cdc state entries. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + std::unordered_set new_expected_tablets_in_state_table; + for (const auto& tablet : table1_tablets_after_split) { + new_expected_tablets_in_state_table.insert(tablet.tablet_id()); + } + + std::unordered_set tablets_not_expected_in_state_table; + for (const auto& tablet : idx1_tablets_after_split) { + tablets_not_expected_in_state_table.insert(tablet.tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + bool seen_unexpected_tablets = false; + Status s; + auto table_range = + ASSERT_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); + for (auto row_result : table_range) { + ASSERT_OK(row_result); + auto& row = *row_result; + + if (row.key.stream_id == stream_id1) { + LOG(INFO) << "Read cdc_state table with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id; + if (new_expected_tablets_in_state_table.contains(row.key.tablet_id)) { + new_expected_tablets_in_state_table.erase(row.key.tablet_id); + } + + if (tablets_not_expected_in_state_table.contains(row.key.tablet_id)) { + seen_unexpected_tablets = true; + break; + } + } + } + + bool seen_all_expected_tablets = new_expected_tablets_in_state_table.size() == 0 ? true : false; + ASSERT_FALSE(seen_unexpected_tablets); + ASSERT_TRUE(seen_all_expected_tablets); + LOG(INFO) << "CDC State table does not contain the children tablets of index's split tablet"; +} + +TEST_F( + CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream) { + TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + /* use_consistent_snapshot_stream */ true); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index 43010f34647e..b2095ca52f9e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -2742,7 +2742,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { return (tablets_after_split.size() == expected_num_tablets); }, - MonoDelta::FromSeconds(120), "Tabelt Split not succesful")); + MonoDelta::FromSeconds(120), "Tablet Split not succesful")); } void CDCSDKYsqlTest::CheckTabletsInCDCStateTable( diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 4e4fa39760c0..96eb4492100e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -119,6 +119,8 @@ DECLARE_bool(enable_cdcsdk_setting_get_changes_response_byte_limit); DECLARE_uint64(cdcsdk_vwal_getchanges_resp_max_size_bytes); DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal); +DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { @@ -782,7 +784,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { std::string GetPubRefreshTimesString(vector pub_refresh_times); - void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream); + void TestNonEligibleTableShouldNotGetAddedToCDCStream(bool create_consistent_snapshot_stream); Status ExecuteYBAdminCommand( const std::string& command_name, const std::vector& command_args); @@ -799,6 +801,11 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( bool use_consistent_snapshot_stream); + + void TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + void TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( + bool use_consistent_snapshot_stream); }; } // namespace cdc diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index c264f244885c..ca29d8f2dd9d 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -45,7 +45,6 @@ #include #include "yb/cdc/cdc_service.pb.h" -#include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" #include "yb/common/constants.h" #include "yb/common/entity_ids.h" @@ -141,6 +140,7 @@ enum RaftGroupStatePB; namespace cdc { class CDCStateTable; +struct CDCStateTableEntry; } // namespace cdc namespace master { @@ -1495,15 +1495,27 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all CDCSDK streams which do not have metadata for the newly added tables. Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map); + // Find all CDCSDK streams that contain non eligible tables like indexes, mat views etc. in + // their metadata. + Status FindCDCSDKStreamsForNonEligibleTables(TableStreamIdsMap* non_user_tables_to_streams_map); + bool IsTableEligibleForCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_); + const TableInfoPtr& table_info, const std::optional& schema) const + REQUIRES_SHARED(mutex_); // This method compares all tables in the namespace to all the tables added to a CDCSDK stream, // to find tables which are not yet processed by the CDCSDK streams. void FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) - REQUIRES(mutex_); + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); + + // This method compares all tables in the namespace eligible for a CDCSDK stream to all the tables + // added to a CDCSDK stream, to find indexes / mat views that are part of the CDCSDK streams. + void FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) REQUIRES(mutex_); Status ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, @@ -1516,6 +1528,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status ProcessNewTablesForCDCSDKStreams( const TableStreamIdsMap& table_to_unprocessed_streams_map, const LeaderEpoch& epoch); + Status RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch); + // Find all the CDC streams that have been marked as provided state. Result> FindXReplStreamsMarkedForDeletion( SysCDCStreamEntryPB::State deletion_state); @@ -3034,6 +3049,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf, void RemoveTableFromCDCSDKUnprocessedMap(const TableId& table_id, const NamespaceId& ns_id); + void RemoveTableFromCDCSDKNonEligibleTableMap(const TableId& table_id, const NamespaceId& ns_id); + void ClearXReplState() REQUIRES(mutex_); Status LoadXReplStream() REQUIRES(mutex_); Status LoadUniverseReplication() REQUIRES(mutex_); @@ -3203,6 +3220,13 @@ class CatalogManager : public tserver::TabletPeerLookupIf, std::unordered_map> namespace_to_cdcsdk_unprocessed_table_map_ GUARDED_BY(cdcsdk_unprocessed_table_mutex_); + mutable MutexType cdcsdk_non_eligible_table_mutex_; + // In-memory map containing non-eligble tables like indexes/ materialized views which got added to + // CDCSDK stream's metadata. Will be refreshed on master restart / leadership change through the + // function: 'FindAllNonEligibleTablesInCDCSDKStream'. + std::unordered_map> + namespace_to_cdcsdk_non_eligible_table_map_ GUARDED_BY(cdcsdk_non_eligible_table_mutex_); + // Map of all consumer tables that are part of xcluster replication, to a map of the stream infos. std::unordered_map xcluster_consumer_table_stream_ids_map_ GUARDED_BY(mutex_); diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index d0d5b48f5d3b..1b7d0227e6cd 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -82,6 +82,7 @@ DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false, DECLARE_bool(enable_ysql); DECLARE_bool(TEST_echo_service_enabled); +DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream); namespace yb { namespace master { @@ -304,6 +305,31 @@ void CatalogManagerBgTasks::Run() { } } + { + if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + // Find if there are any non eligible tables (indexes, mat views) present in cdcsdk + // stream that are not associated with a replication slot. + TableStreamIdsMap non_user_tables_to_streams_map; + // In case of master leader restart or leadership changes, we would have scanned all + // streams (without replication slot) in ACTIVE/DELETING METADATA state for non eligible + // tables and marked such tables for removal in + // namespace_to_cdcsdk_non_eligible_table_map_. + Status s = catalog_manager_->FindCDCSDKStreamsForNonEligibleTables( + &non_user_tables_to_streams_map); + + if (s.ok() && !non_user_tables_to_streams_map.empty()) { + s = catalog_manager_->RemoveNonEligibleTablesFromCDCSDKStreams( + non_user_tables_to_streams_map, l.epoch()); + } + if (!s.ok()) { + YB_LOG_EVERY_N(WARNING, 10) + << "Encountered failure while trying to remove non eligible " + "tables from cdc_state table: " + << s.ToString(); + } + } + } + // Ensure the master sys catalog tablet follows the cluster's affinity specification. if (FLAGS_sys_catalog_respect_affinity_task) { Status s = catalog_manager_->SysCatalogRespectLeaderAffinity(); diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 5febd70735f7..ab1a122272df 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -154,6 +154,13 @@ DEFINE_test_flag(bool, cdcsdk_skip_updating_cdc_state_entries_on_table_removal, "Skip updating checkpoint to max for cdc state table entries while removing a user table from " "CDCSDK stream."); +DEFINE_test_flag(bool, cdcsdk_add_indexes_to_stream, false, "Allows addition of index to a stream"); + +DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, false, + "When enabled, all CDCSDK streams will be scanned for non-eligible tables like indexes, " + "materialised view etc. in their stream metadata and these tables will be marked for removal " + "by catalog manager background thread."); + DECLARE_bool(xcluster_wait_on_ddl_alter); DECLARE_int32(master_rpc_timeout_ms); DECLARE_bool(ysql_yb_enable_replication_commands); @@ -332,8 +339,17 @@ class CDCStreamLoader : public Visitor { if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE || metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) && ns && ns->state() == SysNamespaceEntryPB::RUNNING) { + auto eligible_tables_info = catalog_manager_->FindAllTablesForCDCSDK(metadata.namespace_id()); catalog_manager_->FindAllTablesMissingInCDCSDKStream( - stream_id, metadata.table_id(), metadata.namespace_id()); + stream_id, metadata.table_id(), eligible_tables_info); + + // Check for any non-eligible tables like indexes, matview etc in CDC stream only if the + // stream is not associated with a replication slot. + if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream && + stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + catalog_manager_->FindAllNonEligibleTablesInCDCSDKStream( + stream_id, metadata.table_id(), eligible_tables_info); + } } LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": " @@ -1792,7 +1808,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( void CatalogManager::FindAllTablesMissingInCDCSDKStream( const xrepl::StreamId& stream_id, - const google::protobuf::RepeatedPtrField& table_ids, const NamespaceId& ns_id) { + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { std::unordered_set stream_table_ids; // Store all table_ids associated with the stram in 'stream_table_ids'. for (const auto& table_id : table_ids) { @@ -1802,7 +1819,7 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( // Get all the tables associated with the namespace. // If we find any table present only in the namespace, but not in the stream, we add the table // id to 'cdcsdk_unprocessed_tables'. - for (const auto& table_info : FindAllTablesForCDCSDK(ns_id)) { + for (const auto& table_info : eligible_tables_info) { auto ltm = table_info->LockForRead(); if (!stream_table_ids.contains(table_info->id())) { LOG(INFO) << "Found unprocessed table: " << table_info->id() @@ -1814,6 +1831,118 @@ void CatalogManager::FindAllTablesMissingInCDCSDKStream( } } +Status CatalogManager::FindCDCSDKStreamsForNonEligibleTables( + TableStreamIdsMap* non_user_tables_to_streams_map) { + std::unordered_map> namespace_to_non_user_table_map; + { + SharedLock lock(cdcsdk_non_eligible_table_mutex_); + int32_t found_non_user_tables = 0; + for (const auto& [ns_id, table_ids] : namespace_to_cdcsdk_non_eligible_table_map_) { + for (const auto& table_id : table_ids) { + namespace_to_non_user_table_map[ns_id].insert(table_id); + if (++found_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + break; + } + } + } + } + + { + SharedLock lock(mutex_); + for (const auto& [stream_id, stream_info] : cdc_stream_map_) { + if (stream_info->namespace_id().empty()) { + continue; + } + + // Removal of non-eligible tables will only be done on CDC stream that are not associated with + // a replication slot. + if (!stream_info->GetCdcsdkYsqlReplicationSlotName().empty()) { + continue; + } + + const auto non_user_tables = + FindOrNull(namespace_to_non_user_table_map, stream_info->namespace_id()); + if (!non_user_tables) { + continue; + } + + auto ltm = stream_info->LockForRead(); + if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE || + ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) { + for (const auto& non_user_table_id : *non_user_tables) { + auto table = tables_->FindTableOrNull(non_user_table_id); + if (!table) { + LOG_WITH_FUNC(WARNING) + << "Table " << non_user_table_id << " deleted before it could be removed"; + continue; + } + + if (std::find(ltm->table_id().begin(), ltm->table_id().end(), non_user_table_id) != + ltm->table_id().end()) { + (*non_user_tables_to_streams_map)[non_user_table_id].push_back(stream_info); + VLOG(1) << "Will try and remove table: " << non_user_table_id + << ", from stream: " << stream_info->id(); + } + } + } + } + } + + for (const auto& [ns_id, non_user_table_ids] : namespace_to_non_user_table_map) { + for (const auto& non_user_table_id : non_user_table_ids) { + if (!non_user_tables_to_streams_map->contains(non_user_table_id)) { + // This means we found no active CDCSDK stream where this table was present, hence we can + // remove this table from 'namespace_to_cdcsdk_non_eligible_table_map_'. + RemoveTableFromCDCSDKNonEligibleTableMap(non_user_table_id, ns_id); + } + } + } + + return Status::OK(); +} + +void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream( + const xrepl::StreamId& stream_id, + const google::protobuf::RepeatedPtrField& table_ids, + const std::vector& eligible_tables_info) { + // If we find any table present only in the the stream, but not in the list of eligible tables in + // namespace for CDC, we add the table id to 'namespace_to_cdcsdk_non_eligible_table_map_'. + std::unordered_set user_table_ids; + for (const auto& table_info : eligible_tables_info) { + user_table_ids.insert(table_info->id()); + } + + std::unordered_set stream_table_ids; + // Store all table_ids associated with the stream in 'stream_table_ids'. + for (const auto& table_id : table_ids) { + if (!user_table_ids.contains(table_id)) { + auto table_info = GetTableInfoUnlocked(table_id); + Schema schema; + Status status = table_info->GetSchema(&schema); + if (!status.ok()) { + LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name(); + // Skip this table for now, it will be revisited for removal on master restart/master leader + // change. + continue; + } + + // Re-confirm this table is not meant to be part of a CDC stream. + if (!IsTableEligibleForCDCSDKStream(table_info, schema)) { + LOG(INFO) << "Found a non-eligible table: " << table_info->id() + << ", for stream: " << stream_id; + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert( + table_info->id()); + } else { + // Ideally we are not expected to enter the else clause. + LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id + << " that is not present in the eligible list of tables " + "from the namespace for CDC"; + } + } + } +} + Status CatalogManager::ValidateCDCSDKRequestProperties( const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value, const std::string& record_type_option_value, const std::string& id_type_option_value) { @@ -1910,24 +2039,32 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace } bool CatalogManager::IsTableEligibleForCDCSDKStream( - const TableInfoPtr& table_info, const Schema& schema) const { - bool has_pk = true; - bool has_invalid_pg_typeoid = false; - for (const auto& col : schema.columns()) { - if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { - // ybrowid column is added for tables that don't have user-specified primary key. - VLOG(1) << "Table: " << table_info->id() - << ", will not be added to CDCSDK stream, since it does not have a primary key"; - has_pk = false; - break; + const TableInfoPtr& table_info, const std::optional& schema) const { + if (schema.has_value()) { + bool has_pk = true; + bool has_invalid_pg_typeoid = false; + for (const auto& col : schema->columns()) { + if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { + // ybrowid column is added for tables that don't have user-specified primary key. + VLOG(1) << "Table: " << table_info->id() + << ", will not be added to CDCSDK stream, since it does not have a primary key"; + has_pk = false; + break; + } + if (col.pg_type_oid() == 0) { + has_invalid_pg_typeoid = true; + } } - if (col.pg_type_oid() == 0) { - has_invalid_pg_typeoid = true; + if (!has_pk || has_invalid_pg_typeoid) { + if (FLAGS_TEST_cdcsdk_add_indexes_to_stream) { + // allow adding user created indexes to CDC stream. + if (IsUserIndexUnlocked(*table_info)) { + return true; + } + } + return false; } } - if (!has_pk || has_invalid_pg_typeoid) { - return false; - } if (IsMatviewTable(*table_info)) { // Materialized view should not be added as they are not supported for streaming. @@ -2111,6 +2248,82 @@ Status CatalogManager::ProcessNewTablesForCDCSDKStreams( return Status::OK(); } +Status CatalogManager::RemoveNonEligibleTablesFromCDCSDKStreams( + const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch) { + int32_t removed_non_user_tables = 0; + for (const auto& [table_id, streams] : non_user_tables_to_streams_map) { + if (removed_non_user_tables >= FLAGS_cdcsdk_table_processing_limit_per_run) { + VLOG(1) + << "Reached the limit of number of non-eligible tables to be removed per iteration. Will " + "remove the remaining tables in the next iteration."; + break; + } + + // Delete the table from all streams now. + NamespaceId namespace_id; + bool stream_pending = false; + Status status; + for (const auto& stream : streams) { + if PREDICT_FALSE (stream == nullptr) { + LOG(WARNING) << "Could not find CDC stream: " << stream->id(); + continue; + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for(const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table. + tables_in_stream_metadata.erase(table_id); + auto result = UpdateCheckpointForTabletEntriesInCDCState( + stream->StreamId(), tables_in_stream_metadata, table_id); + + if (!result.ok()) { + LOG(WARNING) << "Encountered error while trying to update/delete tablets entries of table: " + << table_id << ", from cdc_state table for stream" << stream->id() << ": " + << status; + stream_pending = true; + continue; + } + + { + auto stream_lock = stream->LockForWrite(); + if (stream_lock->is_deleting()) { + continue; + } + } + + Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id); + if (!status.ok()) { + LOG(WARNING) << "Encountered error while trying to remove non-eligible table " << table_id + << " from metadata of stream " << stream->StreamId() << " and maps. "; + stream_pending = true; + continue; + } + + LOG(INFO) + << "Succesfully removed non-eligible table " << table_id + << " from stream metadata and updated corresponding cdc_state table entries for stream: " + << stream->id(); + + namespace_id = stream->namespace_id(); + } + + // Remove non_user tables from 'namespace_to_cdcsdk_non_user_table_map_'. + if (!stream_pending) { + RemoveTableFromCDCSDKNonEligibleTableMap(table_id, namespace_id); + } + ++removed_non_user_tables; + } + + return Status::OK(); +} + void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( const TableId& table_id, const NamespaceId& ns_id) { LockGuard lock(cdcsdk_unprocessed_table_mutex_); @@ -2123,6 +2336,20 @@ void CatalogManager::RemoveTableFromCDCSDKUnprocessedMap( } } +void CatalogManager::RemoveTableFromCDCSDKNonEligibleTableMap( + const TableId& table_id, const NamespaceId& ns_id) { + LockGuard lock(cdcsdk_non_eligible_table_mutex_); + auto non_user_tables = FindOrNull(namespace_to_cdcsdk_non_eligible_table_map_, ns_id); + if (!non_user_tables) { + return; + } + + non_user_tables->erase(table_id); + if (non_user_tables->empty()) { + namespace_to_cdcsdk_non_eligible_table_map_.erase(ns_id); + } +} + Result> CatalogManager::FindXReplStreamsMarkedForDeletion( SysCDCStreamEntryPB::State deletion_state) { std::vector streams; @@ -4325,6 +4552,24 @@ Status CatalogManager::UpdateCDCProducerOnTabletSplit( std::vector streams; std::vector entries; for (const auto stream_type : {cdc::XCLUSTER, cdc::CDCSDK}) { + if (stream_type == cdc::CDCSDK && + FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) { + const auto& table_info = GetTableInfo(producer_table_id); + // Skip adding children tablet entries in cdc state if the table is an index or a mat view. + // These tables, if present in CDC stream, are anyway going to be removed by a bg thread. This + // check ensures even if there is a race condition where a tablet of a non-eligible table + // splits and concurrently we are removing such tables from stream, the child tables do not + // get added. + { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table_info, std::nullopt)) { + LOG(INFO) << "Skipping adding children tablets to cdc state for table " + << producer_table_id << " as it is not meant to part of a CDC stream"; + continue; + } + } + } + { SharedLock lock(mutex_); streams = GetXReplStreamsForTable(producer_table_id, stream_type); @@ -6239,7 +6484,7 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( if (!stream->IsDynamicTableAdditionDisabled()) { RETURN_INVALID_REQUEST_STATUS( "Cannot remove table unless dynamic table addition is disabled for the stream. Please use " - "the yb-admin command \"disable_dynamic_table_addition_in_change_data_stream\" to disable " + "the yb-admin command \"disable_dynamic_table_addition_on_change_data_stream\" to disable " "dynamic table addition on the stream."); } @@ -6287,8 +6532,8 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( // table from the cdc state table. tables_in_stream_metadata.erase(table_id); RETURN_NOT_OK_PREPEND( - UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata), - "Error updating tablet entries from cdc state table"); + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata, table_id), + "Error updating/deleting tablet entries from cdc state table"); } // Now remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist @@ -6297,9 +6542,9 @@ Status CatalogManager::RemoveUserTableFromCDCSDKStream( RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id), "Error removing table from stream metadata and maps"); - LOG_WITH_FUNC(INFO) - << "Successfully removed table " << table_id << " from CDC stream: " << stream_id - << " and updated the checkpoint to max for corresponding cdc state table entries."; + LOG_WITH_FUNC(INFO) << "Successfully removed table " << table_id + << " from CDC stream: " << stream_id + << " and updated/deleted corresponding cdc state table entries."; return Status::OK(); } @@ -7715,6 +7960,11 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( const TableId& table_to_be_removed) { std::unordered_set tablet_entries_to_be_removed; + // This will only contain entries for colocated tables that have a composite value in the + // stream_id column i.e. in the form of stream_id_table_id. Such entries will have to be directly + // deleted as UpdatePeersAndMetrics ignores these entries. + std::vector cdc_state_entries_to_be_deleted; + // If the table_id to be removed is provided, we will only find out cdc state table entries // corresponding to this table and update their checkpoints. Otherwise, we'll consider all state // table entries for checkpoint update. @@ -7746,11 +7996,19 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( const auto& entry = *entry_result; if (entry.key.stream_id == stream_id) { - // If table_id is provided, filter out state entries belonging to tablets of the table. - if (table_to_be_removed.empty() || - (!table_to_be_removed.empty() && - tablet_entries_to_be_removed.contains(entry.key.tablet_id))) { - cdc_state_tablet_entries.push_back(entry.key.tablet_id); + // For updating the checkpoint, only consider entries that do not have a colocated table_id as + // these will be manually deleted. + if (entry.key.colocated_table_id.empty()) { + // If table_id is provided, filter out state entries belonging to tablets of the table. + if (table_to_be_removed.empty() || + tablet_entries_to_be_removed.contains(entry.key.tablet_id)) { + cdc_state_tablet_entries.push_back(entry.key.tablet_id); + } + } else if (entry.key.colocated_table_id == table_to_be_removed) { + // If the entry contain a colocated_table_id, it belongs to one of the colocated + // tables on that tablet. If this colocated_table_id matches with the table being removed, + // then we'll delete this entry directly. + cdc_state_entries_to_be_deleted.push_back(entry.key); } } } @@ -7782,15 +8040,28 @@ CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( } if (!entries_to_update.empty()) { - LOG_WITH_FUNC(INFO) - << "Updating checkpoint to max for " << entries_to_update.size() - << " cdc state entries as part of validating cdc state table entries for CDC stream: " - << stream_id; + LOG_WITH_FUNC(INFO) << "Setting checkpoint to max for " << entries_to_update.size() + << " cdc state entries for CDC stream: " << stream_id; RETURN_NOT_OK_PREPEND( cdc_state_table_->UpdateEntries(entries_to_update), "Error setting checkpoint to OpId::Max() in cdc_state table"); } + if (!cdc_state_entries_to_be_deleted.empty()) { + // Only 1 entry is expected for a colocated table that is being removed. + RSTATUS_DCHECK( + cdc_state_entries_to_be_deleted.size() == 1, IllegalState, + "Found more than one cdc state table entry that needs to be deleted for removing table $0 " + "from CDC stream $1", + table_to_be_removed, stream_id); + + LOG_WITH_FUNC(INFO) << "Deleting cdc state table entry (tablet,stream) - " + << cdc_state_entries_to_be_deleted[0].ToString(); + RETURN_NOT_OK_PREPEND( + cdc_state_table_->DeleteEntries(cdc_state_entries_to_be_deleted), + "Error deleting entries from cdc_state table"); + } + return entries_to_update; } diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index caacae7ae9c9..9796310c6a44 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -3848,7 +3848,7 @@ Status ClusterAdminClient::ValidateAndSyncCDCStateEntriesForCDCSDKStream( req.set_stream_id(stream_id); RpcController rpc; - rpc.set_timeout(timeout_); + rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); RETURN_NOT_OK( master_replication_proxy_->ValidateAndSyncCDCStateEntriesForCDCSDKStream(req, &resp, &rpc));