This repository has been archived by the owner on Feb 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
job_queue.c
152 lines (120 loc) · 4.25 KB
/
job_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/* -------------------------------------------------------------------------
*
* Job queue mechanism
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "access/xact.h"
#include "executor/spi.h"
#include "pgstat.h"
#include "utils/snapmgr.h"
#include "commands/async.h"
/* Module magic block for this loadable library. */
PG_MODULE_MAGIC;
/* Expose job_queue_launch as an SQL-callable (fmgr version-1) function. */
PG_FUNCTION_INFO_V1(job_queue_launch);
/*
 * Background worker entry point.  job_queue_launch registers it by name
 * ("job_queue_main" in library "job_queue"), not by pointer.  It never
 * returns: the definition ends in proc_exit().
 */
void job_queue_main(void) pg_attribute_noreturn();
/*
 * job_consumer
 *
 * Drain the job queue.  Each iteration runs, in its own transaction, the
 * "pop next job" query held in buf.data and, if it returned a row, builds
 * and executes "SELECT <proc>(<args>)" from that row before committing.
 * The function returns once the pop query yields no row.
 *
 * Expected result columns of the pop query: 1 = job id, 2 = procedure
 * name, 3 = comma-separated argument list (may be NULL for "no arguments").
 *
 * This is an explicit loop rather than self-recursion so that stack depth
 * does not grow with the number of queued jobs.
 *
 * Any unexpected SPI result is reported with elog(FATAL).
 *
 * NOTE(review): the procedure name is spliced into the SQL text unquoted,
 * so whoever can write to the queue table can make the worker run arbitrary
 * SQL.  Confirm that the table is suitably access-restricted.
 */
static void
job_consumer(StringInfoData buf)
{
    bool        hadJob;

    do
    {
        int         ret;

        SetCurrentStatementStartTimestamp();
        StartTransactionCommand();
        if (SPI_connect() != SPI_OK_CONNECT)
            elog(FATAL, "could not connect to SPI manager");
        PushActiveSnapshot(GetTransactionSnapshot());
        pgstat_report_activity(STATE_RUNNING, buf.data);

        /* Fetch (and, per the query text, dequeue) at most one job. */
        ret = SPI_execute(buf.data, false, 1);
        if (ret != SPI_OK_SELECT)
        {
            elog(FATAL, "error grabbing next job from queue, returned code: %d", ret);
        }

        hadJob = SPI_processed > 0 && SPI_tuptable != NULL;
        if (hadJob)
        {
            int         retProc;
            SPITupleTable *tuptable = SPI_tuptable;
            TupleDesc   tupdesc = tuptable->tupdesc;
            HeapTuple   tuple = tuptable->vals[0];
            char       *jobId = SPI_getvalue(tuple, tupdesc, 1);
            char       *procName = SPI_getvalue(tuple, tupdesc, 2);
            char       *arguments = SPI_getvalue(tuple, tupdesc, 3);
            StringInfoData jobProcedure;

            /* SPI_getvalue returns NULL for SQL NULL; never feed that to %s. */
            if (procName == NULL)
                elog(FATAL, "job %s has no procedure name",
                     jobId != NULL ? jobId : "(null)");

            initStringInfo(&jobProcedure);
            appendStringInfo(&jobProcedure, "SELECT %s(%s)",
                             procName, arguments != NULL ? arguments : "");

            elog(LOG, "job found, id: %s proc: %s",
                 jobId != NULL ? jobId : "(null)", jobProcedure.data);
            pgstat_report_activity(STATE_RUNNING, jobProcedure.data);

            /*
             * FIXME: error treatment is still incomplete here, e.g. an error
             * thrown by the procedure itself is not caught.
             */
            retProc = SPI_execute(jobProcedure.data, false, 0);
            if (retProc != SPI_OK_SELECT)
                elog(FATAL, "job procedure error, returned code: %d", retProc);

            elog(LOG, "job finished");
        }

        SPI_finish();
        PopActiveSnapshot();
        CommitTransactionCommand();
        pgstat_report_stat(false);
        pgstat_report_activity(STATE_IDLE, NULL);
        ProcessCompletedNotifies();
    } while (hadJob);
}
/*
 * job_queue_main
 *
 * Entry point of the job-queue background worker.  Reads the target
 * database name from MyBgworkerEntry->bgw_extra, connects to it, builds the
 * "pop next job" query and hands it to job_consumer(), then exits the
 * process with status 0 once the consumer returns.
 *
 * The query locks the first row of jobs_queue (ordered by priority, id)
 * with FOR UPDATE SKIP LOCKED, deletes it, and returns id, proc and the
 * jsonb args array flattened to a comma-separated list in which purely
 * numeric elements are passed through and everything else is wrapped with
 * quote_literal().
 */
void
job_queue_main(void)
{
    char       *databaseName = MyBgworkerEntry->bgw_extra;
    StringInfoData buf;

    /*
     * Background workers are started with signals blocked and must unblock
     * them themselves; without this the worker cannot be interrupted or
     * terminated while it is draining the queue.
     */
    BackgroundWorkerUnblockSignals();

    initStringInfo(&buf);
    /* Constant text: append verbatim, not as a printf-style format. */
    appendStringInfoString(&buf,
                           "WITH next_job AS ( "
                           " SELECT id, proc, args FROM jobs_queue "
                           " ORDER BY priority ASC, id ASC "
                           " FOR UPDATE SKIP LOCKED "
                           " LIMIT 1 "
                           " ), queue_pop AS ( "
                           " DELETE FROM jobs_queue WHERE id = (SELECT id FROM next_job) "
                           " ) "
                           "SELECT id, proc, string_agg(CASE WHEN value ~ '^[0-9]+$' THEN value ELSE quote_literal(value) END, ',') AS args "
                           "FROM next_job LEFT JOIN LATERAL jsonb_array_elements_text(args) ON TRUE GROUP BY id, proc");

    BackgroundWorkerInitializeConnection(databaseName, NULL);
    elog(LOG, "job worker initialized for database %s", databaseName);

    job_consumer(buf);

    elog(LOG, "job worker ended, no more jobs left");
    proc_exit(0);
}
Datum
job_queue_launch(PG_FUNCTION_ARGS)
{
char *databaseName = PG_GETARG_CSTRING(0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL;
sprintf(worker.bgw_library_name, "job_queue");
sprintf(worker.bgw_function_name, "job_queue_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "job worker %d", 1); //TODO probably it's a good idea to instantiate more workers
memcpy(worker.bgw_extra, databaseName, sizeof(&databaseName));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start worker job"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start worker job without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}