Skip to content

Commit

Permalink
Filter is not applied for TRUNCATE
Browse files Browse the repository at this point in the history
Filter is failing to exclude tables that matchs the filter. Filter code
was refactored at commit a96dd31 but I
forgot to replace the filter in the truncate function. I include a
regression test to cover this code.
  • Loading branch information
eulerto committed Aug 22, 2021
1 parent 1527dfc commit ef1da1f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 44 deletions.
35 changes: 33 additions & 2 deletions expected/truncate.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ CREATE TABLE table_truncate_1 (a integer, b text);
CREATE TABLE table_truncate_2 (a integer, b text);
CREATE TABLE table_truncate_3 (a integer, b text);
CREATE TABLE table_truncate_4 (a integer, b text);
CREATE TABLE table_truncate_5 (a integer, b text);
CREATE TABLE table_truncate_6 (a integer, b text);
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
Expand All @@ -22,6 +24,9 @@ COMMIT;
BEGIN;
TRUNCATE table_truncate_4;
ROLLBACK;
BEGIN;
TRUNCATE table_truncate_5, table_truncate_6;
COMMIT;
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1');
data
-------------------------------------------------------------
Expand Down Expand Up @@ -57,7 +62,11 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'fo
} +
] +
}
(2 rows)
{ +
"change": [ +
] +
}
(3 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
data
Expand All @@ -72,7 +81,29 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'fo
{"action":"T","schema":"public","table":"table_truncate_3"}
{"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"text","value":"test3"}]}
{"action":"C"}
(10 rows)
{"action":"B"}
{"action":"T","schema":"public","table":"table_truncate_5"}
{"action":"T","schema":"public","table":"table_truncate_6"}
{"action":"C"}
(14 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-tables', '*.table_truncate_5');
data
------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"T","schema":"public","table":"table_truncate_1"}
{"action":"C"}
{"action":"B"}
{"action":"T","schema":"public","table":"table_truncate_2"}
{"action":"I","schema":"public","table":"table_truncate_1","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"text","value":"test1"}]}
{"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"text","value":"test2"}]}
{"action":"T","schema":"public","table":"table_truncate_3"}
{"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"text","value":"test3"}]}
{"action":"C"}
{"action":"B"}
{"action":"T","schema":"public","table":"table_truncate_6"}
{"action":"C"}
(13 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
Expand Down
7 changes: 7 additions & 0 deletions sql/truncate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ CREATE TABLE table_truncate_1 (a integer, b text);
CREATE TABLE table_truncate_2 (a integer, b text);
CREATE TABLE table_truncate_3 (a integer, b text);
CREATE TABLE table_truncate_4 (a integer, b text);
CREATE TABLE table_truncate_5 (a integer, b text);
CREATE TABLE table_truncate_6 (a integer, b text);

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

Expand All @@ -23,6 +25,11 @@ BEGIN;
TRUNCATE table_truncate_4;
ROLLBACK;

BEGIN;
TRUNCATE table_truncate_5, table_truncate_6;
COMMIT;

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-tables', '*.table_truncate_5');
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
66 changes: 24 additions & 42 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,22 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

/* Exclude tables, if available */
if (pg_filter_by_table(data->filter_tables, schemaname, tablename))
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
continue;
}

/* Add tables */
if (!pg_add_by_table(data->add_tables, schemaname, tablename))
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
continue;
}

if (data->write_in_chunks)
OutputPluginPrepareWrite(ctx, true);

Expand Down Expand Up @@ -2641,53 +2657,19 @@ static void pg_decode_truncate_v2(LogicalDecodingContext *ctx,
tablename = RelationGetRelationName(relations[i]);

/* Exclude tables, if available */
if (list_length(data->filter_tables) > 0)
if (pg_filter_by_table(data->filter_tables, schemaname, tablename))
{
ListCell *lc;

foreach(lc, data->filter_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was filtered out",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
continue;
}
}
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
continue;
}

/* Add tables */
if (list_length(data->add_tables) > 0)
if (!pg_add_by_table(data->add_tables, schemaname, tablename))
{
ListCell *lc;
bool skip = true;

/* all tables in all schemas are added by default */
foreach(lc, data->add_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was added",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
skip = false;
}
}
}

/* table was not found */
if (skip)
continue;
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
continue;
}

OutputPluginPrepareWrite(ctx, true);
Expand Down

0 comments on commit ef1da1f

Please sign in to comment.