Skip to content

Commit

Permalink
Register custom xlog reader callbacks for on-demand WAL download in S…
Browse files Browse the repository at this point in the history
…tartupDecodingContext
  • Loading branch information
Konstantin Knizhnik committed Sep 16, 2024
1 parent 9156d63 commit bb9a784
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 19 deletions.
11 changes: 8 additions & 3 deletions src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "utils/inval.h"
#include "utils/memutils.h"

void (*Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/* data for errcontext callback */
typedef struct LogicalErrorCallbackState
{
Expand Down Expand Up @@ -181,6 +183,12 @@ StartupDecodingContext(List *output_plugin_options,
if (!fast_forward)
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));

/*
* NEON: override page_read/segment_open/segment_close functions to support on-demand WAL download
*/
if (Custom_XLogReaderRoutines != NULL)
Custom_XLogReaderRoutines(xl_routine);

/*
* Now that the slot's xmin has been set, we can announce ourselves as a
* logical decoding backend which doesn't need to be checked individually
Expand Down Expand Up @@ -2081,9 +2089,6 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
xlr.segment_open = wal_segment_open;
xlr.segment_close = wal_segment_close;

if (SlotFuncs_Custom_XLogReaderRoutines != NULL)
SlotFuncs_Custom_XLogReaderRoutines(&xlr);

/*
* Create our decoding context in fast_forward mode, passing start_lsn
* as InvalidXLogRecPtr, so that we start processing from my slot's
Expand Down
5 changes: 0 additions & 5 deletions src/backend/replication/logical/logicalfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
#include "utils/regproc.h"
#include "utils/resowner.h"

void (*LogicalFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/* Private data for writing out data */
typedef struct DecodingOutputState
{
Expand Down Expand Up @@ -208,9 +206,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
xlr.segment_open = wal_segment_open;
xlr.segment_close = wal_segment_close;

if (LogicalFuncs_Custom_XLogReaderRoutines != NULL)
LogicalFuncs_Custom_XLogReaderRoutines(&xlr);

/* restart at slot's confirmed_flush */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
Expand Down
2 changes: 0 additions & 2 deletions src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "utils/pg_lsn.h"
#include "utils/resowner.h"

void (*SlotFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/*
* Helper function for creating a new physical replication slot with
* given arguments. Note that this function doesn't release the created
Expand Down
3 changes: 0 additions & 3 deletions src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
* data message */
bool log_replication_commands = false;

void (*WalSender_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
/*
* State for WalSndWakeupRequest
*/
Expand Down Expand Up @@ -1482,8 +1481,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
xlr.page_read = logical_read_xlog_page;
xlr.segment_open = WalSndSegmentOpen;
xlr.segment_close = wal_segment_close;
if (WalSender_Custom_XLogReaderRoutines != NULL)
WalSender_Custom_XLogReaderRoutines(&xlr);

/*
* Create our decoding context, making it start at the previously ack'ed
Expand Down
2 changes: 1 addition & 1 deletion src/include/replication/logical.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ typedef struct LogicalDecodingContext
bool processing_required;
} LogicalDecodingContext;

extern void (*LogicalFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
extern void (*Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

extern void CheckLogicalDecodingRequirements(void);

Expand Down
2 changes: 0 additions & 2 deletions src/include/replication/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ typedef struct ReplicationSlot
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)

extern void (*SlotFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/*
* Shared memory control area for all of replication slots.
*/
Expand Down
3 changes: 0 additions & 3 deletions src/include/replication/walsender.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ extern PGDLLIMPORT int max_wal_senders;
extern PGDLLIMPORT int wal_sender_timeout;
extern PGDLLIMPORT bool log_replication_commands;

struct XLogReaderRoutine;
extern PGDLLIMPORT void (*WalSender_Custom_XLogReaderRoutines)(struct XLogReaderRoutine *xlr);

extern void InitWalSender(void);
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
Expand Down

0 comments on commit bb9a784

Please sign in to comment.