Skip to content

Commit

Permalink
[#22876][#22835][#22773] CDCSDK: Remove non-eligible tables for CDC f…
Browse files Browse the repository at this point in the history
…rom existing CDCSDK stream

Summary:
Some non-eligible tables like indexes etc. created after creation of a CDC stream were getting added to the CDC stream due to [[ https://phorge.dev.yugabyte.com/D35856 | this missing logic in addition of dynamic tables codepath ]]. We do not hold retention barriers on tablets of such tables until and unless they split, in which case, we start holding retention barriers on the children tablets. This leads to heavy resource usage over time and since the CDC stream never polls on tables of such tables, retention barriers are not lifted until the active time of these tablets exceed `cdc_intent_retention_ms`.

Therefore, to prevent resource consumption from such tables, we want to achieve the following:
1. Remove these non-eligible tables from the stream metadata so that any further tablet splitting on these tables do not lead to addition of children tablets in cdc state table.
2. Release retention barriers on the existing tablets that are part of the cdc state table and finally remove these state table entries.

We have followed the same pattern of achieving the above tasks via a background thread, exactly similar to addition of dynamic table in CDC streams.

**Working:**

  - //FindAllNonUserTablesInCDCSDKStream// - On a master restart/leadership change, while loading CDCSDK streams into memory, we will compute the set difference between tables present in stream metadata and tables in the namespace that are eligible for a CDC stream. This set difference will give us the set of non-eligible tables that were not supposed to get added to the CDC stream, but got added because of the above mentioned bug. These non-eligible tables will be added to `namespace_to_cdcsdk_non_user_table_map_` which is further processed in catalog manager background thread by //FindCDCSDKStreamsForNonUserTables//.

 The bg thread of catalog manager (CatalogManagerBgTasks), with the following methods handles the actual table removal:

  - //FindCDCSDKStreamsForNonUserTables//: This method is run in every subsequent iteration of the bg thread of catalog manager. It scans the cdc_stream_map_ and finds all streams in ACTIVE/DELETING METADATA state which have the non-eligible table entry in stream metadata, and collects the details to be further processed by //RemoveNonUserTablesForCDCSDKStreams//.
  - //RemoveNonUserTablesForCDCSDKStreams//: This method is run after FindCDCSDKStreamsForNonUserTables and does the following for each stream that contains the non-eligible table entry:
     1. Update the checkpoint of cdc state entries related to non-eligible table to OpId max. Incase of colocated tables, entries with a colocated_table_id will be deleted.
     2. Removes the table from stream metadata and cdcsdk_tables_to_stream_map_.
    Once the table is removed from all relevant CDC streams, then we remove the table entry from `namespace_to_cdcsdk_non_user_table_map_`.

Note:
1. To enable this cleanup of non-eligible tables, user has to set the master flag `enable_cleanup_of_non_eligible_tables_from_cdcsdk_stream`.
2. In single iteration of the bg thread, we only process two non-eligible tables across all namespaces. This processing limit is configurable and we are reusing the existing flag `cdcsdk_table_processing_limit_per_run`.

Additionally, in the tablet split codepath, before adding cdc state entries for children tables, we will now check if the table is a non-eligible table for CDC stream or not. This also helps in preventing a race condition when a tablet of a non-eligible is split and concurrently, there was a master restart/leadership changes and we are trying to remove the table from stream metadata.
Jira: DB-11778, DB-11733, DB-11676

Test Plan:
Jenkins: urgent
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTableRemovalFromNonConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTableRemovalFromConsistentSnapshotCDCStream

./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsistentSnapshotStream

Reviewers: asrinivasan, stiwary, skumar

Reviewed By: stiwary

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36031
  • Loading branch information
siddharth2411 authored and jaki committed Jun 28, 2024
1 parent 92827b3 commit e7ad4f8
Show file tree
Hide file tree
Showing 7 changed files with 695 additions and 47 deletions.
334 changes: 327 additions & 7 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/yb/integration-tests/cdcsdk_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2742,7 +2742,7 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {

return (tablets_after_split.size() == expected_num_tablets);
},
MonoDelta::FromSeconds(120), "Tabelt Split not succesful"));
MonoDelta::FromSeconds(120), "Tablet Split not succesful"));
}

void CDCSDKYsqlTest::CheckTabletsInCDCStateTable(
Expand Down
9 changes: 8 additions & 1 deletion src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ DECLARE_bool(enable_cdcsdk_setting_get_changes_response_byte_limit);
DECLARE_uint64(cdcsdk_vwal_getchanges_resp_max_size_bytes);
DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);
DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);
DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream);
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);

namespace yb {

Expand Down Expand Up @@ -782,7 +784,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {

std::string GetPubRefreshTimesString(vector<uint64_t> pub_refresh_times);

void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream);
void TestNonEligibleTableShouldNotGetAddedToCDCStream(bool create_consistent_snapshot_stream);

Status ExecuteYBAdminCommand(
const std::string& command_name, const std::vector<string>& command_args);
Expand All @@ -799,6 +801,11 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {

void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval(
bool use_consistent_snapshot_stream);

void TestNonEligibleTableRemovalFromCDCStream(bool use_consistent_snapshot_stream);

void TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream(
bool use_consistent_snapshot_stream);
};

} // namespace cdc
Expand Down
32 changes: 28 additions & 4 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include <gtest/internal/gtest-internal.h>

#include "yb/cdc/cdc_service.pb.h"
#include "yb/cdc/cdc_state_table.h"
#include "yb/cdc/xcluster_types.h"
#include "yb/common/constants.h"
#include "yb/common/entity_ids.h"
Expand Down Expand Up @@ -141,6 +140,7 @@ enum RaftGroupStatePB;

