diff --git a/pgsync/__init__.py b/pgsync/__init__.py index 5d441df0..2570a913 100644 --- a/pgsync/__init__.py +++ b/pgsync/__init__.py @@ -4,4 +4,4 @@ __author__ = "Tolu Aina" __email__ = "tolu@pgsync.com" -__version__ = "2.3.4" +__version__ = "2.4.0" diff --git a/pgsync/base.py b/pgsync/base.py index efdbeaa8..736cf86f 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -51,7 +51,7 @@ class Payload(object): - __slots__ = ("tg_op", "table", "schema", "old", "new", "xmin") + __slots__ = ("tg_op", "table", "schema", "old", "new", "xmin", "indices") def __init__( self, @@ -61,6 +61,7 @@ def __init__( old: dict = Optional[None], new: dict = Optional[None], xmin: int = Optional[None], + indices: List[str] = Optional[None], ): self.tg_op: str = tg_op self.table: str = table @@ -68,6 +69,7 @@ def __init__( self.old: dict = old or {} self.new: dict = new or {} self.xmin: str = xmin + self.indices: List[str] = indices @property def data(self) -> dict: @@ -481,12 +483,17 @@ def logical_slot_count_changes( # Views... def create_view( - self, schema: str, tables: Set, user_defined_fkey_tables: dict + self, + index: str, + schema: str, + tables: Set, + user_defined_fkey_tables: dict, ) -> None: create_view( self.engine, self.models, self.fetchall, + index, schema, tables, user_defined_fkey_tables, diff --git a/pgsync/sync.py b/pgsync/sync.py index 6b82d9a3..1b789d59 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -279,7 +279,9 @@ def setup(self) -> None: user_defined_fkey_tables.setdefault(node.table, set()) user_defined_fkey_tables[node.table] |= set(columns) if tables: - self.create_view(schema, tables, user_defined_fkey_tables) + self.create_view( + self.index, schema, tables, user_defined_fkey_tables + ) self.create_triggers( schema, tables=tables, join_queries=join_queries ) @@ -1039,9 +1041,10 @@ def poll_db(self) -> None: notification: AnyStr = conn.notifies.pop(0) if notification.channel == self.database: payload = json.loads(notification.payload) - payloads.append(payload) - logger.debug(f"on_notify: {payload}") - self.count["db"] += 1 + if self.index in payload["indices"]: + payloads.append(payload) + logger.debug(f"on_notify: {payload}") + self.count["db"] += 1 @exception def async_poll_db(self) -> None: @@ -1060,9 +1063,10 @@ def async_poll_db(self) -> None: notification: AnyStr = self.conn.notifies.pop(0) if notification.channel == self.database: payload = json.loads(notification.payload) - self.redis.bulk_push([payload]) - logger.debug(f"on_notify: {payload}") - self.count["db"] += 1 + if self.index in payload["indices"]: + self.redis.bulk_push([payload]) + logger.debug(f"on_notify: {payload}") + self.count["db"] += 1 def refresh_views(self) -> None: self._refresh_views() @@ -1181,7 +1185,7 @@ async def async_status(self) -> None: def _status(self, label: str) -> None: sys.stdout.write( - f"{label} {self.database} " + f"{label} {self.database}:{self.index} " f"Xlog: [{self.count['xlog']:,}] => " f"Db: [{self.count['db']:,}] => " f"Redis: [total = {self.count['redis']:,} " diff --git a/pgsync/trigger.py b/pgsync/trigger.py index d726f651..c380a948 100644 --- a/pgsync/trigger.py +++ b/pgsync/trigger.py @@ -9,6 +9,7 @@ new_row JSON; notification JSON; xmin BIGINT; + _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; @@ -18,8 +19,8 @@ IF TG_OP = 'DELETE' THEN - SELECT primary_keys - INTO _primary_keys + SELECT primary_keys, indices + INTO _primary_keys, _indices FROM {MATERIALIZED_VIEW} WHERE table_name = TG_TABLE_NAME; @@ -33,8 +34,8 @@ ELSE IF TG_OP <> 'TRUNCATE' THEN - SELECT primary_keys, foreign_keys - INTO _primary_keys, _foreign_keys + SELECT primary_keys, foreign_keys, indices + INTO _primary_keys, _foreign_keys, _indices FROM {MATERIALIZED_VIEW} WHERE table_name = TG_TABLE_NAME; @@ -61,6 +62,7 @@ 'xmin', xmin, 'new', new_row, 'old', old_row, + 'indices', _indices, 'tg_op', TG_OP, 'table', TG_TABLE_NAME, 'schema', TG_TABLE_SCHEMA diff --git a/pgsync/view.py b/pgsync/view.py index 3c4b34e3..ea57dfe5 100644 --- a/pgsync/view.py +++ b/pgsync/view.py @@ -194,6 +194,7 @@ def create_view( engine: sa.engine.Engine, models: Callable, fetchall: Callable, + index: str, schema: str, tables: Set, user_defined_fkey_tables: dict, @@ -213,28 +214,36 @@ def create_view( So if 'specie' was the only row before, and the next query returns 'unit' and 'structure', we want to end up with the result below. - table_name | primary_keys | foreign_keys - -----------+--------------+-------------- - specie | {id} | {id, user_id} - unit | {id} | {id, profile_id} - structure | {id} | {id} + table_name | primary_keys | foreign_keys | indices + ------------+--------------+------------------+------------ + specie | {id} | {id, user_id} | {foo, bar} + unit | {id} | {id, profile_id} | {foo, bar} + structure | {id} | {id} | {foo, bar} + unit | {id} | {id, profile_id} | {foo, bar} + structure | {id} | {id} | {foo, bar} """ rows: dict = {} if MATERIALIZED_VIEW in views: - for table_name, primary_keys, foreign_keys in fetchall( + for table_name, primary_keys, foreign_keys, indices in fetchall( sa.select(["*"]).select_from( sa.text(f"{schema}.{MATERIALIZED_VIEW}") ) ): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set()}, + { + "primary_keys": set(), + "foreign_keys": set(), + "indices": set(), + }, ) if primary_keys: rows[table_name]["primary_keys"] = set(primary_keys) if foreign_keys: rows[table_name]["foreign_keys"] = set(foreign_keys) + if indices: + rows[table_name]["indices"] = set(indices) engine.execute(DropView(schema, MATERIALIZED_VIEW)) @@ -245,32 +254,39 @@ def create_view( for table_name, columns in fetchall(_primary_keys(models, schema, tables)): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, ) if columns: rows[table_name]["primary_keys"] |= set(columns) + rows[table_name]["indices"] |= set([index]) for table_name, columns in fetchall(_foreign_keys(models, schema, tables)): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, ) if columns: rows[table_name]["foreign_keys"] |= set(columns) + rows[table_name]["indices"] |= set([index]) if user_defined_fkey_tables: for table_name, columns in user_defined_fkey_tables.items(): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set()}, + { + "primary_keys": set(), + "foreign_keys": set(), + "indices": set(), + }, ) if columns: rows[table_name]["foreign_keys"] |= set(columns) + rows[table_name]["indices"] |= set([index]) if not rows: rows.setdefault( None, - {"primary_keys": set(), "foreign_keys": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, ) statement = sa.select( @@ -278,6 +294,7 @@ def create_view( sa.column("table_name"), sa.column("primary_keys"), sa.column("foreign_keys"), + sa.column("indices"), ) .data( [ @@ -289,6 +306,9 @@ def create_view( array(fields.get("foreign_keys")) if fields.get("foreign_keys") else None, + array(fields.get("indices")) + if fields.get("indices") + else None, ) for table_name, fields in rows.items() ] @@ -299,7 +319,12 @@ def create_view( engine.execute(CreateView(schema, MATERIALIZED_VIEW, statement)) engine.execute(DropIndex("_idx")) engine.execute( - CreateIndex("_idx", schema, MATERIALIZED_VIEW, ["table_name"]) + CreateIndex( + "_idx", + schema, + MATERIALIZED_VIEW, + ["table_name"], + ) ) logger.debug(f"Created view: {schema}.{MATERIALIZED_VIEW}") diff --git a/tests/test_sync.py b/tests/test_sync.py index d0f5c914..86fcad98 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -311,7 +311,7 @@ def test_status(self, sync): with patch("pgsync.sync.sys") as mock_sys: sync._status("mydb") mock_sys.stdout.write.assert_called_once_with( - "mydb testdb " + "mydb testdb:testdb " "Xlog: [0] => " "Db: [0] => " "Redis: [total = 0 " @@ -765,6 +765,7 @@ def test_setup(self, mock_teardown, sync): join_queries=True, ) mock_create_view.assert_called_once_with( + "testdb", "public", {"publisher", "book"}, {"publisher": {"publisher_id", "id"}}, diff --git a/tests/test_trigger.py b/tests/test_trigger.py index 62315261..ab9256ab 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -18,6 +18,7 @@ def test_trigger_template(self): new_row JSON; notification JSON; xmin BIGINT; + _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; @@ -27,8 +28,8 @@ def test_trigger_template(self): IF TG_OP = 'DELETE' THEN - SELECT primary_keys - INTO _primary_keys + SELECT primary_keys, indices + INTO _primary_keys, _indices FROM _view WHERE table_name = TG_TABLE_NAME; @@ -42,8 +43,8 @@ def test_trigger_template(self): ELSE IF TG_OP <> 'TRUNCATE' THEN - SELECT primary_keys, foreign_keys - INTO _primary_keys, _foreign_keys + SELECT primary_keys, foreign_keys, indices + INTO _primary_keys, _foreign_keys, _indices FROM _view WHERE table_name = TG_TABLE_NAME; @@ -70,6 +71,7 @@ def test_trigger_template(self): 'xmin', xmin, 'new', new_row, 'old', old_row, + 'indices', _indices, 'tg_op', TG_OP, 'table', TG_TABLE_NAME, 'schema', TG_TABLE_SCHEMA diff --git a/tests/test_view.py b/tests/test_view.py index e6106db5..643db3f0 100644 --- a/tests/test_view.py +++ b/tests/test_view.py @@ -235,6 +235,7 @@ def fetchall(statement): connection.engine, pg_base.models, fetchall, + "testdb", DEFAULT_SCHEMA, ["book", "publisher"], user_defined_fkey_tables={}, @@ -254,6 +255,7 @@ def fetchall(statement): connection.engine, pg_base.models, fetchall, + "testdb", "myschema", set(["book", "publisher"]), user_defined_fkey_tables=user_defined_fkey_tables,