Skip to content

Commit

Permalink
[#22984] CDCSDK: Promote yb_enable_cdc_consistent_snapshot_streams to…
Browse files Browse the repository at this point in the history
… auto flag

Summary:
This diff promotes the flag `yb_enable_cdc_consistent_snapshot_streams` from a preview flag to a default true auto flag.
Jira: DB-11902

Test Plan: Jenkins: urgent

Reviewers: asrinivasan, skumar, stiwary

Reviewed By: asrinivasan

Subscribers: ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36084
  • Loading branch information
Sumukh-Phalgaonkar committed Jun 24, 2024
1 parent 6dccdac commit 9656a89
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,15 @@ protected Map<String, String> getTServerFlags() {
if (isTestRunningWithConnectionManager()) {
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity," +
"enable_ysql_conn_mgr");
flagMap.put("enable_ysql_conn_mgr", "true");
} else {
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity");
}
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
flagMap.put(
Expand All @@ -59,10 +56,8 @@ protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
return flagMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,18 @@ protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
if (isTestRunningWithConnectionManager()) {
String preview_flags = "ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams,enable_ysql_conn_mgr," +
"enable_ysql_conn_mgr," +
"ysql_yb_enable_replica_identity,cdcsdk_enable_dynamic_table_support";
flagMap.put("allowed_preview_flags_csv",preview_flags);
flagMap.put("ysql_conn_mgr_stats_interval", "1");
} else {
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands," +
"yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replica_identity," +
"cdcsdk_enable_dynamic_table_support");
}
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
flagMap.put(
"vmodule", "cdc_service=4,cdcsdk_producer=4,ybc_pggate=4,cdcsdk_virtual_wal=4,client=4");
Expand All @@ -98,11 +96,10 @@ protected Map<String, String> getTServerFlags() {
protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands,yb_enable_cdc_consistent_snapshot_streams," +
"ysql_yb_enable_replication_commands," +
"ysql_yb_enable_replica_identity");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("ysql_TEST_enable_replication_slot_consumption", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
flagMap.put("ysql_yb_enable_replica_identity", "true");
return flagMap;
}
Expand Down
5 changes: 3 additions & 2 deletions src/yb/common/common_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ DEFINE_RUNTIME_int32(
"tserver");
TAG_FLAG(ysql_clone_pg_schema_rpc_timeout_ms, advanced);

DEFINE_RUNTIME_PREVIEW_bool(yb_enable_cdc_consistent_snapshot_streams, false,
"Enable support for CDC Consistent Snapshot Streams");
DEFINE_RUNTIME_AUTO_bool(
yb_enable_cdc_consistent_snapshot_streams, kLocalPersisted, false, true,
"Enable support for CDC Consistent Snapshot Streams");

DEFINE_RUNTIME_PG_FLAG(bool, TEST_enable_replication_slot_consumption, false,
"Enable consumption of changes via replication slots."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class CDCSDKConsistentSnapshotTest : public CDCSDKYsqlTest {
public:
void SetUp() override {
CDCSDKYsqlTest::SetUp();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_cdcsdk_streamed_tables) = true;
}

Expand Down
12 changes: 4 additions & 8 deletions src/yb/integration-tests/cdcsdk_replica_identity-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,20 @@ TEST_F(CDCSDKReplicaIdentityTest, YB_DISABLE_TEST_IN_TSAN(TestReplicaIdentityWit
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY FULL"));

xrepl::StreamId stream_id_1 = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id_1, tablets));
ASSERT_FALSE(set_resp.has_error());
xrepl::StreamId stream_id_1 = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

// These alter replica identity will have no effect on records streamed for stream_id_1
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY CHANGE"));

// stream_id_2 will have replica identity CHANGE for test_table
xrepl::StreamId stream_id_2 = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id_2, tablets));
ASSERT_FALSE(set_resp.has_error());
xrepl::StreamId stream_id_2 = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

WriteUpdateDelete(kTableName);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {10, 2, 2, 2, 0, 0};
const uint32_t expected_count_with_packed_row[] = {10, 4, 0, 2, 0, 0};
const uint32_t expected_count[] = {3, 2, 2, 2, 0, 0};
const uint32_t expected_count_with_packed_row[] = {3, 4, 0, 2, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};

ExpectedRecord expected_records_1[] = {{1, 2}, {1, 3}, {1, 3}};
Expand Down
4 changes: 2 additions & 2 deletions src/yb/integration-tests/cdcsdk_tablet_split-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTabletSpli
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type));
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(checkpoint_type));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true));
Expand Down Expand Up @@ -1173,7 +1173,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTwoTabletS
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type));
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(checkpoint_type));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true));
Expand Down
27 changes: 7 additions & 20 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,31 +226,18 @@ void CDCSDKYsqlTest::TestCDCLagMetric(CDCCheckpointType checkpoint_type) {
CDCSDK_TESTS_FOR_ALL_CHECKPOINT_OPTIONS(CDCSDKYsqlTest, TestCDCLagMetric);

// Begin transaction, perform some operations and abort transaction.
// Expected records: 1 (DDL).
// Expected records: 0. In case of non consistent snapshot streams we receive 1 DDL.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(AbortAllWriteOperations)) {
auto tablets = ASSERT_RESULT(SetUpCluster());
ASSERT_EQ(tablets.size(), 1);
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets));
ASSERT_FALSE(set_resp.has_error());
ASSERT_OK(WriteRowsHelper(1 /* start */, 4 /* end */, &test_cluster_, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 0, 0, 0, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};
xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

ExpectedRecord expected_records[] = {{0, 0}};
ASSERT_OK(WriteRowsHelper(1 /* start */, 4 /* end */, &test_cluster_, false));

GetChangesResponsePB change_resp;
ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp, stream_id, tablets, 0));

uint32_t record_size = change_resp.cdc_sdk_proto_records_size();
for (uint32_t i = 0; i < record_size; ++i) {
const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i);
CheckRecord(record, expected_records[i], count);
}
LOG(INFO) << "Got " << count[1] << " insert record and " << count[0] << " ddl record";
CheckCount(expected_count, count);
ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0);
}

// Insert one row, update the inserted row.
Expand Down Expand Up @@ -7337,8 +7324,8 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAtomicDDLRollback)) {
ASSERT_FALSE(set_resp.has_error());

// 0=DDL, 1=INSERT, 2=UPDATE, 3=DELETE, 4=READ, 5=TRUNCATE, 6=BEGIN, 7=COMMIT
const int expected_count[] = {3, 6, 1, 1, 0, 0, 1, 1};
const int expected_count_for_packed_row[] = {3, 7, 0, 1, 0, 0, 1, 1};
const int expected_count[] = {2, 6, 1, 1, 0, 0, 1, 1};
const int expected_count_for_packed_row[] = {2, 7, 0, 1, 0, 0, 1, 1};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down Expand Up @@ -7526,7 +7513,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAtomicDDLDropColumn)) {
ASSERT_FALSE(set_resp.has_error());

// 0=DDL, 1=INSERT, 2=UPDATE, 3=DELETE, 4=READ, 5=TRUNCATE, 6=BEGIN, 7=COMMIT
const int expected_count[] = {3, 10, 0, 0, 0, 0, 4, 4};
const int expected_count[] = {2, 10, 0, 0, 0, 0, 4, 4};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down

0 comments on commit 9656a89

Please sign in to comment.