namespace cdc {
class CDCStateTable;
struct CDCStateTableEntry;
} // namespace cdc

namespace master {
Expand Down Expand Up @@ -1495,15 +1495,27 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
// Find all CDCSDK streams which do not have metadata for the newly added tables.
Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map);

// Find all CDCSDK streams that contain non eligible tables like indexes, mat views etc. in
// their metadata.
Status FindCDCSDKStreamsForNonEligibleTables(TableStreamIdsMap* non_user_tables_to_streams_map);

bool IsTableEligibleForCDCSDKStream(
const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_);
const TableInfoPtr& table_info, const std::optional<Schema>& schema) const
REQUIRES_SHARED(mutex_);

// This method compares all tables in the namespace to all the tables added to a CDCSDK stream,
// to find tables which are not yet processed by the CDCSDK streams.
void FindAllTablesMissingInCDCSDKStream(
const xrepl::StreamId& stream_id,
const google::protobuf::RepeatedPtrField<std::string>& table_ids, const NamespaceId& ns_id)
REQUIRES(mutex_);
const google::protobuf::RepeatedPtrField<std::string>& table_ids,
const std::vector<TableInfoPtr>& eligible_tables_info) REQUIRES(mutex_);

// This method compares all tables in the namespace eligible for a CDCSDK stream to all the tables
// added to a CDCSDK stream, to find indexes / mat views that are part of the CDCSDK streams.
void FindAllNonEligibleTablesInCDCSDKStream(
const xrepl::StreamId& stream_id,
const google::protobuf::RepeatedPtrField<std::string>& table_ids,
const std::vector<TableInfoPtr>& eligible_tables_info) REQUIRES(mutex_);

Status ValidateCDCSDKRequestProperties(
const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value,
Expand All @@ -1516,6 +1528,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
Status ProcessNewTablesForCDCSDKStreams(
const TableStreamIdsMap& table_to_unprocessed_streams_map, const LeaderEpoch& epoch);

Status RemoveNonEligibleTablesFromCDCSDKStreams(
const TableStreamIdsMap& non_user_tables_to_streams_map, const LeaderEpoch& epoch);

// Find all the CDC streams that have been marked as provided state.
Result<std::vector<CDCStreamInfoPtr>> FindXReplStreamsMarkedForDeletion(
SysCDCStreamEntryPB::State deletion_state);
Expand Down Expand Up @@ -3034,6 +3049,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

void RemoveTableFromCDCSDKUnprocessedMap(const TableId& table_id, const NamespaceId& ns_id);

void RemoveTableFromCDCSDKNonEligibleTableMap(const TableId& table_id, const NamespaceId& ns_id);

void ClearXReplState() REQUIRES(mutex_);
Status LoadXReplStream() REQUIRES(mutex_);
Status LoadUniverseReplication() REQUIRES(mutex_);
Expand Down Expand Up @@ -3203,6 +3220,13 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
std::unordered_map<NamespaceId, std::unordered_set<TableId>>
namespace_to_cdcsdk_unprocessed_table_map_ GUARDED_BY(cdcsdk_unprocessed_table_mutex_);

mutable MutexType cdcsdk_non_eligible_table_mutex_;
// In-memory map containing non-eligble tables like indexes/ materialized views which got added to
// CDCSDK stream's metadata. Will be refreshed on master restart / leadership change through the
// function: 'FindAllNonEligibleTablesInCDCSDKStream'.
std::unordered_map<NamespaceId, std::unordered_set<TableId>>
namespace_to_cdcsdk_non_eligible_table_map_ GUARDED_BY(cdcsdk_non_eligible_table_mutex_);

// Map of all consumer tables that are part of xcluster replication, to a map of the stream infos.
std::unordered_map<TableId, XClusterConsumerTableStreamIds>
xcluster_consumer_table_stream_ids_map_ GUARDED_BY(mutex_);
Expand Down
26 changes: 26 additions & 0 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false,

DECLARE_bool(enable_ysql);
DECLARE_bool(TEST_echo_service_enabled);
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);

namespace yb {
namespace master {
Expand Down Expand Up @@ -304,6 +305,31 @@ void CatalogManagerBgTasks::Run() {
}
}

{
if (FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) {
// Find if there are any non eligible tables (indexes, mat views) present in cdcsdk
// stream that are not associated with a replication slot.
TableStreamIdsMap non_user_tables_to_streams_map;
// In case of master leader restart or leadership changes, we would have scanned all
// streams (without replication slot) in ACTIVE/DELETING METADATA state for non eligible
// tables and marked such tables for removal in
// namespace_to_cdcsdk_non_eligible_table_map_.
Status s = catalog_manager_->FindCDCSDKStreamsForNonEligibleTables(
&non_user_tables_to_streams_map);

if (s.ok() && !non_user_tables_to_streams_map.empty()) {
s = catalog_manager_->RemoveNonEligibleTablesFromCDCSDKStreams(
non_user_tables_to_streams_map, l.epoch());
}
if (!s.ok()) {
YB_LOG_EVERY_N(WARNING, 10)
<< "Encountered failure while trying to remove non eligible "
"tables from cdc_state table: "
<< s.ToString();
}
}
}

// Ensure the master sys catalog tablet follows the cluster's affinity specification.
if (FLAGS_sys_catalog_respect_affinity_task) {
Status s = catalog_manager_->SysCatalogRespectLeaderAffinity();
Expand Down
Loading

0 comments on commit e7ad4f8

Please sign in to comment.