Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to include a list of column names for updates in the case… #99

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ MODULES = wal2json
# message test will fail for <= 9.5
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids
filtertable selecttable include_timestamp include_lsn include_xids \
missing_toast

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Parameters
* `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`.
* `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`.
* `format-version`: defines which format to use. Default is _1_.
* `include-missing-toast`: include a list of columns which are not included in return data since they are toasted and were not updated. Default is _false_

Examples
========
Expand Down
103 changes: 103 additions & 0 deletions expected/missing_toast.out

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions sql/missing_toast.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

DROP TABLE IF EXISTS missing_toast;

CREATE TABLE missing_toast (
id serial primary key,
an_integer integer,
toasted_col1 text,
toasted_col2 text
);

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

-- uncompressed external toast data
INSERT INTO missing_toast (toasted_col1, toasted_col2) SELECT string_agg(g.i::text, ''), string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i);

-- update of existing column
UPDATE missing_toast SET toasted_col1 = (SELECT string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i)) where id = 1;

UPDATE missing_toast set an_integer = 1 where id = 1;

UPDATE missing_toast SET toasted_col1 = (SELECT string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i)),
toasted_col2 = (SELECT string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i))
where id = 1;

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'include-typmod', '0', 'include-missing-toast', '1');
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
37 changes: 37 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct
bool include_type_oids; /* include data type oids */
bool include_typmod; /* include typmod in types */
bool include_not_null; /* include not-null constraints */
bool include_missing_toast; /* include list of missing toast columns */

bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? */
Expand Down Expand Up @@ -145,6 +146,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->write_in_chunks = false;
data->include_lsn = false;
data->include_not_null = false;
data->include_missing_toast = false;
data->filter_tables = NIL;

data->format_version = WAL2JSON_FORMAT_VERSION;
Expand Down Expand Up @@ -369,6 +371,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
pfree(rawstr);
}
}
else if (strcmp(elem->defname, "include-missing-toast") == 0)
{
if (elem->arg == NULL)
{
elog(DEBUG1, "include-missing-toast argument is null");
data->include_missing_toast = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_missing_toast))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "format-version") == 0)
{
if (elem->arg == NULL)
Expand Down Expand Up @@ -492,11 +507,13 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
int natt;

StringInfoData colnames;
StringInfoData missingtoastcols;
StringInfoData coltypes;
StringInfoData coltypeoids;
StringInfoData colnotnulls;
StringInfoData colvalues;
char comma[3] = "";
char toastcomma[3] = "";

data = ctx->output_plugin_private;

Expand All @@ -506,6 +523,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
initStringInfo(&coltypeoids);
if (data->include_not_null)
initStringInfo(&colnotnulls);
if (data->include_missing_toast)
initStringInfo(&missingtoastcols);
initStringInfo(&colvalues);

/*
Expand All @@ -530,6 +549,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfo(&coltypeoids, "%s%s%s\"columntypeoids\":%s[", data->ht, data->ht, data->ht, data->sp);
if (data->include_not_null)
appendStringInfo(&colnotnulls, "%s%s%s\"columnoptionals\":%s[", data->ht, data->ht, data->ht, data->sp);
if (data->include_missing_toast)
appendStringInfo(&missingtoastcols, "%s%s%s\"missingtoastcols\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&colvalues, "%s%s%s\"columnvalues\":%s[", data->ht, data->ht, data->ht, data->sp);
}

Expand Down Expand Up @@ -610,6 +631,16 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
/* XXX Unchanged TOAST Datum does not need to be output */
if (!isnull && typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
{
if (data->include_missing_toast)
{
appendStringInfo(&missingtoastcols, "%s", toastcomma);
escape_json(&missingtoastcols, NameStr(attr->attname));
}

/* first missing toast has no comma */
if (strcmp(toastcomma, "") == 0)
snprintf(toastcomma, 3, ",%s", data->sp);

elog(DEBUG1, "column \"%s\" has an unchanged TOAST", NameStr(attr->attname));
continue;
}
Expand Down Expand Up @@ -736,6 +767,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfo(&coltypeoids, "],%s", data->nl);
if (data->include_not_null)
appendStringInfo(&colnotnulls, "],%s", data->nl);
if (data->include_missing_toast)
appendStringInfo(&missingtoastcols, "],%s", data->nl);
if (hasreplident)
appendStringInfo(&colvalues, "],%s", data->nl);
else
Expand All @@ -750,6 +783,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfoString(ctx->out, coltypeoids.data);
if (data->include_not_null)
appendStringInfoString(ctx->out, colnotnulls.data);
if (data->include_missing_toast)
appendStringInfoString(ctx->out, missingtoastcols.data);
appendStringInfoString(ctx->out, colvalues.data);

pfree(colnames.data);
Expand All @@ -758,6 +793,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
pfree(coltypeoids.data);
if (data->include_not_null)
pfree(colnotnulls.data);
if (data->include_missing_toast)
pfree(missingtoastcols.data);
pfree(colvalues.data);
}

Expand Down