-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
walsender.c
4228 lines (3658 loc) · 121 KB
/
walsender.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* walsender.c
*
* The WAL sender process (walsender) is new as of Postgres 9.0. It takes
* care of sending XLOG from the primary server to a single recipient.
* (Note that there can be more than one walsender process concurrently.)
* It is started by the postmaster when the walreceiver of a standby server
* connects to the primary server and requests XLOG streaming replication.
*
* A walsender is similar to a regular backend, ie. there is a one-to-one
* relationship between a connection and a walsender process, but instead
* of processing SQL queries, it understands a small set of special
* replication-mode commands. The START_REPLICATION command begins streaming
* WAL to the client. While streaming, the walsender keeps reading XLOG
* records from the disk and sends them to the standby server over the
* COPY protocol, until either side ends the replication by exiting COPY
* mode (or until the connection is closed).
*
* Normal termination is by SIGTERM, which instructs the walsender to
* close the connection and exit(0) at the next convenient moment. Emergency
* termination is by SIGQUIT; like any backend, the walsender will simply
* abort and exit on SIGQUIT. A close of the connection and a FATAL error
* are treated as not a crash but approximately normal termination;
* the walsender will exit quickly without sending any more XLOG records.
*
* If the server is shut down, checkpointer sends us
* PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
* the backend is idle or runs an SQL query, this causes the backend to
* shut down; if logical replication is in progress, all existing WAL records
* are processed, followed by a shutdown. Otherwise this causes the walsender
* to switch to the "stopping" state. In this state, the walsender will reject
* any further replication commands. The checkpointer begins the shutdown
* checkpoint once all walsenders are confirmed as stopping. When the shutdown
* checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
* walsender to send any outstanding WAL, including the shutdown checkpoint
* record, wait for it to be replicated to the standby, and then exit.
*
*
* Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/walsender.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
* We don't have a good idea of what a good value would be; there's some
* overhead per message in both walsender and walreceiver, but on the other
* hand sending large batches makes walsender less responsive to signals
* because signals are checked only between messages. 128kB (with
* default 8k blocks) seems like a reasonable guess for now.
*/
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process? */
bool am_cascading_walsender = false; /* Am I cascading WAL to another
* standby? */
bool am_db_walsender = false; /* Connected to a database? */
/* GUC variables */
int max_wal_senders = 10; /* the maximum number of concurrent
* walsenders */
int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
* data message */
bool log_replication_commands = false;
/*
* State for WalSndWakeupRequest
*/
bool wake_wal_senders = false;
/*
* xlogreader used for replication. Note that a WAL sender doing physical
* replication does not need xlogreader to read WAL, but it needs one to
* keep a state of its work.
*/
static XLogReaderState *xlogreader = NULL;
/*
* If the UPLOAD_MANIFEST command is used to provide a backup manifest in
* preparation for an incremental backup, uploaded_manifest will point
* to an object containing information about its contents, and
* uploaded_manifest_mcxt will point to the memory context that contains
* that object and all of its subordinate data. Otherwise, both values will
* be NULL.
*/
static IncrementalBackupInfo *uploaded_manifest = NULL;
static MemoryContext uploaded_manifest_mcxt = NULL;
/*
* These variables keep track of the state of the timeline we're currently
* sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
* the timeline is not the latest timeline on this server, and the server's
* history forked off from that timeline at sendTimeLineValidUpto.
*/
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
/* Buffers for constructing outgoing messages and processing reply messages. */
static StringInfoData output_message;
static StringInfoData reply_message;
static StringInfoData tmpbuf;
/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0;
/*
* Timestamp of last ProcessRepliesIfAny() that saw a reply from the
* standby. Set to 0 if wal_sender_timeout doesn't need to be active.
*/
static TimestampTz last_reply_timestamp = 0;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;
/*
* While streaming WAL in Copy mode, streamingDoneSending is set to true
* after we have sent CopyDone. We should not send any more CopyData messages
* after that. streamingDoneReceiving is set to true when we receive CopyDone
* from the other end. When both become true, it's time to exit Copy mode.
*/
static bool streamingDoneSending;
static bool streamingDoneReceiving;
/* Are we there yet? */
static bool WalSndCaughtUp = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGUSR2 = false;
static volatile sig_atomic_t got_STOPPING = false;
/*
* This is set while we are streaming. When not set
* PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
* the main loop is responsible for checking got_STOPPING and terminating when
* it's set (after streaming any remaining WAL).
*/
static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
/*
* A sample associating a WAL location with the time it was written.
*
* This is the element type of LagTracker.buffer and LagTracker.last_read.
*/
typedef struct
{
XLogRecPtr lsn; /* WAL location of this sample */
TimestampTz time; /* timestamp paired with that WAL location */
} WalTimeSample;
/* The size of our buffer of time samples. */
#define LAG_TRACKER_BUFFER_SIZE 8192
/*
* A mechanism for tracking replication lag.
*
* Holds a fixed array of LAG_TRACKER_BUFFER_SIZE samples together with one
* write position and, per sync-rep wait mode (NUM_SYNC_REP_WAIT_MODE), one
* read position and one remembered sample. The single instance, lag_tracker,
* is allocated zero-filled by InitWalSender(). How the positions advance is
* defined by LagTrackerWrite() and LagTrackerRead().
*/
typedef struct
{
XLogRecPtr last_lsn; /* NOTE(review): presumably the last LSN recorded -- confirm in LagTrackerWrite() */
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; /* sample storage */
int write_head; /* NOTE(review): presumably index into buffer for the next write -- confirm */
int read_heads[NUM_SYNC_REP_WAIT_MODE]; /* one read position per wait mode */
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; /* NOTE(review): presumably the last sample consumed per wait mode -- confirm in LagTrackerRead() */
} LagTracker;
static LagTracker *lag_tracker;
/* Signal handlers */
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static void IdentifySystem(void);
static void UploadManifest(void);
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
IncrementalBackupInfo *ib);
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool skipped_xact);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p);
/*
* Initialize walsender process before entering the main command loop.
*
* In order, this records whether the server is currently in recovery (which
* makes us a cascading walsender), sets up this process's walsender entry in
* shared memory, creates the aux-process resource owner, announces to the
* postmaster that this child is a walsender, flags our PGPROC as affecting
* all vacuum horizons when no database was selected, and allocates the
* zero-filled lag tracker in TopMemoryContext.
*/
void
InitWalSender(void)
{
am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSenderSlot();
/* need resource owner for e.g. basebackups */
CreateAuxProcessResourceOwner();
/*
* Let postmaster know that we're a WAL sender. Once we've declared us as
* a WAL sender process, postmaster will let us outlive the bgwriter and
* kill us last in the shutdown sequence, so we get a chance to stream all
* remaining WAL at shutdown, including the shutdown checkpoint. Note that
* there's no going back, and we mustn't write any WAL records after this.
*/
MarkPostmasterChildWalSender();
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
* If the client didn't specify a database to connect to, show in PGPROC
* that our advertised xmin should affect vacuum horizons in all
* databases. This allows physical replication clients to send hot
* standby feedback that will delay vacuum cleanup in all databases.
*/
if (MyDatabaseId == InvalidOid)
{
/* No xmin may have been advertised yet when the flag is first set. */
Assert(MyProc->xmin == InvalidTransactionId);
/* The flag is updated, and mirrored into ProcGlobal, under ProcArrayLock. */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
LWLockRelease(ProcArrayLock);
}
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
}
/*
 * Undo whatever a failed replication command left behind.
 *
 * A walsender does not run its commands inside transactions the way a
 * regular backend does, so there is no transaction abort to lean on after an
 * error.  This routine does the equivalent work by hand: it drops lightweight
 * locks, cancels any condition-variable sleep, ends wait-event reporting,
 * closes an open WAL segment, releases and cleans up replication slots, and
 * marks replication as no longer active.
 *
 * If a stop request (got_STOPPING or got_SIGUSR2) has been seen, the process
 * exits here; otherwise the walsender goes back to the startup state.
 */
