-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathbdr_nodecache.c
363 lines (304 loc) · 8.36 KB
/
bdr_nodecache.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/* -------------------------------------------------------------------------
*
* bdr_nodecache.c
* shmem cache for local node entry in bdr_nodes, holds one entry per
* each local bdr database
*
* Copyright (c) 2015, PostgreSQL Global Development Group
*
* IDENTIFICATION
* bdr_nodecache.c
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "bdr.h"
#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "utils/catcache.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
/*
 * Caches for our name and (if we're an apply worker or walsender) our peer
 * node's name, to bypass the usual nodecache machinery and provide quick, safe
 * access when not in a txn.
 *
 * my_node_name stays NULL until bdr_setup_my_cached_node_names() fills it in.
 */
static const char * my_node_name = NULL;
/*
 * To make sure cached name calls are for the correct node id and don't produce
 * confusing results, check node id each call.
 *
 * Both variables are written by bdr_setup_cached_remote_name();
 * remote_node_name stays NULL until then.
 */
static BDRNodeId remote_node_id;
static const char * remote_node_name = NULL;
/*
 * Session-local hash table of BDRNodeInfo entries, keyed by BDRNodeId.
 * NULL until bdr_nodecache_initialize() creates it in CacheMemoryContext on
 * the first lookup.
 */
static HTAB *BDRNodeCacheHash = NULL;
/*
 * Resolve a schema name plus relation name to the relation's oid.
 *
 * Exists because PostgreSQL does not have enough relation lookup functions:
 * this chains the namespace lookup and the relation lookup into one call.
 *
 * Raises an ERROR if no relation of that name is found in the schema.
 */
static Oid
bdr_get_relname_relid(const char *nspname, const char *relname)
{
	Oid			schema_oid = get_namespace_oid(nspname, false);
	Oid			rel_oid = get_relname_relid(relname, schema_oid);

	if (!rel_oid)
		elog(ERROR, "cache lookup failed for relation %s.%s",
			 nspname, relname);

	return rel_oid;
}
/*
 * Send a cache invalidation signal to all backends.
 *
 * This is done by issuing a relcache invalidation for bdr.bdr_nodes, which
 * is the relation bdr_nodecache_invalidate_callback() reacts to.
 */
void
bdr_nodecache_invalidate(void)
{
	CacheInvalidateRelcacheByRelid(
		bdr_get_relname_relid("bdr", "bdr_nodes"));
}
/*
 * Relcache invalidation callback: invalidate the session local cache.
 *
 * Entries are only flagged as invalid here; they are left in the hash table
 * and refreshed by bdr_nodecache_lookup() the next time they are requested.
 * The 'arg' parameter is unused.
 */
static void
bdr_nodecache_invalidate_callback(Datum arg, Oid relid)
{
	HASH_SEQ_STATUS scan;
	BDRNodeInfo *node;

	/* The cache was never set up in this session, so there is nothing to do. */
	if (BDRNodeCacheHash == NULL)
		return;

	/*
	 * Only act on invalidations of bdr_nodes; a relid of InvalidOid is
	 * treated as matching it too.
	 */
	if (relid != InvalidOid && relid != BdrNodesRelid)
		return;

	/* We currently always invalidate everything */
	hash_seq_init(&scan, BDRNodeCacheHash);
	for (node = (BDRNodeInfo *) hash_seq_search(&scan);
		 node != NULL;
		 node = (BDRNodeInfo *) hash_seq_search(&scan))
	{
		node->valid = false;
	}
}
/*
 * One-time setup of this session's node cache: remember the oid of
 * bdr.bdr_nodes, create the hash table in CacheMemoryContext and register
 * the relcache invalidation callback.
 *
 * bdr_nodecache_lookup() treats a non-NULL BDRNodeCacheHash as "already
 * initialized", so the hash table must not be published before the steps
 * that can raise an ERROR have succeeded.  Otherwise a failed bdr_nodes
 * lookup would leave behind a cache that is never initialized again and has
 * no invalidation callback, i.e. one that can serve stale entries forever.
 */
