Skip to content

Commit

Permalink
[#20726][#21203] YSQL: Retrieve and use confirm_flush, restart_lsn an…
Browse files Browse the repository at this point in the history
…d xmin for a replication slot from the CDC state table

Summary:
The values of confirm_flush, restart_lsn and xmin for a replication slot were hardcoded to dummy values in earlier revisions. For reference, see https://phorge.dev.yugabyte.com/D31997

This revision updates the logic to fetch them from the CDC state table. CDC state table stores these entries for every replication slot. This support was added
recently as part of https://phorge.dev.yugabyte.com/D32643.

Note that the values are still going to 0 for all of them till we support consumption of changes via the Replication Slot. As a result, the output of the view
`pg_replication_slots` is still always 0 for these fields. These values are incremented during the consumption of changes via the replication slot which is a WIP.

Also, as part of this revision, I've deprecated the `GetReplicationSlotStatus` RPC of pg_client_service. This is because this RPC is a strict subset of the `GetReplicationSlot` RPC. So this additional RPC doesn't add much value. As part of the deprecation, I've replaced the usage of `GetReplicationSlotStatus` with `GetReplicationSlot`.

**Upgrade/Rollback safety:**
The proto changes are just reordering of code.

The complete feature is disabled using the test flag: `ysql_TEST_enable_replication_slot_consumption`
Jira: DB-9729, DB-10133

Test Plan:
Existing tests are enough to ensure no behavior change.

Tests ensuring these values are correctly incremented will be added once the support for consumption of changes is complete. The main requirement is the logic to
call CDC service to persist these values.

Reviewers: asrinivasan, aagrawal, skumar

Reviewed By: asrinivasan

Subscribers: ybase, ycdcxcluster, yql, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D32729
  • Loading branch information
dr0pdb committed Mar 6, 2024
1 parent 19c131f commit e5dbd2e
Show file tree
Hide file tree
Showing 24 changed files with 178 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ public int getTestMethodTimeoutSec() {
@Override
protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_replication_commands");
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands,yb_enable_cdc_consistent_snapshot_streams");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
return flagMap;
}

@Override
protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_replication_commands");
flagMap.put("allowed_preview_flags_csv",
"ysql_yb_enable_replication_commands,yb_enable_cdc_consistent_snapshot_streams");
flagMap.put("ysql_yb_enable_replication_commands", "true");
flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true");
return flagMap;
}

Expand Down
12 changes: 0 additions & 12 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -1898,18 +1898,6 @@ YBCGetReplicationSlot(const char *slot_name,
YBCPgGetReplicationSlot(slot_name, replication_slot), error_message);
}

void
YBCGetReplicationSlotStatus(const char *slot_name,
bool *active)
{
char error_message[NAMEDATALEN + 64] = "";
snprintf(error_message, sizeof(error_message),
"replication slot \"%s\" does not exist", slot_name);

HandleYBStatusWithCustomErrorForNotFound(
YBCPgGetReplicationSlotStatus(slot_name, active), error_message);
}

