diff --git a/src/postgres/src/backend/catalog/yb_system_views.sql b/src/postgres/src/backend/catalog/yb_system_views.sql index 3e6f5ba643b5..159be40c73eb 100644 --- a/src/postgres/src/backend/catalog/yb_system_views.sql +++ b/src/postgres/src/backend/catalog/yb_system_views.sql @@ -856,7 +856,8 @@ CREATE VIEW pg_replication_slots AS L.catalog_xmin, L.restart_lsn, L.confirmed_flush_lsn, - L.yb_stream_id + L.yb_stream_id, + L.yb_restart_commit_ht FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/postgres/src/backend/replication/slotfuncs.c b/src/postgres/src/backend/replication/slotfuncs.c index 190e91965f1d..7f704cf99ef9 100644 --- a/src/postgres/src/backend/replication/slotfuncs.c +++ b/src/postgres/src/backend/replication/slotfuncs.c @@ -269,7 +269,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) { #define PG_GET_REPLICATION_SLOTS_COLS 11 /* YB specific fields in pg_get_replication_slots */ -#define YB_PG_GET_REPLICATION_SLOTS_COLS 1 +#define YB_PG_GET_REPLICATION_SLOTS_COLS 2 if (IsYugaByteEnabled() && !yb_enable_replication_commands) ereport(ERROR, @@ -355,6 +355,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) const char *yb_stream_id; bool yb_stream_active; + uint64 yb_restart_commit_ht; if (IsYugaByteEnabled()) { @@ -368,6 +369,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) restart_lsn = slot->restart_lsn; confirmed_flush_lsn = slot->confirmed_flush; + yb_restart_commit_ht = slot->record_id_commit_time_ht; xmin = slot->xmin; /* * Set catalog_xmin as xmin to make the PG Debezium connector work. @@ -452,9 +454,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) nulls[i++] = true; if (IsYugaByteEnabled()) + { values[i++] = CStringGetTextDatum(yb_stream_id); + values[i++] = Int64GetDatum(yb_restart_commit_ht); + } else + { nulls[i++] = true; + nulls[i++] = true; + } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/postgres/src/include/catalog/pg_proc.dat b/src/postgres/src/include/catalog/pg_proc.dat index 2940d79cf377..d4bbfe13c7cb 100644 --- a/src/postgres/src/include/catalog/pg_proc.dat +++ b/src/postgres/src/include/catalog/pg_proc.dat @@ -9927,9 +9927,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id,yb_restart_commit_ht}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/postgres/src/include/catalog/pg_yb_migration.dat b/src/postgres/src/include/catalog/pg_yb_migration.dat index 82a27f988a1f..8ba8af4a9076 100644 --- a/src/postgres/src/include/catalog/pg_yb_migration.dat +++ b/src/postgres/src/include/catalog/pg_yb_migration.dat @@ -12,7 +12,7 @@ [ # For better version control conflict detection, list latest migration filename -# here: V49__19132__yb_local_tablets.sql -{ major => '49', minor => '0', name => '', time_applied => '_null_' } +# here: V50__22040__yb_restart_commit_ht_in_pg_replication_slots.sql +{ major => '50', minor => '0', name => '', time_applied => '_null_' } ] diff --git a/src/postgres/src/test/regress/expected/yb_pg_rules.out b/src/postgres/src/test/regress/expected/yb_pg_rules.out index 375a219095f9..90593f03e6d7 100644 --- a/src/postgres/src/test/regress/expected/yb_pg_rules.out +++ b/src/postgres/src/test/regress/expected/yb_pg_rules.out @@ -1453,8 +1453,9 @@ pg_replication_slots| SELECT l.slot_name, l.catalog_xmin, l.restart_lsn, l.confirmed_flush_lsn, - l.yb_stream_id - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, yb_stream_id) + l.yb_stream_id, + l.yb_restart_commit_ht + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, yb_stream_id, yb_restart_commit_ht) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/postgres/src/test/regress/expected/yb_replication_slot.out b/src/postgres/src/test/regress/expected/yb_replication_slot.out index e3530e94ef9a..8153f4e4904e 100644 --- a/src/postgres/src/test/regress/expected/yb_replication_slot.out +++ b/src/postgres/src/test/regress/expected/yb_replication_slot.out @@ -17,7 +17,7 @@ SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false) testslot2 | 0/2 (1 row) --- Cannot do SELECT * since yb_stream_id changes across runs. +-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs. SELECT slot_name, plugin, slot_type, database, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; diff --git a/src/postgres/src/test/regress/sql/yb_replication_slot.sql b/src/postgres/src/test/regress/sql/yb_replication_slot.sql index 219ee97b88a4..05f1840e7a91 100644 --- a/src/postgres/src/test/regress/sql/yb_replication_slot.sql +++ b/src/postgres/src/test/regress/sql/yb_replication_slot.sql @@ -10,7 +10,7 @@ SET SESSION AUTHORIZATION 'regress_replicationslot_user'; SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false); SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false); --- Cannot do SELECT * since yb_stream_id changes across runs. +-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs. SELECT slot_name, plugin, slot_type, database, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; diff --git a/src/yb/yql/pgwrapper/ysql_migrations/V44__19211__yb_stream_id_in_pg_replication_slots.sql b/src/yb/yql/pgwrapper/ysql_migrations/V44__19211__yb_stream_id_in_pg_replication_slots.sql index bc2507f73e05..c5e0e18a9e53 100644 --- a/src/yb/yql/pgwrapper/ysql_migrations/V44__19211__yb_stream_id_in_pg_replication_slots.sql +++ b/src/yb/yql/pgwrapper/ysql_migrations/V44__19211__yb_stream_id_in_pg_replication_slots.sql @@ -22,20 +22,32 @@ COMMIT; -- Recreating system views that use pg_get_replication_slots to update their corresponding -- pg_rewrite entries. -CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots WITH (use_initdb_acl = true) AS - SELECT - L.slot_name, - L.plugin, - L.slot_type, - L.datoid, - D.datname AS database, - L.temporary, - L.active, - L.active_pid, - L.xmin, - L.catalog_xmin, - L.restart_lsn, - L.confirmed_flush_lsn, - L.yb_stream_id - FROM pg_get_replication_slots() AS L - LEFT JOIN pg_database D ON (L.datoid = D.oid); +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT TRUE FROM pg_attribute + WHERE attrelid = 'pg_catalog.pg_replication_slots'::regclass + AND attname = 'yb_stream_id' + AND NOT attisdropped + ) THEN + CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots + WITH (use_initdb_acl = true) + AS + SELECT + L.slot_name, + L.plugin, + L.slot_type, + L.datoid, + D.datname AS database, + L.temporary, + L.active, + L.active_pid, + L.xmin, + L.catalog_xmin, + L.restart_lsn, + L.confirmed_flush_lsn, + L.yb_stream_id + FROM pg_get_replication_slots() AS L + LEFT JOIN pg_database D ON (L.datoid = D.oid); + END IF; +END $$; diff --git a/src/yb/yql/pgwrapper/ysql_migrations/V50__22040__yb_restart_commit_ht_in_pg_replication_slots.sql b/src/yb/yql/pgwrapper/ysql_migrations/V50__22040__yb_restart_commit_ht_in_pg_replication_slots.sql new file mode 100644 index 000000000000..2e9e2f22c984 --- /dev/null +++ b/src/yb/yql/pgwrapper/ysql_migrations/V50__22040__yb_restart_commit_ht_in_pg_replication_slots.sql @@ -0,0 +1,54 @@ +BEGIN; + SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true; + + -- Add a column for restart commit ht in pg_get_replication_slots + -- TODO: As a workaround for GHI #13500, we perform a delete + insert instead + -- of an update into pg_proc. Restore to UPDATE once fixed. + DELETE FROM pg_catalog.pg_proc WHERE proname = 'pg_get_replication_slots' AND + pronamespace = 'pg_catalog'::regnamespace; + INSERT INTO pg_catalog.pg_proc ( + oid, proname, pronamespace, proowner, prolang, procost, prorows, provariadic, protransform, + prokind, prosecdef, proleakproof, proisstrict, proretset, provolatile, proparallel, + pronargs, pronargdefaults, prorettype, proargtypes, proallargtypes, proargmodes, + proargnames, proargdefaults, protrftypes, prosrc, probin, proconfig, proacl + ) VALUES ( + 3781, 'pg_get_replication_slots', 11, 10, 12, 1, 10, 0, '-', 'f', false, false, false, + true, 's', 's', 0, 0, 2249, '', '{19,19,25,26,16,16,23,28,28,3220,3220,25,20}', + '{o,o,o,o,o,o,o,o,o,o,o,o,o}', '{slot_name,plugin,slot_type,datoid,temporary,active, + active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,yb_stream_id,yb_restart_commit_ht}', + NULL, NULL, 'pg_get_replication_slots', NULL, NULL, NULL) + ON CONFLICT DO NOTHING; +COMMIT; + +-- Recreating system views that use pg_get_replication_slots to update their corresponding +-- pg_rewrite entries. +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT TRUE FROM pg_attribute + WHERE attrelid = 'pg_catalog.pg_replication_slots'::regclass + AND attname = 'yb_restart_commit_ht' + AND NOT attisdropped + ) THEN + CREATE OR REPLACE VIEW pg_catalog.pg_replication_slots + WITH (use_initdb_acl = true) + AS + SELECT + L.slot_name, + L.plugin, + L.slot_type, + L.datoid, + D.datname AS database, + L.temporary, + L.active, + L.active_pid, + L.xmin, + L.catalog_xmin, + L.restart_lsn, + L.confirmed_flush_lsn, + L.yb_stream_id, + L.yb_restart_commit_ht + FROM pg_get_replication_slots() AS L + LEFT JOIN pg_database D ON (L.datoid = D.oid); + END IF; +END $$;