static void
bdr_nodecache_initialize(void)
{
	HASHCTL		ctl;

	/* Make sure we've initialized CacheMemoryContext. */
	if (CacheMemoryContext == NULL)
		CreateCacheMemoryContext();

	/*
	 * Resolve bdr.bdr_nodes before touching BDRNodeCacheHash; this raises an
	 * ERROR if the relation cannot be found, in which case initialization is
	 * simply retried on the next lookup.
	 *
	 * XXX: This breaks if the table is dropped and recreated, during the
	 * lifetime of this backend.
	 */
	BdrNodesRelid = bdr_get_relname_relid("bdr", "bdr_nodes");

	/* Initialize the hash table. */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(BDRNodeId);
	ctl.entrysize = sizeof(BDRNodeInfo);
	ctl.hash = tag_hash;
	ctl.hcxt = CacheMemoryContext;

	BDRNodeCacheHash = hash_create("BDR node cache", 128, &ctl,
								   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

	/* Watch for invalidation events. */
	CacheRegisterRelcacheCallback(bdr_nodecache_invalidate_callback,
								  (Datum) 0);
}
/*
 * Return the nodecache entry for 'nodeid', (re)loading it through
 * bdr_nodes_get_local_info() when it is not cached yet or has been marked
 * invalid.
 *
 * A transaction must be active.  The returned entry is owned by the cache.
 * When the node cannot be found the result is NULL if missing_ok is true,
 * otherwise an ERROR is raised.
 */
static BDRNodeInfo*
bdr_nodecache_lookup(const BDRNodeId * const nodeid, bool missing_ok)
{
	BDRNodeInfo	   *cached;
	BDRNodeInfo	   *fresh;
	bool			already_present;
	MemoryContext	caller_ctx;

	/*
	 * We potentially need to access syscaches, but it's not safe to start a
	 * txn here, since we might clobber memory contexts, resource owners, etc
	 * set up elsewhere.
	 */
	Assert(IsTransactionState());

	/* The cache is set up lazily, on the first lookup of the session. */
	if (BDRNodeCacheHash == NULL)
		bdr_nodecache_initialize();

	/*
	 * HASH_ENTER returns the existing entry if present or creates a new one.
	 */
	cached = hash_search(BDRNodeCacheHash, (void *) nodeid,
						 HASH_ENTER, &already_present);

	/* Fast path: a valid entry can be handed back as it is. */
	if (already_present && cached->valid)
	{
		Assert(IsTransactionState());
		return cached;
	}

	if (already_present)
	{
		/*
		 * Entry exists but is invalid. Release any memory it holds in
		 * CacheMemoryContext before we zero the entry for re-use.
		 */
		if (cached->local_dsn != NULL)
			pfree(cached->local_dsn);
		if (cached->init_from_dsn != NULL)
			pfree(cached->init_from_dsn);
		if (cached->name != NULL)
			pfree(cached->name);
	}

	/*
	 * Zero out the data part of the entry: everything from 'valid' onwards.
	 * The bytes in front of 'valid' are left untouched.
	 */
	memset((char *) cached + offsetof(BDRNodeInfo, valid),
		   0,
		   sizeof(BDRNodeInfo) - offsetof(BDRNodeInfo, valid));

	/* Fetch the node info with TopMemoryContext as the current context. */
	caller_ctx = MemoryContextSwitchTo(TopMemoryContext);
	fresh = bdr_nodes_get_local_info(nodeid);
	MemoryContextSwitchTo(caller_ctx);

	if (fresh == NULL)
	{
		Assert(IsTransactionState());

		/*
		 * The zeroed (hence invalid) entry stays in the hash table, so the
		 * next lookup for this node tries to load it again.
		 */
		if (missing_ok)
			return NULL;

		elog(ERROR, "could not find node "BDR_NODEID_FORMAT,
			 BDR_NODEID_FORMAT_ARGS(*nodeid));
	}

	/* Plain values are copied directly ... */
	cached->status = fresh->status;
	cached->read_only = fresh->read_only;
	cached->seq_id = fresh->seq_id;

	/* ... while strings get their own copy in CacheMemoryContext. */
	if (fresh->local_dsn)
		cached->local_dsn = MemoryContextStrdup(CacheMemoryContext,
												fresh->local_dsn);
	if (fresh->init_from_dsn)
		cached->init_from_dsn = MemoryContextStrdup(CacheMemoryContext,
													fresh->init_from_dsn);
	if (fresh->name)
		cached->name = MemoryContextStrdup(CacheMemoryContext,
										   fresh->name);

	/* Only mark the entry usable once every field has been filled in. */
	cached->valid = true;

	bdr_bdr_node_free(fresh);

	Assert(IsTransactionState());
	return cached;
}
/*
 * Look up our node name from the nodecache.
 *
 * A txn must be active.
 *
 * Returns NULL if the nodecache has no entry for the local node; the result
 * can also be NULL if the entry carries no name. A non-NULL result points
 * into the cache entry and must not be freed by the caller.
 *
 * If you need to call this from a context where you're not sure there'll be an
 * open txn, use bdr_get_my_cached_node_name().
 */
const char *
bdr_local_node_name(void)
{
	BDRNodeId	nodeid;
	BDRNodeInfo *node;

	bdr_make_my_nodeid(&nodeid);
	node = bdr_nodecache_lookup(&nodeid, true);

	/* This function returns a pointer, so "not found" is NULL, not false. */
	if (node == NULL)
		return NULL;

	return node->name;
}
/*
 * Report the read_only flag of the local node's nodecache entry.
 *
 * Returns false when the nodecache has no entry for the local node. The
 * lookup goes through bdr_nodecache_lookup(), which requires an open txn.
 */
bool
bdr_local_node_read_only(void)
{
	BDRNodeId	self_id;
	BDRNodeInfo *self;

	bdr_make_my_nodeid(&self_id);
	self = bdr_nodecache_lookup(&self_id, true);

	return (self != NULL) ? self->read_only : false;
}
/*
 * Report the status character of the local node's nodecache entry.
 *
 * Returns '\0' when the nodecache has no entry for the local node. The
 * lookup goes through bdr_nodecache_lookup(), which requires an open txn.
 */
char
bdr_local_node_status(void)
{
	BDRNodeId	self_id;
	BDRNodeInfo *self;

	bdr_make_my_nodeid(&self_id);
	self = bdr_nodecache_lookup(&self_id, true);

	return (self != NULL) ? self->status : '\0';
}
/*
 * Get the 16-bit node sequence ID of the local node, or
 * -1 if no node or no sequence assigned.
 *
 * The -1 for a missing node is produced here; any other value is whatever
 * the nodecache entry holds in seq_id.
 */
int32
bdr_local_node_seq_id(void)
{
	BDRNodeId	self_id;
	BDRNodeInfo *self;

	bdr_make_my_nodeid(&self_id);
	self = bdr_nodecache_lookup(&self_id, true);

	return (self != NULL) ? self->seq_id : -1;
}
/*
 * Look up the specified node in the nodecache and return its name as a
 * guaranteed non-null pointer.
 *
 * If the lookup yields no node (only possible with missing_ok = true, since
 * the lookup raises an ERROR otherwise) or the node has no name, the literal
 * "(none)" is returned instead.
 *
 * Return value is owned by the cache and must not be free'd.
 */
const char *
bdr_nodeid_name(const BDRNodeId * const node, bool missing_ok)
{
	BDRNodeInfo * const info = bdr_nodecache_lookup(node, missing_ok);

	if (info != NULL && info->name != NULL)
		return info->name;

	return "(none)";
}
/*
 * The full nodecache requires a transaction to be open. Since we
 * often want to output our own node name and that of our peer node,
 * we cache them at worker startup.
 *
 * This cache doesn't get invalidated if node names change, but since our
 * application_name doesn't either, users should expect to have to restart
 * workers anyway. The node name doesn't act as a key to anything so
 * not invalidating it on change isn't a big deal; about all it can do
 * is affect synchronous_standby_names.
 *
 * Must be called after background worker setup so ThisTimeLineID
 * is initialized, while there's an open txn.
 *
 * The local node must exist: the lookup is made with missing_ok = false and
 * so raises an ERROR otherwise. The cached copy lives in CacheMemoryContext.
 *
 * TODO: If we made the nodecache eager, so it reloaded fully on
 * invalidations, we could get rid of this hack.
 */
void
bdr_setup_my_cached_node_names(void)
{
	BDRNodeId	myid;
	const char *name;

	Assert(IsTransactionState());

	bdr_make_my_nodeid(&myid);

	/*
	 * Do the lookup as a statement of its own. The first nodecache lookup
	 * may be what creates CacheMemoryContext, and the order in which function
	 * arguments are evaluated is unspecified, so passing the lookup directly
	 * as an argument next to CacheMemoryContext could read the context while
	 * it is still NULL.
	 */
	name = bdr_nodeid_name(&myid, false);

	my_node_name = MemoryContextStrdup(CacheMemoryContext, name);
}
/*
 * Cache the name of the given peer node, together with its node id, so that
 * bdr_get_my_cached_remote_name() can answer without an open txn.
 *
 * A txn must be active. The node must exist: the lookup is made with
 * missing_ok = false and so raises an ERROR otherwise. The cached copy of
 * the name lives in CacheMemoryContext.
 */
void
bdr_setup_cached_remote_name(const BDRNodeId * const remote_nodeid)
{
	const char *name;

	Assert(IsTransactionState());

	/*
	 * Do the lookup as a statement of its own. The first nodecache lookup
	 * may be what creates CacheMemoryContext, and the order in which function
	 * arguments are evaluated is unspecified, so passing the lookup directly
	 * as an argument next to CacheMemoryContext could read the context while
	 * it is still NULL.
	 */
	name = bdr_nodeid_name(remote_nodeid, false);

	remote_node_name = MemoryContextStrdup(CacheMemoryContext, name);
	bdr_nodeid_cpy(&remote_node_id, remote_nodeid);
}
/*
 * Return our own node name, without needing an open txn where possible.
 *
 * Preference order:
 *   1. the copy saved by bdr_setup_my_cached_node_names();
 *   2. a nodecache lookup, if a txn happens to be open;
 *   3. the literal "(unknown)".
 *
 * Note that case 2 hands back whatever bdr_local_node_name() returns, which
 * can be NULL when the local node has no nodecache entry or no name.
 */
const char *
bdr_get_my_cached_node_name()
{
	if (my_node_name != NULL)
		return my_node_name;

	/* No saved copy and no txn to look one up in. */
	if (!IsTransactionState())
		return "(unknown)";

	/* We might get called from a user backend too, within a function */
	return bdr_local_node_name();
}
/*
 * Return the name of the given peer node, without needing an open txn where
 * possible.
 *
 * The copy saved by bdr_setup_cached_remote_name() is only used if it was
 * saved for the same node id that is being asked about. Failing that, the
 * nodecache is consulted when a txn is open (yielding "(none)" for a node
 * that is missing or unnamed); otherwise the literal "(unknown)" is
 * returned.
 */
const char *
bdr_get_my_cached_remote_name(const BDRNodeId * const remote_nodeid)
{
	if (remote_node_name != NULL &&
		bdr_nodeid_eq(&remote_node_id, remote_nodeid))
		return remote_node_name;

	/* No usable saved copy and no txn to look one up in. */
	if (!IsTransactionState())
		return "(unknown)";

	/* We might get called from a user backend */
	return bdr_nodeid_name(remote_nodeid, true);
}