void
YBCDropReplicationSlot(const char *slot_name)
{
Expand Down
18 changes: 9 additions & 9 deletions src/postgres/src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,14 @@ ReplicationSlotAcquire(const char *name, bool nowait)
LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
ConditionVariableInit(&slot->active_cv);

slot->data.confirmed_flush = yb_replication_slot->confirmed_flush;
slot->data.xmin = yb_replication_slot->xmin;
/*
* Dummy values to always stream from the start.
* TODO(#20726): This has to be updated to support restarts.
* Set catalog_xmin as xmin to make the PG Debezium connector work.
* It is not used in our implementation.
*/
slot->data.catalog_xmin = 0;
slot->data.confirmed_flush = 0;
slot->data.xmin = 0;
slot->data.restart_lsn = 0;
slot->data.catalog_xmin = yb_replication_slot->xmin;
slot->data.restart_lsn = yb_replication_slot->restart_lsn;

MyReplicationSlot = slot;
return;
Expand Down Expand Up @@ -590,10 +590,10 @@ ReplicationSlotDrop(const char *name, bool nowait)
*/
if (IsYugaByteEnabled())
{
bool stream_active;
YBCReplicationSlotDescriptor *yb_replication_slot;
YBCGetReplicationSlot(name, &yb_replication_slot);

YBCGetReplicationSlotStatus(name, &stream_active);
if (stream_active)
if (yb_replication_slot->active)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active", name)));
Expand Down
29 changes: 22 additions & 7 deletions src/postgres/src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,18 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
if (IsYugaByteEnabled())
{
values[0] = CStringGetTextDatum(name->data);
/* Send lsn as NULL */
nulls[1] = true;

/*
* Send "0/2" as the consistent_point. The LSN "0/1" is reserved
* for the records to be streamed as part of the snapshot consumption.
* The first change record is always streamed with LSN "0/2".
*
* This value should be kept in sync with the confirmed_flush_lsn value
* being set during the creation of the CDC stream in the
* PopulateCDCStateTable function of xrepl_catalog_manager.cc.
*/
XLogRecPtr consistent_point = 2;
values[1] = LSNGetDatum(consistent_point);
}
else
{
Expand Down Expand Up @@ -354,11 +364,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
yb_stream_id = slot->stream_id;
yb_stream_active = slot->active;

/* Fill in the dummy values. */
xmin = InvalidXLogRecPtr;
catalog_xmin = InvalidXLogRecPtr;
restart_lsn = InvalidXLogRecPtr;
confirmed_flush_lsn = InvalidXLogRecPtr;
restart_lsn = slot->restart_lsn;
confirmed_flush_lsn = slot->confirmed_flush;
xmin = slot->xmin;
/*
* Set catalog_xmin as xmin to make the PG Debezium connector work.
* It is not used in our implementation.
*/
catalog_xmin = slot->xmin;

/* Fill in the dummy/constant values. */
active_pid = 0;
persistency = RS_PERSISTENT;
}
Expand Down
14 changes: 9 additions & 5 deletions src/postgres/src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -1072,13 +1072,17 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ReplicationSlotSave();
}

/*
* Send "0/0" as the consistent wal location instead of a NULL value. This
* is so that the drivers which have a NULL check on the value continue to
* work.
/*
* Send "0/2" as the consistent wal location. The LSN "0/1" is reserved for
* the records to be streamed as part of the snapshot consumption. The first
* change record is always streamed with LSN "0/2".
*
* This value should be kept in sync with the confirmed_flush_lsn value
* being set during the creation of the CDC stream in the
* PopulateCDCStateTable function of xrepl_catalog_manager.cc.
*/
if (IsYugaByteEnabled())
snprintf(xloc, sizeof(xloc), "%X/%X", 0, 0);
snprintf(xloc, sizeof(xloc), "%X/%X", 0, 2);
else
snprintf(xloc, sizeof(xloc), "%X/%X",
(uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
Expand Down
4 changes: 0 additions & 4 deletions src/postgres/src/include/commands/ybccmds.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ extern void
YBCGetReplicationSlot(const char *slot_name,
YBCReplicationSlotDescriptor **replication_slot);

extern void
YBCGetReplicationSlotStatus(const char *slot_name,
bool *active);

extern void YBCDropReplicationSlot(const char *slot_name);

extern void YBCInitVirtualWalForCDC(const char *stream_id,
Expand Down
12 changes: 6 additions & 6 deletions src/postgres/src/test/regress/expected/yb_replication_slot.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ SET SESSION AUTHORIZATION 'regress_replicationslot_user';
SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false);
slot_name | lsn
-----------+-----
testslot1 |
testslot1 | 0/2
(1 row)

SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false);
slot_name | lsn
-----------+-----
testslot2 |
testslot2 | 0/2
(1 row)

-- Cannot do SELECT * since yb_stream_id changes across runs.
Expand All @@ -23,8 +23,8 @@ SELECT slot_name, plugin, slot_type, database, temporary, active,
FROM pg_replication_slots;
slot_name | plugin | slot_type | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
testslot2 | pgoutput | logical | yugabyte | f | f | | | | |
testslot1 | pgoutput | logical | yugabyte | f | f | | | | |
testslot2 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
testslot1 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
(2 rows)

-- drop the replication slot and create with same name again.
Expand All @@ -38,7 +38,7 @@ SELECT * FROM pg_drop_replication_slot('testslot1');
SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false);
slot_name | lsn
-----------+-----
testslot1 |
testslot1 | 0/2
(1 row)

-- unsupported cases
Expand All @@ -58,7 +58,7 @@ SET ROLE regress_replicationslot_replication_user;
SELECT * FROM pg_create_logical_replication_slot('testslot3', 'pgoutput', false);
slot_name | lsn
-----------+-----
testslot3 |
testslot3 | 0/2
(1 row)

RESET ROLE;
Expand Down
1 change: 0 additions & 1 deletion src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ static const char* const kCheckpointType = "checkpoint_type";
static const char* const kStreamState = "state";
static const char* const kNamespaceId = "NAMESPACEID";
static const char* const kCDCSDKSnapshotDoneKey = "snapshot_done_key";
static const char* const kCDCSDKSlotEntryTabletId = "dummy_id_for_replication_slot";

struct TabletCheckpoint {
OpId op_id;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ static const char* const kObsoleteShortPrimaryTableId = "sys.catalog.uuid";

constexpr auto kPitrFeatureName = "PITR";

constexpr auto kCDCSDKSlotEntryTabletId = "dummy_id_for_replication_slot";

} // namespace yb
9 changes: 8 additions & 1 deletion src/yb/integration-tests/cdcsdk_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ Result<xrepl::StreamId> CDCSDKTestBase::CreateDBStreamWithReplicationSlot(
}

Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplicationSlot(
const std::string& slot_name,
CDCSDKSnapshotOption snapshot_option, bool verify_snapshot_name) {
auto repl_conn = VERIFY_RESULT(test_cluster_.ConnectToDBWithReplication(kNamespaceName));

Expand All @@ -430,7 +431,6 @@ Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplic
break;
}

auto slot_name = GenerateRandomReplicationSlotName();
auto result = VERIFY_RESULT(repl_conn.FetchFormat(
"CREATE_REPLICATION_SLOT $0 LOGICAL pgoutput $1", slot_name, snapshot_action));
auto snapshot_name =
Expand Down Expand Up @@ -460,6 +460,13 @@ Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplic
return xrepl_stream_id;
}

Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplicationSlot(
CDCSDKSnapshotOption snapshot_option, bool verify_snapshot_name) {
auto slot_name = GenerateRandomReplicationSlotName();
return CreateConsistentSnapshotStreamWithReplicationSlot(
slot_name, snapshot_option, verify_snapshot_name);
}