void
WalSndErrorCleanup(void)
{
	LWLockReleaseAll();
	ConditionVariableCancelSleep();
	pgstat_report_wait_end();

	/* Close the WAL segment file, if the reader still has one open. */
	if (xlogreader != NULL)
	{
		if (xlogreader->seg.ws_file >= 0)
			wal_segment_close(xlogreader);
	}

	/* Let go of the slot this walsender had acquired, if any. */
	if (MyReplicationSlot != NULL)
	{
		ReplicationSlotRelease();
	}

	ReplicationSlotCleanup(false);

	replication_active = false;

	/*
	 * An in-progress transaction takes care of our ResourceOwner by itself.
	 * When a replication command installed a resource owner without any
	 * transaction around it, nobody else will release it, so do that here.
	 */
	if (!IsTransactionOrTransactionBlock())
	{
		ReleaseAuxProcessResources(false);
	}

	/* A pending stop request means there is nothing to go back to. */
	if (got_STOPPING || got_SIGUSR2)
	{
		proc_exit(0);
	}

	/* Otherwise return to the state we had right after startup. */
	WalSndSetState(WALSNDSTATE_STARTUP);
}
/*
* Handle a client's connection abort in an orderly manner.
*
* Does not return: the prototype is marked pg_attribute_noreturn() and the
* function ends in proc_exit(0).
*/
static void
WalSndShutdown(void)
{
/*
* Reset whereToSendOutput to prevent ereport from attempting to send any
* more messages to the standby.
*/
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;
proc_exit(0);
abort(); /* keep the compiler quiet */
}
/*
* Handle the IDENTIFY_SYSTEM command.
*/
static void
IdentifySystem(void)
{
char sysid[32];
char xloc[MAXFNAMELEN];
XLogRecPtr logptr;
char *dbname = NULL;
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[4];
bool nulls[4] = {0};
TimeLineID currTLI;
/*
* Reply with a result set with one row, four columns. First col is system
* ID, second is timeline ID, third is current xlog location and the
* fourth contains the database name if we are connected to one.
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
logptr = GetStandbyFlushRecPtr(&currTLI);
else
logptr = GetFlushRecPtr(&currTLI);
snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
if (MyDatabaseId != InvalidOid)
{
MemoryContext cur = CurrentMemoryContext;
/* syscache access needs a transaction env. */
StartTransactionCommand();
dbname = get_database_name(MyDatabaseId);
/* copy dbname out of TX context */
dbname = MemoryContextStrdup(cur, dbname);
CommitTransactionCommand();
}
dest = CreateDestReceiver(DestRemoteSimple);
/* need a tuple descriptor representing four columns */
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
TEXTOID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
TEXTOID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
TEXTOID, -1, 0);
/* prepare for projection of tuples */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* column 1: system identifier */
values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
values[1] = Int64GetDatum(currTLI);
/* column 3: wal location */
values[2] = CStringGetTextDatum(xloc);
/* column 4: database name, or NULL if none */
if (dbname)
values[3] = CStringGetTextDatum(dbname);
else
nulls[3] = true;
/* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
/*
 * Handle READ_REPLICATION_SLOT command.
 *
 * Sends the client a result set of exactly one row with three columns:
 *
 *	slot_type	text	"physical" when the named slot exists and is in use
 *	restart_lsn	text	the slot's restart_lsn, printed as %X/%X
 *	restart_tli	int8	timeline that restart_lsn lies on
 *
 * All three columns are NULL when no in-use slot named cmd->slotname is
 * found.  restart_lsn and restart_tli are NULL when the slot has no valid
 * restart_lsn.  Naming a logical slot (one whose database OID is valid)
 * raises an ERROR with ERRCODE_FEATURE_NOT_SUPPORTED.
 */
static void
ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
{
#define READ_REPLICATION_SLOT_COLS 3
	ReplicationSlot *slot;
	DestReceiver *dest;
	TupOutputState *tstate;
	TupleDesc	tupdesc;
	Datum		values[READ_REPLICATION_SLOT_COLS] = {0};
	bool		nulls[READ_REPLICATION_SLOT_COLS];

	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
							  TEXTOID, -1, 0);
	/* TimeLineID is unsigned, so int4 is not wide enough. */
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
							  INT8OID, -1, 0);

	/* Every column starts out NULL; the ones we can fill are cleared below. */
	memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	slot = SearchNamedReplicationSlot(cmd->slotname, false);
	if (slot == NULL || !slot->in_use)
	{
		/* No such slot: fall through and send the all-NULL row. */
		LWLockRelease(ReplicationSlotControlLock);
	}
	else
	{
		ReplicationSlot slot_contents;
		int			i = 0;

		/* Copy slot contents while holding spinlock */
		SpinLockAcquire(&slot->mutex);
		slot_contents = *slot;
		SpinLockRelease(&slot->mutex);
		LWLockRelease(ReplicationSlotControlLock);

		/* From here on only the private copy is examined. */
		if (OidIsValid(slot_contents.data.database))
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use %s with a logical replication slot",
						   "READ_REPLICATION_SLOT"));

		/* slot type */
		values[i] = CStringGetTextDatum("physical");
		nulls[i] = false;
		i++;

		/* start LSN */
		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
		{
			char		xloc[64];

			snprintf(xloc, sizeof(xloc), "%X/%X",
					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
			values[i] = CStringGetTextDatum(xloc);
			nulls[i] = false;
		}
		i++;

		/* timeline this WAL was produced on */
		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
		{
			TimeLineID	slots_position_timeline;
			TimeLineID	current_timeline;
			List	   *timeline_history = NIL;

			/*
			 * While in recovery, use as timeline the currently-replaying one
			 * to get the LSN position's history.
			 */
			if (RecoveryInProgress())
				(void) GetXLogReplayRecPtr(&current_timeline);
			else
				current_timeline = GetWALInsertionTimeLine();

			timeline_history = readTimeLineHistory(current_timeline);
			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
														  timeline_history);
			values[i] = Int64GetDatum((int64) slots_position_timeline);
			nulls[i] = false;
		}
		i++;

		/* Guard against a column being added without being filled here. */
		Assert(i == READ_REPLICATION_SLOT_COLS);
	}

	dest = CreateDestReceiver(DestRemoteSimple);
	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
	do_tup_output(tstate, values, nulls);
	end_tup_output(tstate);
}
/*
 * Handle TIMELINE_HISTORY command.
 *
 * Sends the client a result set of one row with two text columns: "filename",
 * the name of the history file for cmd->timeline, and "content", the bytes of
 * that file.
 *
 * The DataRow message is assembled by hand rather than through the tuple
 * output helpers: the file's length, as measured by seeking to its end, is
 * written as the column length, and the file is then appended to the message
 * one PGAlignedBlock-sized read at a time.  Failing to open, seek, read or
 * close the file raises an ERROR, as does hitting end-of-file before the
 * measured length has been consumed.
 */
