-
Notifications
You must be signed in to change notification settings - Fork 2
/
parallel.c
1757 lines (1529 loc) · 46.5 KB
/
parallel.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* parallel.c
*
* Parallel support for pg_dump and pg_restore
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/bin/pg_dump/parallel.c
*
*-------------------------------------------------------------------------
*/
/*
* Parallel operation works like this:
*
* The original, master process calls ParallelBackupStart(), which forks off
* the desired number of worker processes, which each enter WaitForCommands().
*
* The master process dispatches an individual work item to one of the worker
* processes in DispatchJobForTocEntry(). That calls
* AH->MasterStartParallelItemPtr, a routine of the output format. This
* function's arguments are the parents archive handle AH (containing the full
* catalog information), the TocEntry that the worker should work on and a
* T_Action value indicating whether this is a backup or a restore task. The
* function simply converts the TocEntry assignment into a command string that
* is then sent over to the worker process. In the simplest case that would be
* something like "DUMP 1234", with 1234 being the TocEntry id.
*
* The worker process receives and decodes the command and passes it to the
* routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
* which are routines of the current archive format. That routine performs
* the required action (dump or restore) and returns a malloc'd status string.
* The status string is passed back to the master where it is interpreted by
* AH->MasterEndParallelItemPtr, another format-specific routine. That
* function can update state or catalog information on the master's side,
* depending on the reply from the worker process. In the end it returns a
* status code, which is 0 for successful execution.
*
* Remember that we have forked off the workers only after we have read in
* the catalog. That's why our worker processes can also access the catalog
* information. (In the Windows case, the workers are threads in the same
* process. To avoid problems, they work with cloned copies of the Archive
* data structure; see RunWorker().)
*
* In the master process, the workerStatus field for each worker has one of
* the following values:
* WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's been sent a command
* WRKR_FINISHED: it's returned a result
* WRKR_TERMINATED: process ended
* The FINISHED state indicates that the worker is idle, but we've not yet
* dealt with the status code it returned from the prior command.
* ReapWorkerStatus() extracts the unhandled command status value and sets
* the workerStatus back to WRKR_IDLE.
*/
#include "postgres_fe.h"
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "pg_backup_utils.h"
#include "parallel.h"
#ifndef WIN32
#include <sys/types.h>
#include <sys/wait.h>
#include "signal.h"
#include <unistd.h>
#include <fcntl.h>
#endif
/* Mnemonic macros for indexing the fd array returned by pipe(2) */
#define PIPE_READ 0
#define PIPE_WRITE 1
#ifdef WIN32
/*
* Structure to hold info passed by _beginthreadex() to the function it calls
* via its single allowed argument.
*/
typedef struct
{
ArchiveHandle *AH;
ParallelSlot *slot;
RestoreOptions *ropt;
} WorkerInfo;
/* Windows implementation of pipe access */
static int pgpipe(int handles[2]);
static int piperead(int s, char *buf, int len);
#define pipewrite(a,b,c) send(a,b,c,0)
#else /* !WIN32 */
/* Non-Windows implementation of pipe access */
#define pgpipe(a) pipe(a)
#define piperead(a,b,c) read(a,b,c)
#define pipewrite(a,b,c) write(a,b,c)
#endif /* WIN32 */
/*
* State info for archive_close_connection() shutdown callback.
*/
typedef struct ShutdownInformation
{
ParallelState *pstate;
Archive *AHX;
} ShutdownInformation;
static ShutdownInformation shutdown_info;
/*
* State info for signal handling.
* We assume signal_info initializes to zeroes.
*
* On Unix, myAH is the master DB connection in the master process, and the
* worker's own connection in worker processes. On Windows, we have only one
* instance of signal_info, so myAH is the master connection and the worker
* connections must be dug out of pstate->parallelSlot[].
*/
typedef struct DumpSignalInformation
{
ArchiveHandle *myAH; /* database connection to issue cancel for */
ParallelState *pstate; /* parallel state, if any */
bool handler_set; /* signal handler set up in this process? */
#ifndef WIN32
bool am_worker; /* am I a worker process? */
#endif
} DumpSignalInformation;
static volatile DumpSignalInformation signal_info;
#ifdef WIN32
static CRITICAL_SECTION signal_info_lock;
#endif
/*
* Write a simple string to stderr --- must be safe in a signal handler.
* We ignore the write() result since there's not much we could do about it.
* Certain compilers make that harder than it ought to be.
*/
#define write_stderr(str) \
do { \
const char *str_ = (str); \
int rc_; \
rc_ = write(fileno(stderr), str_, strlen(str_)); \
(void) rc_; \
} while (0)
#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;
/* globally visible variables (needed by exit_nicely) */
bool parallel_init_done = false;
DWORD mainThreadId;
#endif /* WIN32 */
static const char *modulename = gettext_noop("parallel archiver");
/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot, RestoreOptions *ropt);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
int worker, const char *str);
static char *readMessageFromPipe(int fd);
#define messageStartsWith(msg, prefix) \
(strncmp(msg, prefix, strlen(prefix)) == 0)
#define messageEquals(msg, pattern) \
(strcmp(msg, pattern) == 0)
/*
* Shutdown callback to clean up socket access
*/
#ifdef WIN32
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
/* Call the cleanup function only from the main thread */
if (mainThreadId == GetCurrentThreadId())
WSACleanup();
}
#endif
/*
* Initialize parallel dump support --- should be called early in process
* startup. (Currently, this is called whether or not we intend parallel
* activity.)
*/
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
if (!parallel_init_done)
{
WSADATA wsaData;
int err;
/* Prepare for threaded operation */
tls_index = TlsAlloc();
mainThreadId = GetCurrentThreadId();
/* Initialize socket access */
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (err != 0)
{
fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err);
exit_nicely(1);
}
/* ... and arrange to shut it down at exit */
on_exit_nicely(shutdown_parallel_dump_utils, NULL);
parallel_init_done = true;
}
#endif
}
/*
* Find the ParallelSlot for the current worker process or thread.
*
* Returns NULL if no matching slot is found (this implies we're the master).
*/
static ParallelSlot *
GetMyPSlot(ParallelState *pstate)
{
int i;
for (i = 0; i < pstate->numWorkers; i++)
{
#ifdef WIN32
if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
#else
if (pstate->parallelSlot[i].pid == getpid())
#endif
return &(pstate->parallelSlot[i]);
}
return NULL;
}
/*
* A thread-local version of getLocalPQExpBuffer().
*
* Non-reentrant but reduces memory leakage: we'll consume one buffer per
* thread, which is much better than one per fmtId/fmtQualifiedId call.
*/
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
/*
* The Tls code goes awry if we use a static var, so we provide for both
* static and auto, and omit any use of the static var when using Tls. We
* rely on TlsGetValue() to return 0 if the value is not yet set.
*/
static PQExpBuffer s_id_return = NULL;
PQExpBuffer id_return;
if (parallel_init_done)
id_return = (PQExpBuffer) TlsGetValue(tls_index);
else
id_return = s_id_return;
if (id_return) /* first time through? */
{
/* same buffer, just wipe contents */
resetPQExpBuffer(id_return);
}
else
{
/* new buffer */
id_return = createPQExpBuffer();
if (parallel_init_done)
TlsSetValue(tls_index, id_return);
else
s_id_return = id_return;
}
return id_return;
}
#endif /* WIN32 */
/*
* pg_dump and pg_restore call this to register the cleanup handler
* as soon as they've created the ArchiveHandle.
*/
void
on_exit_close_archive(Archive *AHX)
{
shutdown_info.AHX = AHX;
on_exit_nicely(archive_close_connection, &shutdown_info);
}
/*
* on_exit_nicely handler for shutting down database connections and
* worker processes cleanly.
*/
static void
archive_close_connection(int code, void *arg)
{
ShutdownInformation *si = (ShutdownInformation *) arg;
if (si->pstate)
{
/* In parallel mode, must figure out who we are */
ParallelSlot *slot = GetMyPSlot(si->pstate);
if (!slot)
{
/*
* We're the master. Forcibly shut down workers, then close our
* own database connection, if any.
*/
ShutdownWorkersHard(si->pstate);
if (si->AHX)
DisconnectDatabase(si->AHX);
}
else
{
/*
* We're a worker. Shut down our own DB connection if any. On
* Windows, we also have to close our communication sockets, to
* emulate what will happen on Unix when the worker process exits.
* (Without this, if this is a premature exit, the master would
* fail to detect it because there would be no EOF condition on
* the other end of the pipe.)
*/
if (slot->args->AH)
DisconnectDatabase(&(slot->args->AH->public));
#ifdef WIN32
closesocket(slot->pipeRevRead);
closesocket(slot->pipeRevWrite);
#endif
}
}
else
{
/* Non-parallel operation: just kill the master DB connection */
if (si->AHX)
DisconnectDatabase(si->AHX);
}
}
/*
* Forcibly shut down any remaining workers, waiting for them to finish.
*
* Note that we don't expect to come here during normal exit (the workers
* should be long gone, and the ParallelState too). We're only here in an
* exit_horribly() situation, so intervening to cancel active commands is
* appropriate.
*/
static void
ShutdownWorkersHard(ParallelState *pstate)
{
int i;
/*
* Close our write end of the sockets so that any workers waiting for
* commands know they can exit.
*/
for (i = 0; i < pstate->numWorkers; i++)
closesocket(pstate->parallelSlot[i].pipeWrite);
/*
* Force early termination of any commands currently in progress.
*/
#ifndef WIN32
/* On non-Windows, send SIGTERM to each worker process. */
for (i = 0; i < pstate->numWorkers; i++)
{
pid_t pid = pstate->parallelSlot[i].pid;
if (pid != 0)
kill(pid, SIGTERM);
}
#else
/*
* On Windows, send query cancels directly to the workers' backends. Use
* a critical section to ensure worker threads don't change state.
*/
EnterCriticalSection(&signal_info_lock);
for (i = 0; i < pstate->numWorkers; i++)
{
ArchiveHandle *AH = pstate->parallelSlot[i].args->AH;
char errbuf[1];
if (AH != NULL && AH->connCancel != NULL)
(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
}
LeaveCriticalSection(&signal_info_lock);
#endif
/* Now wait for them to terminate. */
WaitForTerminatingWorkers(pstate);
}
/*
* Wait for all workers to terminate.
*/
static void
WaitForTerminatingWorkers(ParallelState *pstate)
{
while (!HasEveryWorkerTerminated(pstate))
{
ParallelSlot *slot = NULL;
int j;
#ifndef WIN32
/* On non-Windows, use wait() to wait for next worker to end */
int status;
pid_t pid = wait(&status);
/* Find dead worker's slot, and clear the PID field */
for (j = 0; j < pstate->numWorkers; j++)
{
slot = &(pstate->parallelSlot[j]);
if (slot->pid == pid)
{
slot->pid = 0;
break;
}
}
#else /* WIN32 */
/* On Windows, we must use WaitForMultipleObjects() */
HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
int nrun = 0;
DWORD ret;
uintptr_t hThread;
for (j = 0; j < pstate->numWorkers; j++)
{
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
{
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
nrun++;
}
}
ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
Assert(ret != WAIT_FAILED);
hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
free(lpHandles);
/* Find dead worker's slot, and clear the hThread field */
for (j = 0; j < pstate->numWorkers; j++)
{
slot = &(pstate->parallelSlot[j]);
if (slot->hThread == hThread)
{
/* For cleanliness, close handles for dead threads */
CloseHandle((HANDLE) slot->hThread);
slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
break;
}
}
#endif /* WIN32 */
/* On all platforms, update workerStatus as well */
Assert(j < pstate->numWorkers);
slot->workerStatus = WRKR_TERMINATED;
}
}
/*
* Code for responding to cancel interrupts (SIGINT, control-C, etc)
*
* This doesn't quite belong in this module, but it needs access to the
* ParallelState data, so there's not really a better place either.
*
* When we get a cancel interrupt, we could just die, but in pg_restore that
* could leave a SQL command (e.g., CREATE INDEX on a large table) running
* for a long time. Instead, we try to send a cancel request and then die.
* pg_dump probably doesn't really need this, but we might as well use it
* there too. Note that sending the cancel directly from the signal handler
* is safe because PQcancel() is written to make it so.
*
* In parallel operation on Unix, each process is responsible for canceling
* its own connection (this must be so because nobody else has access to it).
* Furthermore, the master process should attempt to forward its signal to
* each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
* needed because typing control-C at the console would deliver SIGINT to
* every member of the terminal process group --- but in other scenarios it
* might be that only the master gets signaled.
*
* On Windows, the cancel handler runs in a separate thread, because that's
* how SetConsoleCtrlHandler works. We make it stop worker threads, send
* cancels on all active connections, and then return FALSE, which will allow
* the process to die. For safety's sake, we use a critical section to
* protect the PGcancel structures against being changed while the signal
* thread runs.
*/
#ifndef WIN32
/*
* Signal handler (Unix only)
*/
static void
sigTermHandler(SIGNAL_ARGS)
{
int i;
char errbuf[1];
/*
* Some platforms allow delivery of new signals to interrupt an active
* signal handler. That could muck up our attempt to send PQcancel, so
* disable the signals that setup_cancel_handler enabled.
*/
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, SIG_IGN);
pqsignal(SIGQUIT, SIG_IGN);
/*
* If we're in the master, forward signal to all workers. (It seems best
* to do this before PQcancel; killing the master transaction will result
* in invalid-snapshot errors from active workers, which maybe we can
* quiet by killing workers first.) Ignore any errors.
*/
if (signal_info.pstate != NULL)
{
for (i = 0; i < signal_info.pstate->numWorkers; i++)
{
pid_t pid = signal_info.pstate->parallelSlot[i].pid;
if (pid != 0)
kill(pid, SIGTERM);
}
}
/*
* Send QueryCancel if we have a connection to send to. Ignore errors,
* there's not much we can do about them anyway.
*/
if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
/*
* Report we're quitting, using nothing more complicated than write(2).
* When in parallel operation, only the master process should do this.
*/
if (!signal_info.am_worker)
{
if (progname)
{
write_stderr(progname);
write_stderr(": ");
}
write_stderr("terminated by user\n");
}
/* And die. */
exit(1);
}
/*
* Enable cancel interrupt handler, if not already done.
*/
static void
setup_cancel_handler(void)
{
/*
* When forking, signal_info.handler_set will propagate into the new
* process, but that's fine because the signal handler state does too.
*/
if (!signal_info.handler_set)
{
signal_info.handler_set = true;
pqsignal(SIGINT, sigTermHandler);
pqsignal(SIGTERM, sigTermHandler);
pqsignal(SIGQUIT, sigTermHandler);
}
}
#else /* WIN32 */
/*
* Console interrupt handler --- runs in a newly-started thread.
*
* After stopping other threads and sending cancel requests on all open
* connections, we return FALSE which will allow the default ExitProcess()
* action to be taken.
*/
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
int i;
char errbuf[1];
if (dwCtrlType == CTRL_C_EVENT ||
dwCtrlType == CTRL_BREAK_EVENT)
{
/* Critical section prevents changing data we look at here */
EnterCriticalSection(&signal_info_lock);
/*
* If in parallel mode, stop worker threads and send QueryCancel to
* their connected backends. The main point of stopping the worker
* threads is to keep them from reporting the query cancels as errors,
* which would clutter the user's screen. We needn't stop the master
* thread since it won't be doing much anyway. Do this before
* canceling the main transaction, else we might get invalid-snapshot
* errors reported before we can stop the workers. Ignore errors,
* there's not much we can do about them anyway.
*/
if (signal_info.pstate != NULL)
{
for (i = 0; i < signal_info.pstate->numWorkers; i++)
{
ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
ArchiveHandle *AH = slot->args->AH;
HANDLE hThread = (HANDLE) slot->hThread;
/*
* Using TerminateThread here may leave some resources leaked,
* but it doesn't matter since we're about to end the whole
* process.
*/
if (hThread != INVALID_HANDLE_VALUE)
TerminateThread(hThread, 0);
if (AH != NULL && AH->connCancel != NULL)
(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
}
}
/*
* Send QueryCancel to master connection, if enabled. Ignore errors,
* there's not much we can do about them anyway.
*/
if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
(void) PQcancel(signal_info.myAH->connCancel,
errbuf, sizeof(errbuf));
LeaveCriticalSection(&signal_info_lock);
/*
* Report we're quitting, using nothing more complicated than
* write(2). (We might be able to get away with using write_msg()
* here, but since we terminated other threads uncleanly above, it
* seems better to assume as little as possible.)
*/
if (progname)
{
write_stderr(progname);
write_stderr(": ");
}
write_stderr("terminated by user\n");
}
/* Always return FALSE to allow signal handling to continue */
return FALSE;
}
/*
* Enable cancel interrupt handler, if not already done.
*/
static void
setup_cancel_handler(void)
{
if (!signal_info.handler_set)
{
signal_info.handler_set = true;
InitializeCriticalSection(&signal_info_lock);
SetConsoleCtrlHandler(consoleHandler, TRUE);
}
}
#endif /* WIN32 */
/*
* set_archive_cancel_info
*
* Fill AH->connCancel with cancellation info for the specified database
* connection; or clear it if conn is NULL.
*/
void
set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
{
PGcancel *oldConnCancel;
/*
* Activate the interrupt handler if we didn't yet in this process. On
* Windows, this also initializes signal_info_lock; therefore it's
* important that this happen at least once before we fork off any
* threads.
*/
setup_cancel_handler();
/*
* On Unix, we assume that storing a pointer value is atomic with respect
* to any possible signal interrupt. On Windows, use a critical section.
*/
#ifdef WIN32
EnterCriticalSection(&signal_info_lock);
#endif
/* Free the old one if we have one */
oldConnCancel = AH->connCancel;
/* be sure interrupt handler doesn't use pointer while freeing */
AH->connCancel = NULL;
if (oldConnCancel != NULL)
PQfreeCancel(oldConnCancel);
/* Set the new one if specified */
if (conn)
AH->connCancel = PQgetCancel(conn);
/*
* On Unix, there's only ever one active ArchiveHandle per process, so we
* can just set signal_info.myAH unconditionally. On Windows, do that
* only in the main thread; worker threads have to make sure their
* ArchiveHandle appears in the pstate data, which is dealt with in
* RunWorker().
*/
#ifndef WIN32
signal_info.myAH = AH;
#else
if (mainThreadId == GetCurrentThreadId())
signal_info.myAH = AH;
#endif
#ifdef WIN32
LeaveCriticalSection(&signal_info_lock);
#endif
}
/*
* set_cancel_pstate
*
* Set signal_info.pstate to point to the specified ParallelState, if any.
* We need this mainly to have an interlock against Windows signal thread.
*/
static void
set_cancel_pstate(ParallelState *pstate)
{
#ifdef WIN32
EnterCriticalSection(&signal_info_lock);
#endif
signal_info.pstate = pstate;
#ifdef WIN32
LeaveCriticalSection(&signal_info_lock);
#endif
}
/*
* set_cancel_slot_archive
*
* Set ParallelSlot's AH field to point to the specified archive, if any.
* We need this mainly to have an interlock against Windows signal thread.
*/
static void
set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
{
#ifdef WIN32
EnterCriticalSection(&signal_info_lock);
#endif
slot->args->AH = AH;
#ifdef WIN32
LeaveCriticalSection(&signal_info_lock);
#endif
}
/*
* This function is called by both Unix and Windows variants to set up
* and run a worker process. Caller should exit the process (or thread)
* upon return.
*/
static void
RunWorker(ArchiveHandle *AH, ParallelSlot *slot, RestoreOptions *ropt)
{
int pipefd[2];
/* fetch child ends of pipes */
pipefd[PIPE_READ] = slot->pipeRevRead;
pipefd[PIPE_WRITE] = slot->pipeRevWrite;
/*
* Clone the archive so that we have our own state to work with, and in
* particular our own database connection.
*
* We clone on Unix as well as Windows, even though technically we don't
* need to because fork() gives us a copy in our own address space
* already. But CloneArchive resets the state information and also clones
* the database connection which both seem kinda helpful.
*/
AH = CloneArchive(AH);
/* Remember cloned archive where signal handler can find it */
set_cancel_slot_archive(slot, AH);
/*
* Call the setup worker function that's defined in the ArchiveHandle.
*/
(AH->SetupWorkerPtr) ((Archive *) AH, ropt);
/*
* Execute commands until done.
*/
WaitForCommands(AH, pipefd);
/*
* Disconnect from database and clean up.
*/
set_cancel_slot_archive(slot, NULL);
DisconnectDatabase(&(AH->public));
DeCloneArchive(AH);
}
/*
* Thread base function for Windows
*/
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
ArchiveHandle *AH = wi->AH;
ParallelSlot *slot = wi->slot;
RestoreOptions *ropt = wi->ropt;
/* Don't need WorkerInfo anymore */
free(wi);
/* Run the worker ... */
RunWorker(AH, slot, ropt);
/* Exit the thread */
_endthreadex(0);
return 0;
}
#endif /* WIN32 */
/*
* This function starts a parallel dump or restore by spawning off the worker
* processes. For Windows, it creates a number of threads; on Unix the
* workers are created with fork().
*/
ParallelState *
ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
{
ParallelState *pstate;
int i;
const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
Assert(AH->public.numWorkers > 0);
pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
pstate->numWorkers = AH->public.numWorkers;
pstate->parallelSlot = NULL;
if (AH->public.numWorkers == 1)
return pstate;
pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
memset((void *) pstate->parallelSlot, 0, slotSize);
#ifdef WIN32
/* Make fmtId() and fmtQualifiedId() use thread-local storage */
getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
#endif
/*
* Set the pstate in shutdown_info, to tell the exit handler that it must
* clean up workers as well as the main database connection. But we don't
* set this in signal_info yet, because we don't want child processes to
* inherit non-NULL signal_info.pstate.
*/
shutdown_info.pstate = pstate;
/*
* Temporarily disable query cancellation on the master connection. This
* ensures that child processes won't inherit valid AH->connCancel
* settings and thus won't try to issue cancels against the master's
* connection. No harm is done if we fail while it's disabled, because
* the master connection is idle at this point anyway.
*/
set_archive_cancel_info(AH, NULL);
/* Ensure stdio state is quiesced before forking */
fflush(NULL);
/* Create desired number of workers */
for (i = 0; i < pstate->numWorkers; i++)
{
#ifdef WIN32
WorkerInfo *wi;
uintptr_t handle;
#else
pid_t pid;
#endif
ParallelSlot *slot = &(pstate->parallelSlot[i]);
int pipeMW[2],
pipeWM[2];
/* Create communication pipes for this worker */
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
exit_horribly(modulename,
"could not create communication channels: %s\n",
strerror(errno));
slot->workerStatus = WRKR_IDLE;
slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
slot->args->AH = NULL;
slot->args->te = NULL;
/* master's ends of the pipes */
slot->pipeRead = pipeWM[PIPE_READ];
slot->pipeWrite = pipeMW[PIPE_WRITE];
/* child's ends of the pipes */
slot->pipeRevRead = pipeMW[PIPE_READ];
slot->pipeRevWrite = pipeWM[PIPE_WRITE];
#ifdef WIN32
/* Create transient structure to pass args to worker function */
wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
wi->AH = AH;
wi->slot = slot;
wi->ropt = ropt;
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(slot->threadId));
slot->hThread = handle;
#else /* !WIN32 */
pid = fork();
if (pid == 0)
{
/* we are the worker */
int j;
/* this is needed for GetMyPSlot() */
slot->pid = getpid();
/* instruct signal handler that we're in a worker now */
signal_info.am_worker = true;
/* close read end of Worker -> Master */
closesocket(pipeWM[PIPE_READ]);
/* close write end of Master -> Worker */
closesocket(pipeMW[PIPE_WRITE]);
/*
* Close all inherited fds for communication of the master with
* previously-forked workers.
*/
for (j = 0; j < i; j++)
{
closesocket(pstate->parallelSlot[j].pipeRead);
closesocket(pstate->parallelSlot[j].pipeWrite);
}
/* Run the worker ... */
RunWorker(AH, slot, ropt);
/* We can just exit(0) when done */
exit(0);
}
else if (pid < 0)
{
/* fork failed */
exit_horribly(modulename,
"could not create worker process: %s\n",
strerror(errno));