// This creates a Consistent Snapshot stream on the database kNamespaceName by default.
Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStream(
CDCSDKSnapshotOption snapshot_option,
Expand Down
4 changes: 4 additions & 0 deletions src/yb/integration-tests/cdcsdk_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ class CDCSDKTestBase : public YBTest {
Result<xrepl::StreamId> CreateDBStreamWithReplicationSlot(
const std::string& replication_slot_name, CDCRecordType record_type = CDCRecordType::CHANGE);

Result<xrepl::StreamId> CreateConsistentSnapshotStreamWithReplicationSlot(
const std::string& replication_slot_name,
CDCSDKSnapshotOption snapshot_option = CDCSDKSnapshotOption::USE_SNAPSHOT,
bool verify_snapshot_name = false);
Result<xrepl::StreamId> CreateConsistentSnapshotStreamWithReplicationSlot(
CDCSDKSnapshotOption snapshot_option = CDCSDKSnapshotOption::USE_SNAPSHOT,
bool verify_snapshot_name = false);
Expand Down
13 changes: 3 additions & 10 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7665,6 +7665,7 @@ TEST_F(CDCSDKYsqlTest, TestReplicationSlotDropWithActiveInvalid) {
// Set the active window to a smaller value for faster test execution.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_cdc_active_replication_slot_window_ms) =
10000 * yb::kTimeMultiplier;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;

ASSERT_OK(SetUpWithParams(3, 1, false));
auto table =
Expand All @@ -7673,16 +7674,8 @@ TEST_F(CDCSDKYsqlTest, TestReplicationSlotDropWithActiveInvalid) {
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 3);
xrepl::StreamId stream_id =
ASSERT_RESULT(CreateDBStreamWithReplicationSlot("repl_slot_drop_with_active_invalid"));

// set checkpoint for each tablet.
for (int tablet_idx = 0; tablet_idx < tablets.size(); tablet_idx++) {
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(
stream_id, tablets, OpId::Min(), /*cdc_sdk_safe_time=*/0, /*initial_checkpoint=*/true,
tablet_idx));
ASSERT_FALSE(set_resp.has_error());
}
xrepl::StreamId stream_id = ASSERT_RESULT(
CreateConsistentSnapshotStreamWithReplicationSlot("repl_slot_drop_with_active_invalid"));

auto repl_conn = ASSERT_RESULT(test_cluster_.ConnectToDBWithReplication(kNamespaceName));

Expand Down
7 changes: 5 additions & 2 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1211,16 +1211,19 @@ Status CatalogManager::PopulateCDCStateTable(const xrepl::StreamId& stream_id,
// 1. Represent the slot's consistent point i.e. first record sent in the streaming phase will
// have LSN & txnID set to 2.
// 2. Initialize components (LSN & txnID generators) of the CDCSDK Virtual WAL on restarts.
//
// If these values are changed here, also update the consistent point sent as part of the
// creation of logical replication slot in walsender.c and slotfuncs.c.
if (FLAGS_ysql_yb_enable_replication_commands && has_consistent_snapshot_option) {
cdc::CDCStateTableEntry entry(cdc::kCDCSDKSlotEntryTabletId, stream_id);
cdc::CDCStateTableEntry entry(kCDCSDKSlotEntryTabletId, stream_id);
entry.confirmed_flush_lsn = 2;
entry.restart_lsn = 1;
entry.xmin = 1;
entry.record_id_commit_time = consistent_snapshot_time;
entry.cdc_sdk_safe_time = consistent_snapshot_time;
entries.push_back(entry);
VLOG(1) << "Added entry in cdc_state for the replication slot with tablet_id: "
<< cdc::kCDCSDKSlotEntryTabletId << " stream_id: " << stream_id;
<< kCDCSDKSlotEntryTabletId << " stream_id: " << stream_id;
}

return cdc_state_table_->UpsertEntries(entries);
Expand Down
17 changes: 11 additions & 6 deletions src/yb/tserver/pg_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ service PgClientService {
rpc GetDatabaseInfo(PgGetDatabaseInfoRequestPB) returns (PgGetDatabaseInfoResponsePB);
rpc GetLockStatus(PgGetLockStatusRequestPB) returns (PgGetLockStatusResponsePB);
rpc GetReplicationSlot(PgGetReplicationSlotRequestPB) returns (PgGetReplicationSlotResponsePB);
rpc GetReplicationSlotStatus(PgGetReplicationSlotStatusRequestPB)
returns (PgGetReplicationSlotStatusResponsePB);
rpc IsInitDbDone(PgIsInitDbDoneRequestPB) returns (PgIsInitDbDoneResponsePB);
rpc ListLiveTabletServers(PgListLiveTabletServersRequestPB)
returns (PgListLiveTabletServersResponsePB);
Expand Down Expand Up @@ -99,6 +97,10 @@ service PgClientService {
rpc GetNewObjectId(PgGetNewObjectIdRequestPB)
returns (PgGetNewObjectIdResponsePB);

// DEPRECATED: GetReplicationSlot RPC is a superset of this GetReplicationSlotStatus.
// So GetReplicationSlot should be used everywhere.
rpc GetReplicationSlotStatus(PgGetReplicationSlotStatusRequestPB)
returns (PgGetReplicationSlotStatusResponsePB);
// DEPRECATED
rpc SetActiveSubTransaction(PgSetActiveSubTransactionRequestPB)
returns (PgSetActiveSubTransactionResponsePB);
Expand Down Expand Up @@ -416,10 +418,6 @@ message PgGetLockStatusResponsePB {
map<string, TransactionList> transactions_by_node = 3;
}

message PgGetReplicationSlotStatusRequestPB {
string replication_slot_name = 1;
}

message PgGetReplicationSlotRequestPB {
string replication_slot_name = 1;

Expand All @@ -436,6 +434,10 @@ enum ReplicationSlotStatus {
INACTIVE = 1;
}

message PgGetReplicationSlotStatusRequestPB {
string replication_slot_name = 1;
}

message PgGetReplicationSlotStatusResponsePB {
AppStatusPB status = 1;

Expand Down Expand Up @@ -481,6 +483,9 @@ message PgReplicationSlotInfoPB {
bytes stream_id = 2;
uint32 database_oid = 3;
ReplicationSlotStatus replication_slot_status = 4;
uint64 confirmed_flush_lsn = 5;
uint64 restart_lsn = 6;
uint32 xmin = 7;
}

message PgListReplicationSlotsResponsePB {
Expand Down
Loading

0 comments on commit e5dbd2e

Please sign in to comment.