static void
SendTimeLineHistory(TimeLineHistoryCmd *cmd)
{
	DestReceiver *receiver;
	TupleDesc	desc;
	StringInfoData row;
	char		fname[MAXFNAMELEN];
	char		fpath[MAXPGPATH];
	int			histfd;
	off_t		file_size;
	off_t		remaining;
	Size		fname_len;

	receiver = CreateDestReceiver(DestRemoteSimple);

	/* Two text columns: the file's name and its contents. */
	desc = CreateTemplateTupleDesc(2);
	TupleDescInitBuiltinEntry(desc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(desc, (AttrNumber) 2, "content", TEXTOID, -1, 0);

	TLHistoryFileName(fname, cmd->timeline);
	TLHistoryFilePath(fpath, cmd->timeline);

	/* RowDescription goes out first ... */
	receiver->rStartup(receiver, CMD_SELECT, desc);

	/* ... followed by a single DataRow, starting with the file name. */
	pq_beginmessage(&row, PqMsg_DataRow);
	pq_sendint16(&row, 2);		/* # of columns */
	fname_len = strlen(fname);
	pq_sendint32(&row, fname_len);	/* col1 len */
	pq_sendbytes(&row, fname, fname_len);

	histfd = OpenTransientFile(fpath, O_RDONLY | PG_BINARY);
	if (histfd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", fpath)));

	/* Measure the file by seeking to its end, then rewind for reading. */
	file_size = lseek(histfd, 0, SEEK_END);
	if (file_size < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to end of file \"%s\": %m", fpath)));
	if (lseek(histfd, 0, SEEK_SET) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to beginning of file \"%s\": %m", fpath)));

	pq_sendint32(&row, file_size);	/* col2 len */

	/* Append the file to the message, one block-sized read at a time. */
	for (remaining = file_size; remaining > 0;)
	{
		PGAlignedBlock chunk;
		int			got;

		pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
		got = read(histfd, chunk.data, sizeof(chunk));
		pgstat_report_wait_end();

		if (got < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							fpath)));

		/* End-of-file while bytes are still owed: the file is short. */
		if (got == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							fpath, got, (Size) remaining)));

		pq_sendbytes(&row, chunk.data, got);
		remaining -= got;
	}

	if (CloseTransientFile(histfd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", fpath)));

	pq_endmessage(&row);
}
/*
 * Handle UPLOAD_MANIFEST command.
 *
 * Puts the connection into copy-in mode, feeds every packet the client sends
 * to the incremental-backup manifest parser, and on success stores the
 * result in uploaded_manifest / uploaded_manifest_mcxt, replacing whatever
 * manifest had been uploaded before.
 *
 * The new manifest is built in a private memory context that is a child of
 * the current context while parsing is under way, and is reparented under
 * CacheMemoryContext only once parsing has finished.
 */
static void
UploadManifest(void)
{
	MemoryContext manifest_cxt;
	IncrementalBackupInfo *info;
	off_t		consumed = 0;
	StringInfoData msg;

	/*
	 * Manifest parsing relies on the cryptohash code, and that in turn needs
	 * a resource owner, so make the aux-process one current.
	 */
	Assert(AuxProcessResourceOwner != NULL);
	Assert(CurrentResourceOwner == AuxProcessResourceOwner ||
		   CurrentResourceOwner == NULL);
	CurrentResourceOwner = AuxProcessResourceOwner;

	/* Parse into a context of its own, parented by the current one. */
	manifest_cxt = AllocSetContextCreate(CurrentMemoryContext,
										 "incremental backup information",
										 ALLOCSET_DEFAULT_SIZES);
	info = CreateIncrementalBackupInfo(manifest_cxt);

	/* Tell the client to start sending: CopyInResponse, then flush. */
	pq_beginmessage(&msg, PqMsg_CopyInResponse);
	pq_sendbyte(&msg, 0);
	pq_sendint16(&msg, 0);
	pq_endmessage_reuse(&msg);
	pq_flush();

	/* Consume packets until the handler reports that the upload is over. */
	for (;;)
	{
		if (!HandleUploadManifestPacket(&msg, &consumed, info))
			break;
	}

	/* All data is in; let the parser finish its work. */
	FinalizeIncrementalManifest(info);

	/*
	 * Swap in the new manifest: throw away the previous one, if any, and
	 * keep the new context by hanging it under CacheMemoryContext.
	 *
	 * MemoryContextDelete and MemoryContextSetParent are assumed not to
	 * fail, so there is no path out of this sequence that would leave the
	 * two globals pointing at freed memory.
	 */
	if (uploaded_manifest_mcxt != NULL)
	{
		MemoryContextDelete(uploaded_manifest_mcxt);
	}
	MemoryContextSetParent(manifest_cxt, CacheMemoryContext);
	uploaded_manifest = info;
	uploaded_manifest_mcxt = manifest_cxt;

	/* Release what was acquired under the resource owner set above. */
	ReleaseAuxProcessResources(true);
}
/*
 * Read and process a single protocol message during UPLOAD_MANIFEST.
 *
 * 'buf' is scratch space owned by the caller.  It must already be
 * initialized; its previous contents are irrelevant and are overwritten with
 * the body of the message read here.
 *
 * 'offset' is accepted for the caller's bookkeeping but is not referenced by
 * this function.
 *
 * 'ib' is the incremental-backup state that CopyData payloads are appended
 * to.
 *
 * Returns true when the caller should come back for another message, and
 * false once CopyDone has been received and the upload is complete.  EOF on
 * the connection, an unexpected message type and CopyFail all raise an
 * ERROR.
 */
static bool
HandleUploadManifestPacket(StringInfo buf, off_t *offset,
						   IncrementalBackupInfo *ib)
{
	int			msgtype;
	int			msglimit;

	/* Query cancel is held off while a message is being read. */
	HOLD_CANCEL_INTERRUPTS();

	pq_startmsgread();
	msgtype = pq_getbyte();
	if (msgtype == EOF)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("unexpected EOF on client connection with an open transaction")));

	/* The message type decides how large a body we are willing to accept. */
	if (msgtype == 'd')			/* CopyData */
	{
		msglimit = PQ_LARGE_MESSAGE_LIMIT;
	}
	else if (msgtype == 'c' ||	/* CopyDone */
			 msgtype == 'f' ||	/* CopyFail */
			 msgtype == 'H' ||	/* Flush */
			 msgtype == 'S')	/* Sync */
	{
		msglimit = PQ_SMALL_MESSAGE_LIMIT;
	}
	else
	{
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("unexpected message type 0x%02X during COPY from stdin",
						msgtype)));
		msglimit = 0;			/* keep compiler quiet */
	}

	/* Fetch the body of the message. */
	if (pq_getmessage(buf, msglimit))
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("unexpected EOF on client connection with an open transaction")));

	RESUME_CANCEL_INTERRUPTS();

	/* CopyData: hand the payload to the manifest parser and keep going. */
	if (msgtype == 'd')
	{
		AppendIncrementalManifestData(ib, buf->data, buf->len);
		return true;
	}

	/* CopyDone: the client has sent everything. */
	if (msgtype == 'c')
		return false;

	/* Flush and Sync carry nothing for us while in copy-in mode; skip them. */
	if (msgtype == 'H' || msgtype == 'S')
		return true;

	/* CopyFail: the client gave up; report the reason it supplied. */
	if (msgtype == 'f')
		ereport(ERROR,
				(errcode(ERRCODE_QUERY_CANCELED),
				 errmsg("COPY from stdin failed: %s",
						pq_getmsgstring(buf))));

	/* Not reached. */
	Assert(false);
	return false;
}
/*
* Handle START_REPLICATION command.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
static void
StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
XLogRecPtr FlushPtr;
TimeLineID FlushTLI;
/* create xlogreader for physical replication */
xlogreader =
XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record.
*/
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
else
FlushPtr = GetFlushRecPtr(&FlushTLI);
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == FlushTLI)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
}
else
{
List *timeLineHistory;
sendTimeLineIsHistoric = true;
/*
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(FlushTLI);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/*
* Found the requested timeline in the history. Check that
* requested startpoint is on that timeline in our history.
*
* This is quite loose on purpose. We only check that we didn't
* fork off the requested timeline before the switchpoint. We
* don't check that we switched *to* it before the requested
* starting point. This is because the client can legitimately
* request to start replication from the beginning of the WAL
* segment that contains switchpoint, but on the new timeline, so
* that it doesn't end up with a partial segment. If you ask for
* too old a starting point, you'll get an error later when we
* fail to find the requested WAL segment in pg_wal.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
if (!XLogRecPtrIsInvalid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
LSN_FORMAT_ARGS(cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
LSN_FORMAT_ARGS(switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
}
else
{
sendTimeLine = FlushTLI;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
streamingDoneSending = streamingDoneReceiving = false;
/* If there is nothing to stream, don't even enter COPY mode */
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, PqMsg_CopyBothResponse);
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
pq_flush();
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Main loop of walsender */
replication_active = true;
WalSndLoop(XLogSendPhysical);
replication_active = false;
if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
if (cmd->slotname)
ReplicationSlotRelease();
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
*/
if (sendTimeLineIsHistoric)
{
char startpos_str[8 + 1 + 8 + 1];
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2] = {0};
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
LSN_FORMAT_ARGS(sendTimeLineValidUpto));
dest = CreateDestReceiver(DestRemoteSimple);
/*
* Need a tuple descriptor representing two columns. int8 may seem
* like a surprising data type for this, but in theory int4 would not
* be wide enough for this, as TimeLineID is unsigned.
*/
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
/* prepare for projection of tuple */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
values[1] = CStringGetTextDatum(startpos_str);