[BACKPORT 2.20.5] [#22876][#22773] CDCSDK: Add new yb-admin command to remove user table from CDCSDK stream

Summary:
**Backport description:**
Faced merge conflicts because of a missing macro and missing utility methods. Resolved the conflicts and added the missing methods.

**Original description:**
Original commit : 7c99ff9 / D35870

This diff introduces three new yb-admin commands required to remove a **user table** from a CDCSDK stream.
**`NOTE: All three commands are only meant to be used on CDC streams that are not associated with a replication slot.`**

**Command-1**: yb-admin command to disable dynamic table addition in a CDC stream. It only works when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set to true. **Note: after this command is executed, no dynamic tables (user or non-user) will be added to the CDC stream. Additionally, there is no option to re-enable dynamic table addition for the stream.**
```
yb-admin \
    -master_addresses <master-addresses> \
    disable_dynamic_table_addition_on_change_data_stream <stream_id>
```
The command works with a single stream_id.

**Command-2**: yb-admin command to remove a particular **user** table from the CDC stream metadata and to update the checkpoint of the corresponding state table entries to OpId max. Since the checkpoint is set to max, these entries will later be deleted from the cdc state table by a separate thread (UpdatePeersAndMetrics).

```
yb-admin \
    -master_addresses <master-addresses> \
    remove_user_table_from_change_data_stream <stream_id> <table_id>
```
The command works with a single stream_id & table_id.

**Command-3**: yb-admin command to validate the cdc state table entries for a particular stream. As part of validation, if the table of any cdc state table entry is not present in the CDC stream metadata, the checkpoint of that entry is updated to OpId max, and the entry will later be deleted by a separate thread (UpdatePeersAndMetrics).

```
yb-admin \
    -master_addresses <master-addresses> \
    validate_and_sync_cdc_state_table_entries_on_change_data_stream <stream_id>
```
The command works with a single stream_id.

**Advisory for command-usage:**
General guidelines that need to be strictly followed while executing these commands:
  - Ensure no DDLs are performed within 15 minutes before or after executing these commands.

These yb-admin commands are meant to be used when a user is only interested in polling a subset of the tables in the namespace. The user can therefore remove from the CDC stream the extra tables that are not supposed to be polled. To achieve this, the user first needs to execute command-1, followed by command-2 and command-3.

Example:
Starting state: 5 user tables (t1 to t5) in the CDC stream, of which 4 (t1, t2, t3, t4) are extra tables that are not polled, plus 2 indexes (i1, i2).
Target state: Only t5 and the 2 indexes (i1, i2) should be present in the CDC stream.

To reach the target state, we need to remove the 4 user tables (t1-t4) from the stream metadata, along with their state table entries.

**Perform the following steps to remove user tables from the CDC stream:**

  # First, disable dynamic table addition using command-1.
  # Confirm that dynamic table addition is disabled by running the `list_change_data_streams` yb-admin command. The output for that stream should contain the string `cdcsdk_disable_dynamic_table_addition: true`.
  # Remove the table from stream metadata and update its state table entries using command-2.
  # Confirm that the table is removed from stream metadata by re-running the `list_change_data_streams` command.
  # Depending on when the user reads the cdc state table (via cqlsh), the state table entries corresponding to this table will either have their checkpoint updated to max or will already have been removed. Note that deletion of state table entries might take some time, as it is done in a separate thread.
  # Repeat steps 3-5 for each user table that needs to be removed.
  # Finally, once all extra user tables have been removed from the stream, execute command-3 as a sanity check to get rid of any cdc state entries that are still present in the state table even though the corresponding table has been removed from stream metadata. One scenario in which cdc state table entries might remain after a table is removed is when a tablet splits while the table is being removed from stream metadata. In this case, the children tablet entries get added to the cdc state table, and they are cleaned up when command-3 is executed.
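
The ordering of the steps above can be sketched as a small plan builder. This is only an illustration and not part of the commit: `BuildRemovalPlan` and `Command` are hypothetical names, the command strings are the ones used by the test helpers in this diff, and `list_change_data_streams` is emitted without arguments on the assumption that its default listing is sufficient for the confirmation steps.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One yb-admin invocation as an argv-style list (hypothetical helper type).
using Command = std::vector<std::string>;

// Builds the ordered invocations for removing `tables_to_remove` from one
// stream: command-1 once, command-2 once per table (each followed by a
// confirmation listing), and command-3 once at the very end.
std::vector<Command> BuildRemovalPlan(
    const std::string& master_addresses, const std::string& stream_id,
    const std::vector<std::string>& tables_to_remove) {
  assert(!stream_id.empty());
  const Command prefix = {"yb-admin", "-master_addresses", master_addresses};
  auto make = [&prefix](const Command& tail) {
    Command cmd = prefix;
    cmd.insert(cmd.end(), tail.begin(), tail.end());
    return cmd;
  };

  std::vector<Command> plan;
  // Steps 1-2: disable dynamic table addition, then confirm it.
  plan.push_back(make({"disable_dynamic_table_addition_on_change_data_stream", stream_id}));
  plan.push_back(make({"list_change_data_streams"}));
  // Steps 3-6: remove each extra user table and confirm it is gone.
  for (const auto& table_id : tables_to_remove) {
    plan.push_back(make({"remove_user_table_from_change_data_stream", stream_id, table_id}));
    plan.push_back(make({"list_change_data_streams"}));
  }
  // Step 7: final sanity check over the cdc state table entries.
  plan.push_back(
      make({"validate_and_sync_cdc_state_table_entries_on_change_data_stream", stream_id}));
  return plan;
}
```

For the t1-t4 example, calling `BuildRemovalPlan(masters, stream_id, {"t1", "t2", "t3", "t4"})` yields 11 invocations; the manual check of the cdc state table via cqlsh (step 5) is intentionally left out because it is not a yb-admin command.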

**Working**:
Command-1 internally calls the //DisableDynamicTableAdditionOnCDCSDKStream// RPC, which sets the optional field `cdcsdk_disable_dynamic_table_addition` in stream metadata to true. This prevents any tables that are not yet part of the CDC stream from being added to it.

Command-2 internally calls the //RemoveUserTableFromCDCSDKStream// RPC, which performs the following:
  # Update the checkpoint of tablet entries for the given table in the CDC state table to `OpId::Max()`. This is done to release the retention barriers on these tables and allow the deletion of the state table entries by UpdatePeersAndMetrics.
  # Remove the table from the CDC stream metadata and from //cdcsdk_tables_to_stream_map_//, and persist the updated metadata in sys catalog.

Command-3 internally calls the //ValidateAndSyncCDCStateEntriesForCDCSDKStream// RPC, which updates the checkpoint to max for cdc state table entries whose table is not found in the CDC stream metadata.
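
The effect of command-2 and command-3 on stream metadata and state table entries can be modelled with a small in-memory sketch. Everything here is a simplified assumption for illustration: `Stream`, `StateEntry`, `RemoveUserTable` and `ValidateAndSync` are hypothetical names, the checkpoint is reduced to a single integer standing in for `OpId::Max()`, and skipping entries that are already at max is a choice of this sketch rather than something the commit states.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>
#include <unordered_set>
#include <vector>

// Stand-in for OpId::Max(); the real OpId is not a single integer.
constexpr int64_t kMaxCheckpoint = std::numeric_limits<int64_t>::max();

// Hypothetical, simplified row of the cdc state table.
struct StateEntry {
  std::string tablet_id;
  std::string table_id;
  int64_t checkpoint;
};

// Hypothetical, simplified view of one stream: its metadata plus its state rows.
struct Stream {
  std::unordered_set<std::string> table_ids;
  std::vector<StateEntry> state_entries;
};

// Models command-2: first set the checkpoint of the table's entries to max,
// then drop the table from the stream metadata. Returns false if the table
// is not part of the stream.
bool RemoveUserTable(Stream& stream, const std::string& table_id) {
  if (stream.table_ids.count(table_id) == 0) return false;
  for (auto& entry : stream.state_entries) {
    if (entry.table_id == table_id) entry.checkpoint = kMaxCheckpoint;
  }
  stream.table_ids.erase(table_id);
  return true;
}

// Models command-3: any entry whose table is absent from the metadata gets
// its checkpoint set to max. Returns the tablet ids that were updated.
std::vector<std::string> ValidateAndSync(Stream& stream) {
  std::vector<std::string> updated;
  for (auto& entry : stream.state_entries) {
    const bool orphaned = stream.table_ids.count(entry.table_id) == 0;
    if (orphaned && entry.checkpoint != kMaxCheckpoint) {
      entry.checkpoint = kMaxCheckpoint;
      updated.push_back(entry.tablet_id);
    }
  }
  return updated;
}
```

In this model, an entry added for a child tablet of an already removed table (the tablet split scenario) is left untouched by `RemoveUserTable` and is only picked up by a later `ValidateAndSync` call, which mirrors why command-3 is run last.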

**Upgrade/Rollback safety:**
//cdcsdk_disable_dynamic_table_addition// - added a new optional field in existing protos SysCDCStreamEntryPB, CDCStreamInfoPB. This field is protected and will only be read when the new auto flag `cdcsdk_enable_dynamic_tables_disable_option` is set.

Introduced request, response proto for new RPCs:

  - DisableDynamicTableAdditionOnCDCSDKStream - DisableDynamicTableAdditionOnCDCSDKStreamRequestPB,  DisableDynamicTableAdditionOnCDCSDKStreamResponsePB
  - RemoveUserTableFromCDCSDKStream - RemoveUserTableFromCDCSDKStreamRequestPB,  RemoveUserTableFromCDCSDKStreamResponsePB
  - ValidateAndSyncCDCStateEntriesForCDCSDKStream - ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB, ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB

Jira: DB-11778, DB-11676

Test Plan:
Jenkins: urgent
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream

./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromNonConsistentSnapshotCDCStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestUserTableRemovalFromConsistentSnapshotCDCStream

./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream

Reviewers: asrinivasan, stiwary, skumar

Reviewed By: asrinivasan, stiwary

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36128
siddharth2411 committed Jun 26, 2024
1 parent 0fe67e3 commit f23350f
Showing 13 changed files with 1,049 additions and 4 deletions.
356 changes: 356 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.cc
@@ -2043,6 +2043,27 @@ namespace cdc {
return get_resp;
}

void CDCSDKYsqlTest::VerifyTablesInStreamMetadata(
const xrepl::StreamId& stream_id, const std::unordered_set<std::string>& expected_table_ids,
const std::string& timeout_msg) {
ASSERT_OK(WaitFor(
[&]() -> Result<bool> {
auto get_resp = GetDBStreamInfo(stream_id);
if (get_resp.ok() && !get_resp->has_error()) {
const uint64_t table_info_size = get_resp->table_info_size();
if (table_info_size == expected_table_ids.size()) {
std::unordered_set<std::string> table_ids;
for (auto entry : get_resp->table_info()) {
table_ids.insert(entry.table_id());
}
if (expected_table_ids == table_ids) return true;
}
}
return false;
},
MonoDelta::FromSeconds(60), timeout_msg));
}

Status CDCSDKYsqlTest::ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) {
CHECK(!FLAGS_enable_load_balancing);

@@ -3674,5 +3695,53 @@ namespace cdc {
}


Status CDCSDKYsqlTest::ExecuteYBAdminCommand(
const std::string& command_name, const std::vector<string>& command_args) {
string tool_path = GetToolPath("../bin", "yb-admin");
vector<string> argv;
argv.push_back(tool_path);
argv.push_back("--master_addresses");
argv.push_back(AsString(test_cluster_.mini_cluster_->GetMasterAddresses()));
argv.push_back(command_name);
for (const auto& command_arg : command_args) {
argv.push_back(command_arg);
}

RETURN_NOT_OK(Subprocess::Call(argv));

return Status::OK();
}

Status CDCSDKYsqlTest::DisableDynamicTableAdditionOnCDCSDKStream(
const xrepl::StreamId& stream_id) {
std::string yb_admin_command = "disable_dynamic_table_addition_on_change_data_stream";
vector<string> command_args;
command_args.push_back(stream_id.ToString());
RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));
return Status::OK();
}

Status CDCSDKYsqlTest::RemoveUserTableFromCDCSDKStream(
const xrepl::StreamId& stream_id, const TableId& table_id) {
std::string yb_admin_command = "remove_user_table_from_change_data_stream";
vector<string> command_args;
command_args.push_back(stream_id.ToString());
command_args.push_back(table_id);
RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));

return Status::OK();
}

Status CDCSDKYsqlTest::ValidateAndSyncCDCStateEntriesForCDCSDKStream(
const xrepl::StreamId& stream_id) {
std::string yb_admin_command =
"validate_and_sync_cdc_state_table_entries_on_change_data_stream";
vector<string> command_args;
command_args.push_back(stream_id.ToString());
RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args));

return Status::OK();
}

} // namespace cdc
} // namespace yb
22 changes: 22 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.h
@@ -135,6 +135,8 @@ DECLARE_bool(enable_log_retention_by_op_idx);
DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams);
DECLARE_uint32(cdcsdk_tablet_not_of_interest_timeout_secs);
DECLARE_uint32(cdcsdk_retention_barrier_no_revision_interval_secs);
DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);
DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);

namespace yb {

@@ -495,6 +497,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {

Result<GetCDCDBStreamInfoResponsePB> GetDBStreamInfo(const xrepl::StreamId db_stream_id);

void VerifyTablesInStreamMetadata(
const xrepl::StreamId& stream_id, const std::unordered_set<std::string>& expected_table_ids,
const std::string& timeout_msg);

Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id);

Status CreateSnapshot(const NamespaceName& ns);
@@ -643,6 +649,22 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
CDCSDKCheckpointPB checkpoint, GetChangesResponsePB* change_resp);

void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream);

Status ExecuteYBAdminCommand(
const std::string& command_name, const std::vector<string>& command_args);

Status DisableDynamicTableAdditionOnCDCSDKStream(const xrepl::StreamId& stream_id);

void TestDisableOfDynamicTableAdditionOnCDCStream(bool use_consistent_snapshot_stream);

Status RemoveUserTableFromCDCSDKStream(const xrepl::StreamId& stream_id, const TableId& table_id);

void TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream);

Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const xrepl::StreamId& stream_id);

void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval(
bool use_consistent_snapshot_stream);
};

} // namespace cdc
16 changes: 16 additions & 0 deletions src/yb/master/catalog_entity_info.cc
@@ -59,6 +59,7 @@ using std::string;
using strings::Substitute;

DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option);

DEFINE_RUNTIME_AUTO_bool(
use_parent_table_id_field, kLocalPersisted, false, true,
@@ -1282,6 +1283,11 @@ const NamespaceId CDCStreamInfo::namespace_id() const {
return LockForRead()->pb.namespace_id();
}

bool CDCStreamInfo::IsCDCSDKStream() const {
auto l = LockForRead();
return l->pb.has_namespace_id() && !l->pb.namespace_id().empty();
}

const ReplicationSlotName CDCStreamInfo::GetCdcsdkYsqlReplicationSlotName() const {
auto l = LockForRead();
return ReplicationSlotName(l->pb.cdcsdk_ysql_replication_slot_name());
@@ -1293,6 +1299,16 @@ bool CDCStreamInfo::IsConsistentSnapshotStream() const {
l->pb.cdcsdk_stream_metadata().has_consistent_snapshot_option();
}

bool CDCStreamInfo::IsDynamicTableAdditionDisabled() const {
if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) {
return false;
}

auto l = LockForRead();
return l->pb.has_cdcsdk_disable_dynamic_table_addition() &&
l->pb.cdcsdk_disable_dynamic_table_addition();
}

std::string CDCStreamInfo::ToString() const {
auto l = LockForRead();
if (l->pb.has_namespace_id()) {
4 changes: 4 additions & 0 deletions src/yb/master/catalog_entity_info.h
@@ -1258,6 +1258,10 @@ class CDCStreamInfo : public RefCountedThreadSafe<CDCStreamInfo>,

bool IsConsistentSnapshotStream() const;

bool IsCDCSDKStream() const;

bool IsDynamicTableAdditionDisabled() const;

std::string ToString() const override;

private:
5 changes: 5 additions & 0 deletions src/yb/master/catalog_entity_info.proto
@@ -519,6 +519,11 @@ message SysCDCStreamEntryPB {
optional string cdcsdk_ysql_replication_slot_name = 6;
optional CDCSDKStreamEntryPB cdcsdk_stream_metadata = 7;
optional uint64 stream_creation_time = 8;

// Dynamic tables are the tables which are created after the creation of the stream.
// This field controls if dynamic tables should automatically be added to the CDC stream or not.
// If set to true, dynamic table wont get added to the CDC stream.
optional bool cdcsdk_disable_dynamic_table_addition = 11;
}


23 changes: 22 additions & 1 deletion src/yb/master/catalog_manager.h
@@ -44,6 +44,7 @@
#include <boost/functional/hash.hpp>
#include <gtest/internal/gtest-internal.h>

#include "yb/cdc/cdc_state_table.h"
#include "yb/cdc/xcluster_types.h"
#include "yb/common/constants.h"
#include "yb/common/entity_ids.h"
@@ -1311,6 +1312,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
Status UpdateCDCStream(
const UpdateCDCStreamRequestPB* req, UpdateCDCStreamResponsePB* resp, rpc::RpcContext* rpc);

Status DisableDynamicTableAdditionOnCDCSDKStream(
const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req,
DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc);

Status RemoveUserTableFromCDCSDKStream(
const RemoveUserTableFromCDCSDKStreamRequestPB* req,
RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc);

Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(
const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req,
ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc);

// Query if Bootstrapping is required for a CDC stream (e.g. Are we missing logs).
Status IsBootstrapRequired(
const IsBootstrapRequiredRequestPB* req,
@@ -1439,7 +1452,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
// Find all CDCSDK streams which do not have metadata for the newly added tables.
Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map);

- bool CanTableBeAddedToCDCSDKStream(
+ bool IsTableEligibleForCDCSDKStream(
const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_);

// This method compares all tables in the namespace to all the tables added to a CDCSDK stream,
@@ -3103,6 +3116,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
void ValidateIndexTablesPostLoad(std::unordered_map<TableId, TableIdSet>&& indexes_map,
TableIdSet* tables_to_persist) EXCLUDES(mutex_);

Result<std::vector<cdc::CDCStateTableEntry>> UpdateCheckpointForTabletEntriesInCDCState(
const xrepl::StreamId& stream_id,
const std::unordered_set<TableId>& tables_in_stream_metadata,
const TableId& table_to_be_removed = "");

Status RemoveTableFromCDCStreamMetadataAndMaps(
const CDCStreamInfoPtr stream, const TableId table_id);

// Should be bumped up when tablet locations are changed.
std::atomic<uintptr_t> tablet_locations_version_{0};

42 changes: 42 additions & 0 deletions src/yb/master/master_replication.proto
@@ -36,6 +36,11 @@ message CDCStreamInfoPB {
optional uint64 cdcsdk_consistent_snapshot_time = 7;
optional CDCSDKSnapshotOption cdcsdk_consistent_snapshot_option = 8;
optional uint64 stream_creation_time = 9;

// Dynamic tables are the tables which are created after the creation of the stream.
// This field controls if dynamic tables should automatically be added to the CDC stream or not.
// If set to true, dynamic table wont get added to the CDC stream.
optional bool cdcsdk_disable_dynamic_table_addition = 12;
}

message ValidateReplicationInfoRequestPB {
@@ -498,6 +503,32 @@ message XClusterReportNewAutoFlagConfigVersionResponsePB {
optional MasterErrorPB error = 1;
}

message DisableDynamicTableAdditionOnCDCSDKStreamRequestPB {
optional string stream_id = 1;
}

message DisableDynamicTableAdditionOnCDCSDKStreamResponsePB {
optional MasterErrorPB error = 1;
}

message RemoveUserTableFromCDCSDKStreamRequestPB {
optional string stream_id = 1;
optional string table_id = 2;
}

message RemoveUserTableFromCDCSDKStreamResponsePB {
optional MasterErrorPB error = 1;
}

message ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB {
optional string stream_id = 1;
}

message ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB {
optional MasterErrorPB error = 1;
repeated string updated_tablet_entries = 2;
}

service MasterReplication {
option (yb.rpc.custom_service_name) = "yb.master.MasterService";

@@ -563,4 +594,15 @@ service MasterReplication {
rpc XClusterReportNewAutoFlagConfigVersion(
XClusterReportNewAutoFlagConfigVersionRequestPB)
returns (XClusterReportNewAutoFlagConfigVersionResponsePB);

// Introduced for bug (#22876, #22773)
rpc DisableDynamicTableAdditionOnCDCSDKStream (DisableDynamicTableAdditionOnCDCSDKStreamRequestPB)
returns (DisableDynamicTableAdditionOnCDCSDKStreamResponsePB);
// Introduced for bug (#22876, #22773)
rpc RemoveUserTableFromCDCSDKStream (RemoveUserTableFromCDCSDKStreamRequestPB)
returns (RemoveUserTableFromCDCSDKStreamResponsePB);
// Introduced for bug (#22876, #22773)
rpc ValidateAndSyncCDCStateEntriesForCDCSDKStream(
ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB)
returns (ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB);
}
3 changes: 3 additions & 0 deletions src/yb/master/master_replication_service.cc
@@ -56,6 +56,9 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl
(GetTableSchemaFromSysCatalog)
(ChangeXClusterRole)
(BootstrapProducer)
(DisableDynamicTableAdditionOnCDCSDKStream)
(RemoveUserTableFromCDCSDKStream)
(ValidateAndSyncCDCStateEntriesForCDCSDKStream)
)

MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK(