Skip to content

Commit

Permalink
[BACKPORT 2024.1][#22984] CDCSDK: Promote yb_enable_cdc_consistent_sn…
Browse files Browse the repository at this point in the history
…apshot_streams to auto flag Needs ReviewPublic

Summary:
Original commit: 9656a89 / D36084

This diff promotes the flag yb_enable_cdc_consistent_snapshot_streams from a preview flag to a default true auto flag.

######Backport Description
Minor merge conflicts were encountered in  java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java because of connection manager related changes not being backported.
Jira: DB-11902

Test Plan: Jenkins: rebase: 2024.1

Reviewers: asrinivasan, skumar, stiwary

Reviewed By: asrinivasan

Subscribers: ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36090
  • Loading branch information
Sumukh-Phalgaonkar committed Jun 25, 2024
1 parent 8ee6a69 commit 255b462
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
flagMap.put(
Expand All @@ -49,10 +47,8 @@ protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
return flagMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity," +
"cdcsdk_enable_dynamic_table_support");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
flagMap.put(
"vmodule", "cdc_service=4,cdcsdk_producer=4,ybc_pggate=4,cdcsdk_virtual_wal=4,client=4");
Expand All @@ -91,11 +89,9 @@ protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
return flagMap;
}
Expand Down
6 changes: 4 additions & 2 deletions src/yb/common/common_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ DEFINE_NON_RUNTIME_bool(ysql_enable_pg_per_database_oid_allocator, true,
TAG_FLAG(ysql_enable_pg_per_database_oid_allocator, advanced);
TAG_FLAG(ysql_enable_pg_per_database_oid_allocator, hidden);

DEFINE_RUNTIME_PREVIEW_bool(yb_enable_cdc_consistent_snapshot_streams, false,
"Enable support for CDC Consistent Snapshot Streams");

DEFINE_RUNTIME_AUTO_bool(
yb_enable_cdc_consistent_snapshot_streams, kLocalPersisted, false, true,
"Enable support for CDC Consistent Snapshot Streams");

DEFINE_RUNTIME_PG_FLAG(bool, TEST_enable_replication_slot_consumption, false,
"Enable consumption of changes via replication slots."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class CDCSDKConsistentSnapshotTest : public CDCSDKYsqlTest {
public:
void SetUp() override {
CDCSDKYsqlTest::SetUp();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_cdcsdk_streamed_tables) = true;
}

Expand Down
12 changes: 4 additions & 8 deletions src/yb/integration-tests/cdcsdk_replica_identity-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,20 @@ TEST_F(CDCSDKReplicaIdentityTest, YB_DISABLE_TEST_IN_TSAN(TestReplicaIdentityWit
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY FULL"));

xrepl::StreamId stream_id_1 = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id_1, tablets));
ASSERT_FALSE(set_resp.has_error());
xrepl::StreamId stream_id_1 = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

// These alter replica identity will have no effect on records streamed for stream_id_1
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY CHANGE"));

// stream_id_2 will have replica identity CHANGE for test_table
xrepl::StreamId stream_id_2 = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id_2, tablets));
ASSERT_FALSE(set_resp.has_error());
xrepl::StreamId stream_id_2 = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

WriteUpdateDelete(kTableName);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {10, 2, 2, 2, 0, 0};
const uint32_t expected_count_with_packed_row[] = {10, 4, 0, 2, 0, 0};
const uint32_t expected_count[] = {3, 2, 2, 2, 0, 0};
const uint32_t expected_count_with_packed_row[] = {3, 4, 0, 2, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};

ExpectedRecord expected_records_1[] = {{1, 2}, {1, 3}, {1, 3}};
Expand Down
4 changes: 2 additions & 2 deletions src/yb/integration-tests/cdcsdk_tablet_split-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTabletSpli
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type));
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(checkpoint_type));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true));
Expand Down Expand Up @@ -1173,7 +1173,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTwoTabletS
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type));
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(checkpoint_type));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true));
Expand Down
27 changes: 7 additions & 20 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,31 +226,18 @@ void CDCSDKYsqlTest::TestCDCLagMetric(CDCCheckpointType checkpoint_type) {
CDCSDK_TESTS_FOR_ALL_CHECKPOINT_OPTIONS(CDCSDKYsqlTest, TestCDCLagMetric);

// Begin transaction, perform some operations and abort transaction.
// Expected records: 1 (DDL).
// Expected records: 0. In case of non consistent snapshot streams we receive 1 DDL.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(AbortAllWriteOperations)) {
auto tablets = ASSERT_RESULT(SetUpCluster());
ASSERT_EQ(tablets.size(), 1);
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets));
ASSERT_FALSE(set_resp.has_error());
ASSERT_OK(WriteRowsHelper(1 /* start */, 4 /* end */, &test_cluster_, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 0, 0, 0, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};
xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

ExpectedRecord expected_records[] = {{0, 0}};
ASSERT_OK(WriteRowsHelper(1 /* start */, 4 /* end */, &test_cluster_, false));

GetChangesResponsePB change_resp;
ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp, stream_id, tablets, 0));

uint32_t record_size = change_resp.cdc_sdk_proto_records_size();
for (uint32_t i = 0; i < record_size; ++i) {
const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i);
CheckRecord(record, expected_records[i], count);
}
LOG(INFO) << "Got " << count[1] << " insert record and " << count[0] << " ddl record";
CheckCount(expected_count, count);
ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0);
}

// Insert one row, update the inserted row.
Expand Down Expand Up @@ -7348,8 +7335,8 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAtomicDDLRollback)) {
ASSERT_FALSE(set_resp.has_error());

// 0=DDL, 1=INSERT, 2=UPDATE, 3=DELETE, 4=READ, 5=TRUNCATE, 6=BEGIN, 7=COMMIT
const int expected_count[] = {3, 6, 1, 1, 0, 0, 1, 1};
const int expected_count_for_packed_row[] = {3, 7, 0, 1, 0, 0, 1, 1};
const int expected_count[] = {2, 6, 1, 1, 0, 0, 1, 1};
const int expected_count_for_packed_row[] = {2, 7, 0, 1, 0, 0, 1, 1};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down Expand Up @@ -7537,7 +7524,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAtomicDDLDropColumn)) {
ASSERT_FALSE(set_resp.has_error());

// 0=DDL, 1=INSERT, 2=UPDATE, 3=DELETE, 4=READ, 5=TRUNCATE, 6=BEGIN, 7=COMMIT
const int expected_count[] = {3, 10, 0, 0, 0, 0, 4, 4};
const int expected_count[] = {2, 10, 0, 0, 0, 0, 4, 4};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down

0 comments on commit 255b462

Please sign in to comment.