From 645ab69aae268a81c900671b5dfab7029384e9ff Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Tue, 22 Aug 2017 19:51:06 -0300 Subject: [PATCH] Support generic WAL messages for logical decoding This feature allows software to insert data into WAL stream that can be read by wal2json. Those messages could be useful to control replication, for example. Messages can be sent as transactional or not. Non-transactional messages mean that it is sent even if the transaction is rollbacked. There was a PR #20 for this same feature but I didn't use it. Indeed, this code was dusty in my computer for a few months. NOTE: 'message' test will fail on <= 9.5 because this feature was coded in 9.6 (I don't want to complicate Makefile). --- Makefile | 3 +- expected/message.out | 122 +++++++++++++++++++++++++++++++++++++++++ sql/message.sql | 24 +++++++++ wal2json.c | 125 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 273 insertions(+), 1 deletion(-) create mode 100644 expected/message.out create mode 100644 sql/message.sql diff --git a/Makefile b/Makefile index ee71f1dd1ab1..c82b478c8af8 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ MODULES = wal2json +# message test will fail for <= 9.5 REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ - delete3 delete4 savepoint specialvalue toast bytea + delete3 delete4 savepoint specialvalue toast bytea message PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/expected/message.out b/expected/message.out new file mode 100644 index 000000000000..1e83fbeaf569 --- /dev/null +++ b/expected/message.out @@ -0,0 +1,122 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked'); + ?column? +---------- + msg4 +(1 row) + +ROLLBACK; +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1'); + ?column? +---------- + msg5 +(1 row) + +SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1'); + ?column? +---------- + msg6 +(1 row) + +SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2'); + ?column? +---------- + msg7 +(1 row) + +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1'); + data +--------------------------------------------------------------------------------------------------------- + { + + "change": [ + + { + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is a\ message" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this is "another" message" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this message will be printed even if the transaction is rollbacked"+ + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this message will be printed before message #1" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is message #1" + + } + + ,{ + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is message #2" + + } + + ] + + } +(5 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/sql/message.sql b/sql/message.sql new file mode 100644 index 000000000000..a376bd7b58ae --- /dev/null +++ b/sql/message.sql @@ -0,0 +1,24 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed'); +SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked'); +ROLLBACK; + +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1'); +SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1'); +SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2'); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/wal2json.c b/wal2json.c index 285eecf387df..88c77cfd7bee 100644 --- a/wal2json.c +++ b/wal2json.c @@ -22,6 +22,9 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#if PG_VERSION_NUM >= 90600 +#include "replication/message.h" +#endif #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -68,6 +71,12 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +#if PG_VERSION_NUM >= 90600 +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size content_size, const char *content); +#endif void _PG_init(void) @@ -85,6 +94,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; cb->shutdown_cb = pg_decode_shutdown; +#if PG_VERSION_NUM >= 90600 + cb->message_cb = pg_decode_message; +#endif } /* Initialize this plugin */ @@ -869,3 +881,116 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (data->write_in_chunks) OutputPluginWrite(ctx, true); } + +#if PG_VERSION_NUM >= 90600 +/* Callback for generic logical decoding messages */ +static void +pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr lsn, bool transactional, const char *prefix, Size + content_size, const char *content) +{ + JsonDecodingData *data; + MemoryContext old; + + data = ctx->output_plugin_private; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + /* + * write immediately iif (i) write-in-chunks=1 or (ii) non-transactional + * messages. + */ + if (data->write_in_chunks || !transactional) + OutputPluginPrepareWrite(ctx, true); + + /* + * increment counter only for transactional messages because + * non-transactional message has only one object. + */ + if (transactional) + data->nr_changes++; + + if (data->pretty_print) + { + /* if we don't write in chunks, we need a newline here */ + if (!data->write_in_chunks && transactional) + appendStringInfoChar(ctx->out, '\n'); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "{\n"); + appendStringInfoString(ctx->out, "\t\"change\": [\n"); + } + + appendStringInfoString(ctx->out, "\t\t"); + + if (data->nr_changes > 1) + appendStringInfoChar(ctx->out, ','); + + appendStringInfoString(ctx->out, "{\n"); + + appendStringInfoString(ctx->out, "\t\t\t\"kind\": \"message\",\n"); + + if (transactional) + appendStringInfoString(ctx->out, "\t\t\t\"transactional\": true,\n"); + else + appendStringInfoString(ctx->out, "\t\t\t\"transactional\": false,\n"); + + appendStringInfo(ctx->out, "\t\t\t\"prefix\": \"%s\",\n", prefix); + appendStringInfoString(ctx->out, "\t\t\t\"content\": \""); + appendBinaryStringInfo(ctx->out, content, content_size); + appendStringInfoString(ctx->out, "\"\n"); + appendStringInfoString(ctx->out, "\t\t}"); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "\n\t]"); + appendStringInfoString(ctx->out, "\n}"); + } + } + else + { + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "{"); + appendStringInfoString(ctx->out, "\"change\":["); + } + + if (data->nr_changes > 1) + appendStringInfoString(ctx->out, ",{"); + else + appendStringInfoChar(ctx->out, '{'); + + appendStringInfoString(ctx->out, "\"kind\":\"message\","); + + if (transactional) + appendStringInfoString(ctx->out, "\"transactional\":true,"); + else + appendStringInfoString(ctx->out, "\"transactional\":false,"); + + appendStringInfo(ctx->out, "\"prefix\":"); + quote_escape_json(ctx->out, prefix); + appendStringInfoChar(ctx->out, ','); + appendStringInfoString(ctx->out, "\"content\":"); + quote_escape_json(ctx->out, content); + appendStringInfoChar(ctx->out, '}'); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoChar(ctx->out, ']'); + appendStringInfoChar(ctx->out, '}'); + } + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + if (data->write_in_chunks || !transactional) + OutputPluginWrite(ctx, true); +} +#endif