diff --git a/expected/truncate.out b/expected/truncate.out index ad5ad4db745f..4d8b965971d5 100644 --- a/expected/truncate.out +++ b/expected/truncate.out @@ -5,6 +5,8 @@ CREATE TABLE table_truncate_1 (a integer, b text); CREATE TABLE table_truncate_2 (a integer, b text); CREATE TABLE table_truncate_3 (a integer, b text); CREATE TABLE table_truncate_4 (a integer, b text); +CREATE TABLE table_truncate_5 (a integer, b text); +CREATE TABLE table_truncate_6 (a integer, b text); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); ?column? ---------- @@ -22,6 +24,9 @@ COMMIT; BEGIN; TRUNCATE table_truncate_4; ROLLBACK; +BEGIN; +TRUNCATE table_truncate_5, table_truncate_6; +COMMIT; SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1'); data ------------------------------------------------------------- @@ -57,7 +62,11 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'fo } + ] + } -(2 rows) + { + + "change": [ + + ] + + } +(3 rows) SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2'); data @@ -72,7 +81,29 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'fo {"action":"T","schema":"public","table":"table_truncate_3"} {"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"text","value":"test3"}]} {"action":"C"} -(10 rows) + {"action":"B"} + {"action":"T","schema":"public","table":"table_truncate_5"} + {"action":"T","schema":"public","table":"table_truncate_6"} + {"action":"C"} +(14 rows) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-tables', '*.table_truncate_5'); + data +------------------------------------------------------------------------------------------------------------------------------------------------------------ + {"action":"B"} + {"action":"T","schema":"public","table":"table_truncate_1"} + {"action":"C"} + {"action":"B"} + {"action":"T","schema":"public","table":"table_truncate_2"} + {"action":"I","schema":"public","table":"table_truncate_1","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"text","value":"test1"}]} + {"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"text","value":"test2"}]} + {"action":"T","schema":"public","table":"table_truncate_3"} + {"action":"I","schema":"public","table":"table_truncate_3","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"text","value":"test3"}]} + {"action":"C"} + {"action":"B"} + {"action":"T","schema":"public","table":"table_truncate_6"} + {"action":"C"} +(13 rows) SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); ?column? diff --git a/sql/truncate.sql b/sql/truncate.sql index 121b735f1903..0accc3b5a143 100644 --- a/sql/truncate.sql +++ b/sql/truncate.sql @@ -6,6 +6,8 @@ CREATE TABLE table_truncate_1 (a integer, b text); CREATE TABLE table_truncate_2 (a integer, b text); CREATE TABLE table_truncate_3 (a integer, b text); CREATE TABLE table_truncate_4 (a integer, b text); +CREATE TABLE table_truncate_5 (a integer, b text); +CREATE TABLE table_truncate_6 (a integer, b text); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); @@ -23,6 +25,11 @@ BEGIN; TRUNCATE table_truncate_4; ROLLBACK; +BEGIN; +TRUNCATE table_truncate_5, table_truncate_6; +COMMIT; + SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1'); SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2'); +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-tables', '*.table_truncate_5'); SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/wal2json.c b/wal2json.c index daa78e12705b..056fa02758cf 100644 --- a/wal2json.c +++ b/wal2json.c @@ -2555,6 +2555,22 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Exclude tables, if available */ + if (pg_filter_by_table(data->filter_tables, schemaname, tablename)) + { + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + continue; + } + + /* Add tables */ + if (!pg_add_by_table(data->add_tables, schemaname, tablename)) + { + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + continue; + } + if (data->write_in_chunks) OutputPluginPrepareWrite(ctx, true); @@ -2641,53 +2657,19 @@ static void pg_decode_truncate_v2(LogicalDecodingContext *ctx, tablename = RelationGetRelationName(relations[i]); /* Exclude tables, if available */ - if (list_length(data->filter_tables) > 0) + if (pg_filter_by_table(data->filter_tables, schemaname, tablename)) { - ListCell *lc; - - foreach(lc, data->filter_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was filtered out", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - continue; - } - } - } + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + continue; } /* Add tables */ - if (list_length(data->add_tables) > 0) + if (!pg_add_by_table(data->add_tables, schemaname, tablename)) { - ListCell *lc; - bool skip = true; - - /* all tables in all schemas are added by default */ - foreach(lc, data->add_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was added", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - skip = false; - } - } - } - - /* table was not found */ - if (skip) - continue; + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + continue; } OutputPluginPrepareWrite(ctx, true);