diff --git a/Makefile b/Makefile index ee71f1dd1ab1..c82b478c8af8 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ MODULES = wal2json +# message test will fail for <= 9.5 REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ - delete3 delete4 savepoint specialvalue toast bytea + delete3 delete4 savepoint specialvalue toast bytea message PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/expected/message.out b/expected/message.out new file mode 100644 index 000000000000..1e83fbeaf569 --- /dev/null +++ b/expected/message.out @@ -0,0 +1,122 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked'); + ?column? +---------- + msg4 +(1 row) + +ROLLBACK; +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1'); + ?column? +---------- + msg5 +(1 row) + +SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1'); + ?column? +---------- + msg6 +(1 row) + +SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2'); + ?column? +---------- + msg7 +(1 row) + +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1'); + data +--------------------------------------------------------------------------------------------------------- + { + + "change": [ + + { + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is a\ message" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this is "another" message" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this message will be printed even if the transaction is rollbacked"+ + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": false, + + "prefix": "wal2json", + + "content": "this message will be printed before message #1" + + } + + ] + + } + { + + "change": [ + + { + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is message #1" + + } + + ,{ + + "kind": "message", + + "transactional": true, + + "prefix": "wal2json", + + "content": "this is message #2" + + } + + ] + + } +(5 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/sql/message.sql b/sql/message.sql new file mode 100644 index 000000000000..a376bd7b58ae --- /dev/null +++ b/sql/message.sql @@ -0,0 +1,24 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'wal2json', 'this is a\ message'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'wal2json', 'this is "another" message'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'wal2json', 'this message will not be printed'); +SELECT 'msg4' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed even if the transaction is rollbacked'); +ROLLBACK; + +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #1'); +SELECT 'msg6' FROM pg_logical_emit_message(false, 'wal2json', 'this message will be printed before message #1'); +SELECT 'msg7' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #2'); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/wal2json.c b/wal2json.c index 285eecf387df..88c77cfd7bee 100644 --- a/wal2json.c +++ b/wal2json.c @@ -22,6 +22,9 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#if PG_VERSION_NUM >= 90600 +#include "replication/message.h" +#endif #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -68,6 +71,12 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +#if PG_VERSION_NUM >= 90600 +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size content_size, const char *content); +#endif void _PG_init(void) @@ -85,6 +94,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; cb->shutdown_cb = pg_decode_shutdown; +#if PG_VERSION_NUM >= 90600 + cb->message_cb = pg_decode_message; +#endif } /* Initialize this plugin */ @@ -869,3 +881,116 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (data->write_in_chunks) OutputPluginWrite(ctx, true); } + +#if PG_VERSION_NUM >= 90600 +/* Callback for generic logical decoding messages */ +static void +pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr lsn, bool transactional, const char *prefix, Size + content_size, const char *content) +{ + JsonDecodingData *data; + MemoryContext old; + + data = ctx->output_plugin_private; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + /* + * write immediately iif (i) write-in-chunks=1 or (ii) non-transactional + * messages. + */ + if (data->write_in_chunks || !transactional) + OutputPluginPrepareWrite(ctx, true); + + /* + * increment counter only for transactional messages because + * non-transactional message has only one object. + */ + if (transactional) + data->nr_changes++; + + if (data->pretty_print) + { + /* if we don't write in chunks, we need a newline here */ + if (!data->write_in_chunks && transactional) + appendStringInfoChar(ctx->out, '\n'); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "{\n"); + appendStringInfoString(ctx->out, "\t\"change\": [\n"); + } + + appendStringInfoString(ctx->out, "\t\t"); + + if (data->nr_changes > 1) + appendStringInfoChar(ctx->out, ','); + + appendStringInfoString(ctx->out, "{\n"); + + appendStringInfoString(ctx->out, "\t\t\t\"kind\": \"message\",\n"); + + if (transactional) + appendStringInfoString(ctx->out, "\t\t\t\"transactional\": true,\n"); + else + appendStringInfoString(ctx->out, "\t\t\t\"transactional\": false,\n"); + + appendStringInfo(ctx->out, "\t\t\t\"prefix\": \"%s\",\n", prefix); + appendStringInfoString(ctx->out, "\t\t\t\"content\": \""); + appendBinaryStringInfo(ctx->out, content, content_size); + appendStringInfoString(ctx->out, "\"\n"); + appendStringInfoString(ctx->out, "\t\t}"); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "\n\t]"); + appendStringInfoString(ctx->out, "\n}"); + } + } + else + { + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoString(ctx->out, "{"); + appendStringInfoString(ctx->out, "\"change\":["); + } + + if (data->nr_changes > 1) + appendStringInfoString(ctx->out, ",{"); + else + appendStringInfoChar(ctx->out, '{'); + + appendStringInfoString(ctx->out, "\"kind\":\"message\","); + + if (transactional) + appendStringInfoString(ctx->out, "\"transactional\":true,"); + else + appendStringInfoString(ctx->out, "\"transactional\":false,"); + + appendStringInfo(ctx->out, "\"prefix\":"); + quote_escape_json(ctx->out, prefix); + appendStringInfoChar(ctx->out, ','); + appendStringInfoString(ctx->out, "\"content\":"); + quote_escape_json(ctx->out, content); + appendStringInfoChar(ctx->out, '}'); + + /* build a complete JSON object for non-transactional message */ + if (!transactional) + { + appendStringInfoChar(ctx->out, ']'); + appendStringInfoChar(ctx->out, '}'); + } + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + if (data->write_in_chunks || !transactional) + OutputPluginWrite(ctx, true); +} +#endif