Skip to content

Commit

Permalink
[BACKPORT 2024.1][#22449] YSQL: Import 'Fix the logical replication t…
Browse files Browse the repository at this point in the history
…imeout during large transactions.'

Summary:
Original commit: cd5af9e / D35740
This diff imports the upstream PG commit `Fix the logical replication timeout during large transactions.` - `87c1dd246af8ace926645900f02886905b889718`.

PG commit: postgres/postgres@87c1dd2
PG commit description:
```
The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
processing certain threshold of changes.

Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
```
######Backport Description
No merge conflicts were encountered.

Jira: DB-11366

Test Plan: Jenkins: test regex: .*ReplicationSlot.*

Reviewers: stiwary

Reviewed By: stiwary

Subscribers: ycdcxcluster, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D35743
  • Loading branch information
Sumukh-Phalgaonkar committed Jun 12, 2024
1 parent be6e0b0 commit 788715f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 10 deletions.
10 changes: 10 additions & 0 deletions src/postgres/src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i

/* set output state */
ctx->accept_writes = false;
ctx->end_xact = false;

/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
Expand Down Expand Up @@ -696,6 +697,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)

/* set output state */
ctx->accept_writes = false;
ctx->end_xact = false;

/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
Expand Down Expand Up @@ -731,6 +733,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
ctx->end_xact = false;

/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
Expand Down Expand Up @@ -762,6 +765,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
ctx->end_xact = true;

/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
Expand Down Expand Up @@ -801,6 +805,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;

ctx->end_xact = false;

ctx->callbacks.change_cb(ctx, txn, relation, change);

/* Pop the error context stack */
Expand Down Expand Up @@ -841,6 +847,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;

ctx->end_xact = false;

ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);

/* Pop the error context stack */
Expand All @@ -867,6 +875,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)

/* set output state */
ctx->accept_writes = false;
ctx->end_xact = false;

/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
Expand Down Expand Up @@ -904,6 +913,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
ctx->end_xact = false;

/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
Expand Down
40 changes: 39 additions & 1 deletion src/postgres/src/backend/replication/pgoutput/pgoutput.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void update_replication_progress(LogicalDecodingContext *ctx);

/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry
Expand Down Expand Up @@ -260,7 +261,7 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
OutputPluginUpdateProgress(ctx);
update_replication_progress(ctx);

OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
Expand Down Expand Up @@ -322,6 +323,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContext old;
RelationSyncEntry *relentry;

update_replication_progress(ctx);

if (!is_publishable_relation(relation))
return;

Expand Down Expand Up @@ -415,6 +418,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelids;
Oid *relids;

update_replication_progress(ctx);

old = MemoryContextSwitchTo(data->context);

relids = palloc0(nrelations * sizeof(Oid));
Expand Down Expand Up @@ -694,3 +699,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
entry->replicate_valid = false;
}

/*
* Try to update progress and send a keepalive message if too many changes were
* processed.
*
* For a large transaction, if we don't send any change to the downstream for a
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
* This can happen when all or most of the changes are not published.
*/
static void
update_replication_progress(LogicalDecodingContext *ctx)
{
static int changes_count = 0;

/*
* We don't want to try sending a keepalive message after processing each
* change as that can have overhead. Tests revealed that there is no
* noticeable overhead in doing it after continuously processing 100 or so
* changes.
*/
#define CHANGES_THRESHOLD 100

/*
* If we are at the end of transaction LSN, update progress tracking.
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
* try to send a keepalive message if required.
*/
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
{
OutputPluginUpdateProgress(ctx);
changes_count = 0;
}
}
46 changes: 37 additions & 9 deletions src/postgres/src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
Expand Down Expand Up @@ -1376,6 +1377,20 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}

/* If we have pending write here, go to slow path */
ProcessPendingWrites();

if (IsYugaByteEnabled())
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
* Wait until there is no pending write. Also process replies from the other
* side and check timeouts during that.
*/
static void
ProcessPendingWrites(void)
{
for (;;)
{
int wakeEvents;
Expand Down Expand Up @@ -1428,10 +1443,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
WalSndShutdown();
}

if (IsYugaByteEnabled())
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);

/* reactivate latch so WalSndLoop knows to continue */
SetLatch(MyLatch);
}
Expand All @@ -1446,18 +1457,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
bool end_xact = ctx->end_xact;

/*
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
* avoid flooding the lag tracker when we commit frequently.
*
* We don't have a mechanism to get the ack for any LSN other than end
* xact LSN from the downstream. So, we track lag only for end of
* transaction LSN.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
if (!TimestampDifferenceExceeds(sendTime, now,
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
return;
if (end_xact && TimestampDifferenceExceeds(sendTime, now,
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
{
LagTrackerWrite(lsn, now);
sendTime = now;
}

LagTrackerWrite(lsn, now);
sendTime = now;
/*
* Try to send a keepalive if required. We don't need to try sending keep
* alive messages at the transaction end as that will be done at a later
* point in time. This is required only for large transactions where we
* don't send any changes to the downstream and the receiver can timeout
* due to that.
*/
if (!end_xact &&
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2))
ProcessPendingWrites();
}

/*
Expand Down
3 changes: 3 additions & 0 deletions src/postgres/src/include/replication/logical.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ typedef struct LogicalDecodingContext
*/
bool fast_forward;

/* Are we processing the end LSN of a transaction? */
bool end_xact;

OutputPluginCallbacks callbacks;
OutputPluginOptions options;

Expand Down

0 comments on commit 788715f

Please sign in to comment.