Skip to content

Commit

Permalink
Support generic WAL messages for logical decoding
Browse files Browse the repository at this point in the history
This feature allows software to insert data into WAL stream that can be
read by wal2json. Those messages could be useful to control replication,
for example. Messages can be sent as transactional or not. Non-transactional messages mean that it is sent even if the transaction is rollbacked.

There was a PR yugabyte#20 for this same feature but I didn't use it. Indeed,
this code was dusty in my computer for a few months.

NOTE: 'message' test will fail on <= 9.5 because this feature was coded
in 9.6 (I don't want to complicate Makefile).
  • Loading branch information
Euler Taveira committed Aug 22, 2017
1 parent 2828409 commit 645ab69
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
MODULES = wal2json

# message test will fail for <= 9.5
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea
delete3 delete4 savepoint specialvalue toast bytea message

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
122 changes: 122 additions & 0 deletions expected/message.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message');
?column?
----------
msg1
(1 row)

SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message');
?column?
----------
msg2
(1 row)

BEGIN;
SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed');
?column?
----------
msg3
(1 row)

SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked');
?column?
----------
msg4
(1 row)

ROLLBACK;
BEGIN;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1');
?column?
----------
msg5
(1 row)

SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1');
?column?
----------
msg6
(1 row)

SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2');
?column?
----------
msg7
(1 row)

COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1');
data
---------------------------------------------------------------------------------------------------------
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": true, +
"prefix": "wal2json", +
"content": "this is a\ message" +
} +
] +
}
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": false, +
"prefix": "wal2json", +
"content": "this is "another" message" +
} +
] +
}
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": false, +
"prefix": "wal2json", +
"content": "this message will be printed even if the transaction is rollbacked"+
} +
] +
}
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": false, +
"prefix": "wal2json", +
"content": "this message will be printed before message #1" +
} +
] +
}
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": true, +
"prefix": "wal2json", +
"content": "this is message #1" +
} +
,{ +
"kind": "message", +
"transactional": true, +
"prefix": "wal2json", +
"content": "this is message #2" +
} +
] +
}
(5 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

24 changes: 24 additions & 0 deletions sql/message.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message');
SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message');

BEGIN;
SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed');
SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked');
ROLLBACK;

BEGIN;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1');
SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1');
SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2');
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
125 changes: 125 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

#include "replication/output_plugin.h"
#include "replication/logical.h"
#if PG_VERSION_NUM >= 90600
#include "replication/message.h"
#endif

#include "utils/builtins.h"
#include "utils/lsyscache.h"
Expand Down Expand Up @@ -68,6 +71,12 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
#if PG_VERSION_NUM >= 90600
static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size content_size, const char *content);
#endif

void
_PG_init(void)
Expand All @@ -85,6 +94,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
cb->shutdown_cb = pg_decode_shutdown;
#if PG_VERSION_NUM >= 90600
cb->message_cb = pg_decode_message;
#endif
}

/* Initialize this plugin */
Expand Down Expand Up @@ -869,3 +881,116 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (data->write_in_chunks)
OutputPluginWrite(ctx, true);
}

#if PG_VERSION_NUM >= 90600
/* Callback for generic logical decoding messages */
static void
pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr lsn, bool transactional, const char *prefix, Size
content_size, const char *content)
{
JsonDecodingData *data;
MemoryContext old;

data = ctx->output_plugin_private;

/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

/*
* write immediately iif (i) write-in-chunks=1 or (ii) non-transactional
* messages.
*/
if (data->write_in_chunks || !transactional)
OutputPluginPrepareWrite(ctx, true);

/*
* increment counter only for transactional messages because
* non-transactional message has only one object.
*/
if (transactional)
data->nr_changes++;

if (data->pretty_print)
{
/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks && transactional)
appendStringInfoChar(ctx->out, '\n');

/* build a complete JSON object for non-transactional message */
if (!transactional)
{
appendStringInfoString(ctx->out, "{\n");
appendStringInfoString(ctx->out, "\t\"change\": [\n");
}

appendStringInfoString(ctx->out, "\t\t");

if (data->nr_changes > 1)
appendStringInfoChar(ctx->out, ',');

appendStringInfoString(ctx->out, "{\n");

appendStringInfoString(ctx->out, "\t\t\t\"kind\": \"message\",\n");

if (transactional)
appendStringInfoString(ctx->out, "\t\t\t\"transactional\": true,\n");
else
appendStringInfoString(ctx->out, "\t\t\t\"transactional\": false,\n");

appendStringInfo(ctx->out, "\t\t\t\"prefix\": \"%s\",\n", prefix);
appendStringInfoString(ctx->out, "\t\t\t\"content\": \"");
appendBinaryStringInfo(ctx->out, content, content_size);
appendStringInfoString(ctx->out, "\"\n");
appendStringInfoString(ctx->out, "\t\t}");

/* build a complete JSON object for non-transactional message */
if (!transactional)
{
appendStringInfoString(ctx->out, "\n\t]");
appendStringInfoString(ctx->out, "\n}");
}
}
else
{
/* build a complete JSON object for non-transactional message */
if (!transactional)
{
appendStringInfoString(ctx->out, "{");
appendStringInfoString(ctx->out, "\"change\":[");
}

if (data->nr_changes > 1)
appendStringInfoString(ctx->out, ",{");
else
appendStringInfoChar(ctx->out, '{');

appendStringInfoString(ctx->out, "\"kind\":\"message\",");

if (transactional)
appendStringInfoString(ctx->out, "\"transactional\":true,");
else
appendStringInfoString(ctx->out, "\"transactional\":false,");

appendStringInfo(ctx->out, "\"prefix\":");
quote_escape_json(ctx->out, prefix);
appendStringInfoChar(ctx->out, ',');
appendStringInfoString(ctx->out, "\"content\":");
quote_escape_json(ctx->out, content);
appendStringInfoChar(ctx->out, '}');

/* build a complete JSON object for non-transactional message */
if (!transactional)
{
appendStringInfoChar(ctx->out, ']');
appendStringInfoChar(ctx->out, '}');
}
}

MemoryContextSwitchTo(old);
MemoryContextReset(data->context);

if (data->write_in_chunks || !transactional)
OutputPluginWrite(ctx, true);
}
#endif

0 comments on commit 645ab69

Please sign in to comment.