diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressReplicationSlot.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressReplicationSlot.java index c1fe2a507131..f530b46dbc50 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressReplicationSlot.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressReplicationSlot.java @@ -31,16 +31,20 @@ public int getTestMethodTimeoutSec() { @Override protected Map getTServerFlags() { Map flagMap = super.getTServerFlags(); - flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_replication_commands"); + flagMap.put("allowed_preview_flags_csv", + "ysql_yb_enable_replication_commands,yb_enable_cdc_consistent_snapshot_streams"); flagMap.put("ysql_yb_enable_replication_commands", "true"); + flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true"); return flagMap; } @Override protected Map getMasterFlags() { Map flagMap = super.getMasterFlags(); - flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_replication_commands"); + flagMap.put("allowed_preview_flags_csv", + "ysql_yb_enable_replication_commands,yb_enable_cdc_consistent_snapshot_streams"); flagMap.put("ysql_yb_enable_replication_commands", "true"); + flagMap.put("yb_enable_cdc_consistent_snapshot_streams", "true"); return flagMap; } diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index bd11de86aff4..9e6d7efc088d 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -1898,18 +1898,6 @@ YBCGetReplicationSlot(const char *slot_name, YBCPgGetReplicationSlot(slot_name, replication_slot), error_message); } -void -YBCGetReplicationSlotStatus(const char *slot_name, - bool *active) -{ - char error_message[NAMEDATALEN + 64] = ""; - snprintf(error_message, sizeof(error_message), - "replication slot \"%s\" does not exist", slot_name); - - HandleYBStatusWithCustomErrorForNotFound( - YBCPgGetReplicationSlotStatus(slot_name, active), error_message); -} - void YBCDropReplicationSlot(const char *slot_name) { diff --git a/src/postgres/src/backend/replication/slot.c b/src/postgres/src/backend/replication/slot.c index e89cd6901555..07af399db8df 100644 --- a/src/postgres/src/backend/replication/slot.c +++ b/src/postgres/src/backend/replication/slot.c @@ -379,14 +379,14 @@ ReplicationSlotAcquire(const char *name, bool nowait) LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); ConditionVariableInit(&slot->active_cv); + slot->data.confirmed_flush = yb_replication_slot->confirmed_flush; + slot->data.xmin = yb_replication_slot->xmin; /* - * Dummy values to always stream from the start. - * TODO(#20726): This has to be updated to support restarts. + * Set catalog_xmin as xmin to make the PG Debezium connector work. + * It is not used in our implementation. */ - slot->data.catalog_xmin = 0; - slot->data.confirmed_flush = 0; - slot->data.xmin = 0; - slot->data.restart_lsn = 0; + slot->data.catalog_xmin = yb_replication_slot->xmin; + slot->data.restart_lsn = yb_replication_slot->restart_lsn; MyReplicationSlot = slot; return; @@ -590,10 +590,10 @@ ReplicationSlotDrop(const char *name, bool nowait) */ if (IsYugaByteEnabled()) { - bool stream_active; + YBCReplicationSlotDescriptor *yb_replication_slot; + YBCGetReplicationSlot(name, &yb_replication_slot); - YBCGetReplicationSlotStatus(name, &stream_active); - if (stream_active) + if (yb_replication_slot->active) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active", name))); diff --git a/src/postgres/src/backend/replication/slotfuncs.c b/src/postgres/src/backend/replication/slotfuncs.c index fbb7a6acd850..25e3082865d4 100644 --- a/src/postgres/src/backend/replication/slotfuncs.c +++ b/src/postgres/src/backend/replication/slotfuncs.c @@ -187,8 +187,18 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) if (IsYugaByteEnabled()) { values[0] = CStringGetTextDatum(name->data); - /* Send lsn as NULL */ - nulls[1] = true; + + /* + * Send "0/2" as the consistent_point. The LSN "0/1" is reserved + * for the records to be streamed as part of the snapshot consumption. + * The first change record is always streamed with LSN "0/2". + * + * This value should be kept in sync with the confirmed_flush_lsn value + * being set during the creation of the CDC stream in the + * PopulateCDCStateTable function of xrepl_catalog_manager.cc. + */ + XLogRecPtr consistent_point = 2; + values[1] = LSNGetDatum(consistent_point); } else { @@ -354,11 +364,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) yb_stream_id = slot->stream_id; yb_stream_active = slot->active; - /* Fill in the dummy values. */ - xmin = InvalidXLogRecPtr; - catalog_xmin = InvalidXLogRecPtr; - restart_lsn = InvalidXLogRecPtr; - confirmed_flush_lsn = InvalidXLogRecPtr; + restart_lsn = slot->restart_lsn; + confirmed_flush_lsn = slot->confirmed_flush; + xmin = slot->xmin; + /* + * Set catalog_xmin as xmin to make the PG Debezium connector work. + * It is not used in our implementation. + */ + catalog_xmin = slot->xmin; + + /* Fill in the dummy/constant values. */ active_pid = 0; persistency = RS_PERSISTENT; } diff --git a/src/postgres/src/backend/replication/walsender.c b/src/postgres/src/backend/replication/walsender.c index 39b757e3bc4c..3de88b308a6c 100644 --- a/src/postgres/src/backend/replication/walsender.c +++ b/src/postgres/src/backend/replication/walsender.c @@ -1072,13 +1072,17 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ReplicationSlotSave(); } - /* - * Send "0/0" as the consistent wal location instead of a NULL value. This - * is so that the drivers which have a NULL check on the value continue to - * work. + /* + * Send "0/2" as the consistent wal location. The LSN "0/1" is reserved for + * the records to be streamed as part of the snapshot consumption. The first + * change record is always streamed with LSN "0/2". + * + * This value should be kept in sync with the confirmed_flush_lsn value + * being set during the creation of the CDC stream in the + * PopulateCDCStateTable function of xrepl_catalog_manager.cc. */ if (IsYugaByteEnabled()) - snprintf(xloc, sizeof(xloc), "%X/%X", 0, 0); + snprintf(xloc, sizeof(xloc), "%X/%X", 0, 2); else snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), diff --git a/src/postgres/src/include/commands/ybccmds.h b/src/postgres/src/include/commands/ybccmds.h index 71d64a00d89a..8246529d47fd 100644 --- a/src/postgres/src/include/commands/ybccmds.h +++ b/src/postgres/src/include/commands/ybccmds.h @@ -121,10 +121,6 @@ extern void YBCGetReplicationSlot(const char *slot_name, YBCReplicationSlotDescriptor **replication_slot); -extern void -YBCGetReplicationSlotStatus(const char *slot_name, - bool *active); - extern void YBCDropReplicationSlot(const char *slot_name); extern void YBCInitVirtualWalForCDC(const char *stream_id, diff --git a/src/postgres/src/test/regress/expected/yb_replication_slot.out b/src/postgres/src/test/regress/expected/yb_replication_slot.out index 2e10ff95f9c7..e3530e94ef9a 100644 --- a/src/postgres/src/test/regress/expected/yb_replication_slot.out +++ b/src/postgres/src/test/regress/expected/yb_replication_slot.out @@ -8,13 +8,13 @@ SET SESSION AUTHORIZATION 'regress_replicationslot_user'; SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false); slot_name | lsn -----------+----- - testslot1 | + testslot1 | 0/2 (1 row) SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false); slot_name | lsn -----------+----- - testslot2 | + testslot2 | 0/2 (1 row) -- Cannot do SELECT * since yb_stream_id changes across runs. @@ -23,8 +23,8 @@ SELECT slot_name, plugin, slot_type, database, temporary, active, FROM pg_replication_slots; slot_name | plugin | slot_type | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn -----------+----------+-----------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- - testslot2 | pgoutput | logical | yugabyte | f | f | | | | | - testslot1 | pgoutput | logical | yugabyte | f | f | | | | | + testslot2 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2 + testslot1 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2 (2 rows) -- drop the replication slot and create with same name again. @@ -38,7 +38,7 @@ SELECT * FROM pg_drop_replication_slot('testslot1'); SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false); slot_name | lsn -----------+----- - testslot1 | + testslot1 | 0/2 (1 row) -- unsupported cases @@ -58,7 +58,7 @@ SET ROLE regress_replicationslot_replication_user; SELECT * FROM pg_create_logical_replication_slot('testslot3', 'pgoutput', false); slot_name | lsn -----------+----- - testslot3 | + testslot3 | 0/2 (1 row) RESET ROLE; diff --git a/src/yb/cdc/cdc_service.h b/src/yb/cdc/cdc_service.h index 1c3b52f8b3d2..dff8c96b7266 100644 --- a/src/yb/cdc/cdc_service.h +++ b/src/yb/cdc/cdc_service.h @@ -81,7 +81,6 @@ static const char* const kCheckpointType = "checkpoint_type"; static const char* const kStreamState = "state"; static const char* const kNamespaceId = "NAMESPACEID"; static const char* const kCDCSDKSnapshotDoneKey = "snapshot_done_key"; -static const char* const kCDCSDKSlotEntryTabletId = "dummy_id_for_replication_slot"; struct TabletCheckpoint { OpId op_id; diff --git a/src/yb/common/constants.h b/src/yb/common/constants.h index 15c0d32740b5..64bb348bfd87 100644 --- a/src/yb/common/constants.h +++ b/src/yb/common/constants.h @@ -42,4 +42,6 @@ static const char* const kObsoleteShortPrimaryTableId = "sys.catalog.uuid"; constexpr auto kPitrFeatureName = "PITR"; +constexpr auto kCDCSDKSlotEntryTabletId = "dummy_id_for_replication_slot"; + } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_test_base.cc b/src/yb/integration-tests/cdcsdk_test_base.cc index d580d284612f..f2b4c08e8b81 100644 --- a/src/yb/integration-tests/cdcsdk_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_test_base.cc @@ -417,6 +417,7 @@ Result CDCSDKTestBase::CreateDBStreamWithReplicationSlot( } Result CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplicationSlot( + const std::string& slot_name, CDCSDKSnapshotOption snapshot_option, bool verify_snapshot_name) { auto repl_conn = VERIFY_RESULT(test_cluster_.ConnectToDBWithReplication(kNamespaceName)); @@ -430,7 +431,6 @@ Result CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplic break; } - auto slot_name = GenerateRandomReplicationSlotName(); auto result = VERIFY_RESULT(repl_conn.FetchFormat( "CREATE_REPLICATION_SLOT $0 LOGICAL pgoutput $1", slot_name, snapshot_action)); auto snapshot_name = @@ -460,6 +460,13 @@ Result CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplic return xrepl_stream_id; } +Result CDCSDKTestBase::CreateConsistentSnapshotStreamWithReplicationSlot( + CDCSDKSnapshotOption snapshot_option, bool verify_snapshot_name) { + auto slot_name = GenerateRandomReplicationSlotName(); + return CreateConsistentSnapshotStreamWithReplicationSlot( + slot_name, snapshot_option, verify_snapshot_name); +} + // This creates a Consistent Snapshot stream on the database kNamespaceName by default. Result CDCSDKTestBase::CreateConsistentSnapshotStream( CDCSDKSnapshotOption snapshot_option, diff --git a/src/yb/integration-tests/cdcsdk_test_base.h b/src/yb/integration-tests/cdcsdk_test_base.h index ac7f5c694b24..0a9dfe777f57 100644 --- a/src/yb/integration-tests/cdcsdk_test_base.h +++ b/src/yb/integration-tests/cdcsdk_test_base.h @@ -215,6 +215,10 @@ class CDCSDKTestBase : public YBTest { Result CreateDBStreamWithReplicationSlot( const std::string& replication_slot_name, CDCRecordType record_type = CDCRecordType::CHANGE); + Result CreateConsistentSnapshotStreamWithReplicationSlot( + const std::string& replication_slot_name, + CDCSDKSnapshotOption snapshot_option = CDCSDKSnapshotOption::USE_SNAPSHOT, + bool verify_snapshot_name = false); Result CreateConsistentSnapshotStreamWithReplicationSlot( CDCSDKSnapshotOption snapshot_option = CDCSDKSnapshotOption::USE_SNAPSHOT, bool verify_snapshot_name = false); diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 5ff3561cf15b..c7eccf0e23a5 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -7665,6 +7665,7 @@ TEST_F(CDCSDKYsqlTest, TestReplicationSlotDropWithActiveInvalid) { // Set the active window to a smaller value for faster test execution. ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_cdc_active_replication_slot_window_ms) = 10000 * yb::kTimeMultiplier; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; ASSERT_OK(SetUpWithParams(3, 1, false)); auto table = @@ -7673,16 +7674,8 @@ TEST_F(CDCSDKYsqlTest, TestReplicationSlotDropWithActiveInvalid) { google::protobuf::RepeatedPtrField tablets; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); ASSERT_EQ(tablets.size(), 3); - xrepl::StreamId stream_id = - ASSERT_RESULT(CreateDBStreamWithReplicationSlot("repl_slot_drop_with_active_invalid")); - - // set checkpoint for each tablet. - for (int tablet_idx = 0; tablet_idx < tablets.size(); tablet_idx++) { - auto set_resp = ASSERT_RESULT(SetCDCCheckpoint( - stream_id, tablets, OpId::Min(), /*cdc_sdk_safe_time=*/0, /*initial_checkpoint=*/true, - tablet_idx)); - ASSERT_FALSE(set_resp.has_error()); - } + xrepl::StreamId stream_id = ASSERT_RESULT( + CreateConsistentSnapshotStreamWithReplicationSlot("repl_slot_drop_with_active_invalid")); auto repl_conn = ASSERT_RESULT(test_cluster_.ConnectToDBWithReplication(kNamespaceName)); diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index e2975c2ff9d6..207ff1aa2a35 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -1211,8 +1211,11 @@ Status CatalogManager::PopulateCDCStateTable(const xrepl::StreamId& stream_id, // 1. Represent the slot's consistent point i.e. first record sent in the streaming phase will // have LSN & txnID set to 2. // 2. Initialize components (LSN & txnID generators) of the CDCSDK Virtual WAL on restarts. + // + // If these values are changed here, also update the consistent point sent as part of the + // creation of logical replication slot in walsender.c and slotfuncs.c. if (FLAGS_ysql_yb_enable_replication_commands && has_consistent_snapshot_option) { - cdc::CDCStateTableEntry entry(cdc::kCDCSDKSlotEntryTabletId, stream_id); + cdc::CDCStateTableEntry entry(kCDCSDKSlotEntryTabletId, stream_id); entry.confirmed_flush_lsn = 2; entry.restart_lsn = 1; entry.xmin = 1; @@ -1220,7 +1223,7 @@ Status CatalogManager::PopulateCDCStateTable(const xrepl::StreamId& stream_id, entry.cdc_sdk_safe_time = consistent_snapshot_time; entries.push_back(entry); VLOG(1) << "Added entry in cdc_state for the replication slot with tablet_id: " - << cdc::kCDCSDKSlotEntryTabletId << " stream_id: " << stream_id; + << kCDCSDKSlotEntryTabletId << " stream_id: " << stream_id; } return cdc_state_table_->UpsertEntries(entries); diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index 2009c43e8013..0147c6a8ef12 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -49,8 +49,6 @@ service PgClientService { rpc GetDatabaseInfo(PgGetDatabaseInfoRequestPB) returns (PgGetDatabaseInfoResponsePB); rpc GetLockStatus(PgGetLockStatusRequestPB) returns (PgGetLockStatusResponsePB); rpc GetReplicationSlot(PgGetReplicationSlotRequestPB) returns (PgGetReplicationSlotResponsePB); - rpc GetReplicationSlotStatus(PgGetReplicationSlotStatusRequestPB) - returns (PgGetReplicationSlotStatusResponsePB); rpc IsInitDbDone(PgIsInitDbDoneRequestPB) returns (PgIsInitDbDoneResponsePB); rpc ListLiveTabletServers(PgListLiveTabletServersRequestPB) returns (PgListLiveTabletServersResponsePB); @@ -99,6 +97,10 @@ service PgClientService { rpc GetNewObjectId(PgGetNewObjectIdRequestPB) returns (PgGetNewObjectIdResponsePB); + // DEPRECATED: GetReplicationSlot RPC is a superset of this GetReplicationSlotStatus. + // So GetReplicationSlot should be used everywhere. + rpc GetReplicationSlotStatus(PgGetReplicationSlotStatusRequestPB) + returns (PgGetReplicationSlotStatusResponsePB); // DEPRECATED rpc SetActiveSubTransaction(PgSetActiveSubTransactionRequestPB) returns (PgSetActiveSubTransactionResponsePB); @@ -416,10 +418,6 @@ message PgGetLockStatusResponsePB { map transactions_by_node = 3; } -message PgGetReplicationSlotStatusRequestPB { - string replication_slot_name = 1; -} - message PgGetReplicationSlotRequestPB { string replication_slot_name = 1; @@ -436,6 +434,10 @@ enum ReplicationSlotStatus { INACTIVE = 1; } +message PgGetReplicationSlotStatusRequestPB { + string replication_slot_name = 1; +} + message PgGetReplicationSlotStatusResponsePB { AppStatusPB status = 1; @@ -481,6 +483,9 @@ message PgReplicationSlotInfoPB { bytes stream_id = 2; uint32 database_oid = 3; ReplicationSlotStatus replication_slot_status = 4; + uint64 confirmed_flush_lsn = 5; + uint64 restart_lsn = 6; + uint32 xmin = 7; } message PgListReplicationSlotsResponsePB { diff --git a/src/yb/tserver/pg_client_service.cc b/src/yb/tserver/pg_client_service.cc index 6a6c2a8abd2c..123963bf7738 100644 --- a/src/yb/tserver/pg_client_service.cc +++ b/src/yb/tserver/pg_client_service.cc @@ -1057,10 +1057,18 @@ class PgClientServiceImpl::Impl { // Determine latest active time of each stream if there are any. std::unordered_map stream_to_latest_active_time; + // stream id -> ((confirmed_flush, restart_lsn), xmin) + std::unordered_map, uint32_t>> + stream_to_metadata; if (!streams.empty()) { Status iteration_status; auto range_result = VERIFY_RESULT(cdc_state_table_->GetTableRange( - cdc::CDCStateTableEntrySelector().IncludeActiveTime(), &iteration_status)); + cdc::CDCStateTableEntrySelector() + .IncludeActiveTime() + .IncludeConfirmedFlushLSN() + .IncludeRestartLSN() + .IncludeXmin(), + &iteration_status)); for (auto entry_result : range_result) { RETURN_NOT_OK(entry_result); @@ -1068,6 +1076,19 @@ class PgClientServiceImpl::Impl { auto stream_id = entry.key.stream_id; auto active_time = entry.active_time; + + // The special entry storing the replication slot metadata set during the stream creation. + if (entry.key.tablet_id == kCDCSDKSlotEntryTabletId) { + DCHECK(!stream_to_metadata.contains(stream_id)); + DCHECK(entry.confirmed_flush_lsn.has_value()); + DCHECK(entry.restart_lsn.has_value()); + DCHECK(entry.xmin.has_value()); + + stream_to_metadata[stream_id] = std::make_pair( + std::make_pair(*entry.confirmed_flush_lsn, *entry.restart_lsn), *entry.xmin); + continue; + } + // If active_time isn't populated, then the (stream_id, tablet_id) pair hasn't been consumed // yet by the client. So treat it is as an inactive case. if (!active_time) { @@ -1100,6 +1121,11 @@ class PgClientServiceImpl::Impl { 1000 * GetAtomicFlag(&FLAGS_ysql_cdc_active_replication_slot_window_ms); replication_slot->set_replication_slot_status( (is_stream_active) ? ReplicationSlotStatus::ACTIVE : ReplicationSlotStatus::INACTIVE); + + auto slot_metadata = stream_to_metadata[*stream_id]; + replication_slot->set_confirmed_flush_lsn(slot_metadata.first.first); + replication_slot->set_restart_lsn(slot_metadata.first.second); + replication_slot->set_xmin(slot_metadata.second); } return Status::OK(); } @@ -1107,13 +1133,34 @@ class PgClientServiceImpl::Impl { Status GetReplicationSlot( const PgGetReplicationSlotRequestPB& req, PgGetReplicationSlotResponsePB* resp, rpc::RpcContext* context) { - LOG_WITH_FUNC(INFO) << "Start with req: " << req.DebugString(); auto stream = VERIFY_RESULT(client().GetCDCStream(ReplicationSlotName(req.replication_slot_name()))); stream.ToPB(resp->mutable_replication_slot_info()); + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(stream.stream_id)); + bool is_slot_active; + uint64_t confirmed_flush_lsn = 0; + uint64_t restart_lsn = 0; + uint32_t xmin = 0; + RETURN_NOT_OK(GetReplicationSlotInfoFromCDCState( + stream_id, &is_slot_active, &confirmed_flush_lsn, &restart_lsn, &xmin)); + resp->mutable_replication_slot_info()->set_replication_slot_status( + (is_slot_active) ? ReplicationSlotStatus::ACTIVE : ReplicationSlotStatus::INACTIVE); + + RSTATUS_DCHECK( + confirmed_flush_lsn != 0 && restart_lsn != 0 && xmin != 0, InternalError, + Format( + "Unexpected value present in the CDC state table. confirmed_flush_lsn: $0, " + "restart_lsn: $1, xmin: $2", + confirmed_flush_lsn, restart_lsn, xmin)); + resp->mutable_replication_slot_info()->set_confirmed_flush_lsn(confirmed_flush_lsn); + resp->mutable_replication_slot_info()->set_restart_lsn(restart_lsn); + resp->mutable_replication_slot_info()->set_xmin(xmin); return Status::OK(); } + // DEPRECATED: GetReplicationSlot RPC is a superset of this GetReplicationSlotStatus. + // So GetReplicationSlot should be used everywhere. Status GetReplicationSlotStatus( const PgGetReplicationSlotStatusRequestPB& req, PgGetReplicationSlotStatusResponsePB* resp, rpc::RpcContext* context) { @@ -1122,10 +1169,29 @@ class PgClientServiceImpl::Impl { VERIFY_RESULT(client().GetCDCStream(ReplicationSlotName(req.replication_slot_name()))); auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(stream.stream_id)); + bool is_slot_active; + uint64_t confirmed_flush_lsn; + uint64_t restart_lsn; + uint32_t xmin; + RETURN_NOT_OK(GetReplicationSlotInfoFromCDCState( + stream_id, &is_slot_active, &confirmed_flush_lsn, &restart_lsn, &xmin)); + resp->set_replication_slot_status( + (is_slot_active) ? ReplicationSlotStatus::ACTIVE : ReplicationSlotStatus::INACTIVE); + return Status::OK(); + } + + Status GetReplicationSlotInfoFromCDCState( + const xrepl::StreamId& stream_id, bool* active, uint64_t* confirmed_flush_lsn, + uint64_t* restart_lsn, uint32_t* xmin) { // TODO(#19850): Fetch only the entries belonging to the stream_id from the table. Status iteration_status; auto range_result = VERIFY_RESULT(cdc_state_table_->GetTableRange( - cdc::CDCStateTableEntrySelector().IncludeActiveTime(), &iteration_status)); + cdc::CDCStateTableEntrySelector() + .IncludeActiveTime() + .IncludeConfirmedFlushLSN() + .IncludeRestartLSN() + .IncludeXmin(), + &iteration_status)); // Find the latest active time for the stream across all tablets. uint64_t last_activity_time_micros = 0; @@ -1137,6 +1203,18 @@ class PgClientServiceImpl::Impl { continue; } + // The special entry storing the replication slot metadata set during the stream creation. + if (entry.key.tablet_id == kCDCSDKSlotEntryTabletId) { + DCHECK(entry.confirmed_flush_lsn.has_value()); + DCHECK(entry.restart_lsn.has_value()); + DCHECK(entry.xmin.has_value()); + + *DCHECK_NOTNULL(confirmed_flush_lsn) = *entry.confirmed_flush_lsn; + *DCHECK_NOTNULL(restart_lsn) = *entry.restart_lsn; + *DCHECK_NOTNULL(xmin) = *entry.xmin; + continue; + } + auto active_time = entry.active_time; // If active_time isn't populated, then the (stream_id, tablet_id) pair hasn't been consumed @@ -1151,10 +1229,9 @@ class PgClientServiceImpl::Impl { iteration_status.ok(), InternalError, "Unable to read the CDC state table", iteration_status); - auto is_stream_active = GetCurrentTimeMicros() - last_activity_time_micros <= - 1000 * GetAtomicFlag(&FLAGS_ysql_cdc_active_replication_slot_window_ms); - resp->set_replication_slot_status( - (is_stream_active) ? ReplicationSlotStatus::ACTIVE : ReplicationSlotStatus::INACTIVE); + *DCHECK_NOTNULL(active) = + GetCurrentTimeMicros() - last_activity_time_micros <= + 1000 * GetAtomicFlag(&FLAGS_ysql_cdc_active_replication_slot_window_ms); return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index 74167c08f154..580cb49b9de4 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -951,18 +951,6 @@ class PgClient::Impl : public BigDataFetcher { return resp; } - Result GetReplicationSlotStatus( - const ReplicationSlotName& slot_name) { - tserver::PgGetReplicationSlotStatusRequestPB req; - req.set_replication_slot_name(slot_name.ToString()); - - tserver::PgGetReplicationSlotStatusResponsePB resp; - - RETURN_NOT_OK(proxy_->GetReplicationSlotStatus(req, &resp, PrepareController())); - RETURN_NOT_OK(ResponseStatus(resp)); - return resp; - } - Result ActiveSessionHistory() { tserver::PgActiveSessionHistoryRequestPB req; req.set_fetch_tserver_states(true); @@ -1299,11 +1287,6 @@ Result PgClient::GetReplicationSlot( return impl_->GetReplicationSlot(slot_name); } -Result PgClient::GetReplicationSlotStatus( - const ReplicationSlotName& slot_name) { - return impl_->GetReplicationSlotStatus(slot_name); -} - Result PgClient::ActiveSessionHistory() { return impl_->ActiveSessionHistory(); } diff --git a/src/yb/yql/pggate/pg_client.h b/src/yb/yql/pggate/pg_client.h index b9328aa275ef..abace119313a 100644 --- a/src/yb/yql/pggate/pg_client.h +++ b/src/yb/yql/pggate/pg_client.h @@ -233,9 +233,6 @@ class PgClient { Result GetReplicationSlot( const ReplicationSlotName& slot_name); - Result GetReplicationSlotStatus( - const ReplicationSlotName& slot_name); - Result ActiveSessionHistory(); Result InitVirtualWALForCDC( diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index be2f8213b7da..384606dfb367 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -1022,11 +1022,6 @@ Result PgSession::GetReplicationSlot( return pg_client_.GetReplicationSlot(slot_name); } -Result PgSession::GetReplicationSlotStatus( - const ReplicationSlotName& slot_name) { - return pg_client_.GetReplicationSlotStatus(slot_name); -} - PgWaitEventWatcher PgSession::StartWaitEvent(ash::WaitStateCode wait_event) { return {wait_starter_, wait_event}; } diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 534ed7637c77..901579c7dbad 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -367,9 +367,6 @@ class PgSession : public RefCountedThreadSafe { Result GetReplicationSlot( const ReplicationSlotName& slot_name); - Result GetReplicationSlotStatus( - const ReplicationSlotName& slot_name); - [[nodiscard]] PgWaitEventWatcher StartWaitEvent(ash::WaitStateCode wait_event); Result ActiveSessionHistory(); diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 982cebb98434..563b5d16cc1a 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -2260,11 +2260,6 @@ Result PgApiImpl::GetReplicationSlot( return pg_session_->GetReplicationSlot(slot_name); } -Result PgApiImpl::GetReplicationSlotStatus( - const ReplicationSlotName& slot_name) { - return pg_session_->GetReplicationSlotStatus(slot_name); -} - Result PgApiImpl::InitVirtualWALForCDC( const std::string& stream_id, const std::vector& table_ids) { return pg_session_->pg_client().InitVirtualWALForCDC(stream_id, table_ids); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index b604d86d97bc..a1577eaa8e45 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -751,9 +751,6 @@ class PgApiImpl { Result GetReplicationSlot( const ReplicationSlotName& slot_name); - Result GetReplicationSlotStatus( - const ReplicationSlotName& slot_name); - Result InitVirtualWALForCDC( const std::string& stream_id, const std::vector& table_ids); diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index af99a8db8f0a..bbbfff0aafcf 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -549,6 +549,9 @@ typedef struct PgReplicationSlotDescriptor { const char *stream_id; YBCPgOid database_oid; bool active; + uint64_t confirmed_flush; + uint64_t restart_lsn; + uint32_t xmin; } YBCReplicationSlotDescriptor; typedef struct PgDatumMessage { diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 8469fffec32a..3c4dada16ed2 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -1961,6 +1961,9 @@ YBCStatus YBCPgListReplicationSlots( .stream_id = YBCPAllocStdString(info.stream_id()), .database_oid = info.database_oid(), .active = info.replication_slot_status() == tserver::ReplicationSlotStatus::ACTIVE, + .confirmed_flush = info.confirmed_flush_lsn(), + .restart_lsn = info.restart_lsn(), + .xmin = info.xmin(), }; ++dest; } @@ -1985,23 +1988,14 @@ YBCStatus YBCPgGetReplicationSlot( .stream_id = YBCPAllocStdString(slot_info.stream_id()), .database_oid = slot_info.database_oid(), .active = slot_info.replication_slot_status() == tserver::ReplicationSlotStatus::ACTIVE, + .confirmed_flush = slot_info.confirmed_flush_lsn(), + .restart_lsn = slot_info.restart_lsn(), + .xmin = slot_info.xmin() }; return YBCStatusOK(); } -YBCStatus YBCPgGetReplicationSlotStatus(const char *slot_name, - bool *active) { - const auto replication_slot_name = ReplicationSlotName(std::string(slot_name)); - const auto result = pgapi->GetReplicationSlotStatus(replication_slot_name); - if (!result.ok()) { - return ToYBCStatus(result.status()); - } - - *active = result->replication_slot_status() == tserver::ReplicationSlotStatus::ACTIVE; - return YBCStatusOK(); -} - YBCStatus YBCPgNewDropReplicationSlot(const char *slot_name, YBCPgStatement *handle) { return ToYBCStatus(pgapi->NewDropReplicationSlot(slot_name, diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 07ebb82016a0..8846ea1c02eb 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -787,9 +787,6 @@ YBCStatus YBCPgListReplicationSlots( YBCStatus YBCPgGetReplicationSlot( const char *slot_name, YBCReplicationSlotDescriptor **replication_slot); -YBCStatus YBCPgGetReplicationSlotStatus(const char *slot_name, - bool *active); - YBCStatus YBCPgNewDropReplicationSlot(const char *slot_name, YBCPgStatement *handle); YBCStatus YBCPgExecDropReplicationSlot(YBCPgStatement handle);